extend storage api to allow directly writing a stream to storage

this removes the need for temporary storages with some external storage backends.
The new method is added to a separate interface to maintain compatibility with
storage backends implementing the storage interface directly (without inheriting common)

Currently the interface is implemented for objectstorage based storages and local storage
and used by webdav uploads

Signed-off-by: Robin Appelman <robin@icewind.nl>
This commit is contained in:
Robin Appelman 2018-10-26 19:15:23 +02:00
parent 3c442e4138
commit 93de63777e
No known key found for this signature in database
GPG Key ID: 42B69D8A64526EFB
7 changed files with 251 additions and 47 deletions

View File

@ -164,14 +164,19 @@ class File extends Node implements IFile {
$this->changeLock(ILockingProvider::LOCK_EXCLUSIVE);
}
$target = $partStorage->fopen($internalPartPath, 'wb');
if ($target === false) {
\OC::$server->getLogger()->error('\OC\Files\Filesystem::fopen() failed', ['app' => 'webdav']);
// because we have no clue about the cause we can only throw back a 500/Internal Server Error
throw new Exception('Could not write file contents');
if ($partStorage->instanceOfStorage(Storage\IWriteStreamStorage::class)) {
$count = $partStorage->writeStream($internalPartPath, $data);
$result = $count > 0;
} else {
$target = $partStorage->fopen($internalPartPath, 'wb');
if ($target === false) {
\OC::$server->getLogger()->error('\OC\Files\Filesystem::fopen() failed', ['app' => 'webdav']);
// because we have no clue about the cause we can only throw back a 500/Internal Server Error
throw new Exception('Could not write file contents');
}
list($count, $result) = \OC_Helper::streamCopy($data, $target);
fclose($target);
}
list($count, $result) = \OC_Helper::streamCopy($data, $target);
fclose($target);
if ($result === false) {
$expected = -1;
@ -185,7 +190,7 @@ class File extends Node implements IFile {
// double check if the file was fully received
// compare expected and actual size
if (isset($_SERVER['CONTENT_LENGTH']) && $_SERVER['REQUEST_METHOD'] === 'PUT') {
$expected = (int) $_SERVER['CONTENT_LENGTH'];
$expected = (int)$_SERVER['CONTENT_LENGTH'];
if ($count !== $expected) {
throw new BadRequest('expected filesize ' . $expected . ' got ' . $count);
}
@ -219,7 +224,7 @@ class File extends Node implements IFile {
$renameOkay = $storage->moveFromStorage($partStorage, $internalPartPath, $internalPath);
$fileExists = $storage->file_exists($internalPath);
if ($renameOkay === false || $fileExists === false) {
\OC::$server->getLogger()->error('renaming part file to final file failed ($run: ' . ( $run ? 'true' : 'false' ) . ', $renameOkay: ' . ( $renameOkay ? 'true' : 'false' ) . ', $fileExists: ' . ( $fileExists ? 'true' : 'false' ) . ')', ['app' => 'webdav']);
\OC::$server->getLogger()->error('renaming part file to final file failed ($run: ' . ($run ? 'true' : 'false') . ', $renameOkay: ' . ($renameOkay ? 'true' : 'false') . ', $fileExists: ' . ($fileExists ? 'true' : 'false') . ')', ['app' => 'webdav']);
throw new Exception('Could not rename part file to final file');
}
} catch (ForbiddenException $ex) {
@ -246,7 +251,7 @@ class File extends Node implements IFile {
$this->header('X-OC-MTime: accepted');
}
}
if ($view) {
$this->emitPostHooks($exists);
}
@ -443,7 +448,7 @@ class File extends Node implements IFile {
//detect aborted upload
if (isset ($_SERVER['REQUEST_METHOD']) && $_SERVER['REQUEST_METHOD'] === 'PUT') {
if (isset($_SERVER['CONTENT_LENGTH'])) {
$expected = (int) $_SERVER['CONTENT_LENGTH'];
$expected = (int)$_SERVER['CONTENT_LENGTH'];
if ($bytesWritten !== $expected) {
$chunk_handler->remove($info['index']);
throw new BadRequest(

View File

@ -28,6 +28,7 @@ namespace OC\Files\ObjectStore;
use Icewind\Streams\CallbackWrapper;
use Icewind\Streams\IteratorDirectory;
use OC\Files\Cache\CacheEntry;
use OC\Files\Stream\CountReadStream;
use OCP\Files\ObjectStore\IObjectStore;
class ObjectStoreStorage extends \OC\Files\Storage\Common {
@ -382,41 +383,8 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common {
}
public function writeBack($tmpFile, $path) {
$stat = $this->stat($path);
if (empty($stat)) {
// create new file
$stat = array(
'permissions' => \OCP\Constants::PERMISSION_ALL - \OCP\Constants::PERMISSION_CREATE,
);
}
// update stat with new data
$mTime = time();
$stat['size'] = filesize($tmpFile);
$stat['mtime'] = $mTime;
$stat['storage_mtime'] = $mTime;
// run path based detection first, to use file extension because $tmpFile is only a random string
$mimetypeDetector = \OC::$server->getMimeTypeDetector();
$mimetype = $mimetypeDetector->detectPath($path);
if ($mimetype === 'application/octet-stream') {
$mimetype = $mimetypeDetector->detect($tmpFile);
}
$stat['mimetype'] = $mimetype;
$stat['etag'] = $this->getETag($path);
$fileId = $this->getCache()->put($path, $stat);
try {
//upload to object storage
$this->objectStore->writeObject($this->getURN($fileId), fopen($tmpFile, 'r'));
} catch (\Exception $ex) {
$this->getCache()->remove($path);
$this->logger->logException($ex, [
'app' => 'objectstore',
'message' => 'Could not create object ' . $this->getURN($fileId) . ' for ' . $path,
]);
throw $ex; // make this bubble up
}
$size = filesize($tmpFile);
$this->writeStream($path, fopen($tmpFile, 'r'), $size);
}
/**
@ -433,4 +401,60 @@ class ObjectStoreStorage extends \OC\Files\Storage\Common {
public function needsPartFile() {
return false;
}
public function file_put_contents($path, $data) {
$stream = fopen('php://temp', 'r+');
fwrite($stream, $data);
rewind($stream);
return $this->writeStream($path, $stream, strlen($data)) > 0;
}
public function writeStream(string $path, $stream, int $size = null): int {
$stat = $this->stat($path);
if (empty($stat)) {
// create new file
$stat = [
'permissions' => \OCP\Constants::PERMISSION_ALL - \OCP\Constants::PERMISSION_CREATE,
];
}
// update stat with new data
$mTime = time();
$stat['size'] = (int)$size;
$stat['mtime'] = $mTime;
$stat['storage_mtime'] = $mTime;
$mimetypeDetector = \OC::$server->getMimeTypeDetector();
$mimetype = $mimetypeDetector->detectPath($path);
$stat['mimetype'] = $mimetype;
$stat['etag'] = $this->getETag($path);
$fileId = $this->getCache()->put($path, $stat);
try {
//upload to object storage
if ($size === null) {
$countStream = CountReadStream::wrap($stream, function ($writtenSize) use ($fileId, &$size) {
$this->getCache()->update($fileId, [
'size' => $writtenSize
]);
$size = $writtenSize;
});
$this->objectStore->writeObject($this->getURN($fileId), $countStream);
if (is_resource($countStream)) {
fclose($countStream);
}
} else {
$this->objectStore->writeObject($this->getURN($fileId), $stream);
}
} catch (\Exception $ex) {
$this->getCache()->remove($path);
$this->logger->logException($ex, [
'app' => 'objectstore',
'message' => 'Could not create object ' . $this->getURN($fileId) . ' for ' . $path,
]);
throw $ex; // make this bubble up
}
return $size;
}
}

View File

@ -54,6 +54,7 @@ use OCP\Files\InvalidPathException;
use OCP\Files\ReservedWordException;
use OCP\Files\Storage\ILockingStorage;
use OCP\Files\Storage\IStorage;
use OCP\Files\Storage\IWriteStreamStorage;
use OCP\ILogger;
use OCP\Lock\ILockingProvider;
use OCP\Lock\LockedException;
@ -69,7 +70,7 @@ use OCP\Lock\LockedException;
* Some \OC\Files\Storage\Common methods call functions which are first defined
* in classes which extend it, e.g. $this->stat() .
*/
abstract class Common implements Storage, ILockingStorage {
abstract class Common implements Storage, ILockingStorage, IWriteStreamStorage {
use LocalTempFileTrait;
@ -809,4 +810,20 @@ abstract class Common implements Storage, ILockingStorage {
public function needsPartFile() {
return true;
}
/**
* fallback implementation
*
* @param string $path
* @param resource $stream
* @param int $size
* @return int
*/
public function writeStream(string $path, $stream, int $size = null): int {
$target = $this->fopen($path, 'w');
list($count, $result) = \OC_Helper::streamCopy($stream, $target);
fclose($stream);
fclose($target);
return $count;
}
}

View File

@ -462,4 +462,8 @@ class Local extends \OC\Files\Storage\Common {
return parent::moveFromStorage($sourceStorage, $sourceInternalPath, $targetInternalPath);
}
}
public function writeStream(string $path, $stream, int $size = null): int {
return (int)file_put_contents($this->getSourcePath($path), $stream);
}
}

View File

@ -0,0 +1,65 @@
<?php declare(strict_types=1);
/**
* @copyright Copyright (c) 2018 Robin Appelman <robin@icewind.nl>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
namespace OC\Files\Stream;
use Icewind\Streams\Wrapper;
class CountReadStream extends Wrapper {
/** @var int */
private $count;
/** @var callback */
private $callback;
public static function wrap($source, $callback) {
$context = stream_context_create(array(
'count' => array(
'source' => $source,
'callback' => $callback,
)
));
return Wrapper::wrapSource($source, $context, 'count', self::class);
}
public function dir_opendir($path, $options) {
return false;
}
public function stream_open($path, $mode, $options, &$opened_path) {
$context = $this->loadContext('count');
$this->callback = $context['callback'];
return true;
}
public function stream_read($count) {
$result = parent::stream_read($count);
$this->count += strlen($result);
return $result;
}
public function stream_close() {
$result = parent::stream_close();
call_user_func($this->callback, $this->count);
return $result;
}
}

View File

@ -0,0 +1,40 @@
<?php declare(strict_types=1);
/**
* @copyright Copyright (c) 2018 Robin Appelman <robin@icewind.nl>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
namespace OCP\Files\Storage;
/**
* Interface that adds the ability to write a stream directly to file
*
* @since 15.0.0
*/
interface IWriteStreamStorage extends IStorage {
/**
* Write the data from a stream to a file
*
* @param string $path
* @param resource $stream
* @param int|null $size the size of the stream if known in advance
* @return int the number of bytes written
* @since 15.0.0
*/
public function writeStream(string $path, $stream, int $size = null): int;
}

View File

@ -0,0 +1,49 @@
<?php declare(strict_types=1);
/**
* @copyright Copyright (c) 2018 Robin Appelman <robin@icewind.nl>
*
* @license GNU AGPL version 3 or any later version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
namespace Test\Files\Stream;
use OC\Files\Stream\CountReadStream;
use Test\TestCase;
class CountReadStreamTest extends TestCase {
private function getStream($data) {
$handle = fopen('php://temp', 'w+');
fwrite($handle, $data);
rewind($handle);
return $handle;
}
public function testBasicCount() {
$source = $this->getStream('foo');
$stream = CountReadStream::wrap($source, function ($size) {
$this->assertEquals(3, $size);
});
stream_get_contents($stream);
}
public function testLarger() {
$stream = CountReadStream::wrap(fopen(__DIR__ . '/../../../data/testimage.mp4', 'r'), function ($size) {
$this->assertEquals(383631, $size);
});
stream_get_contents($stream);
}
}