Azure和Flysystem中Async客户端的GuzzleHttp并行进度

问题描述 投票:0回答:1

我想获取实际的区块进度,而不是所有传输的进度。目前,我不知道如何检测每个单独传输的blockId。当前正在检索的进度回调信息是毫无意义的。

这里是进度功能,包含在ServiceRestProxy.php中

原始功能https://github.com/Azure/azure-storage-php/blob/master/azure-storage-common/src/Common/Internal/ServiceRestProxy.php#L99

/**
 * Create a Guzzle client for future usage.
 *
 * @param  array $options Optional parameters for the client.
 *
 * @return Client
 */
private static function createClient(array $options)
{
    $verify = true;
    //Disable SSL if proxy has been set, and set the proxy in the client.
    $proxy = getenv('HTTP_PROXY');
    // For testing with Fiddler
    // $proxy = 'localhost:8888';
    // $verify = false;
    if (!empty($proxy)) {
        $options['proxy'] = $proxy;
    }

    if (!empty($options['verify'])) {
        $verify = $options['verify'];
    }

    $downloadTotal = 0;

    return (new \GuzzleHttp\Client(
        array_merge(
            $options,
            array(
                "defaults" => array(
                    "allow_redirects" => true,
                    "exceptions" => true,
                    "decode_content" => true,
                ),
                'cookies' => true,
                'verify' => $verify,
                'progress' => function (
                    $downloadTotal,
                    $downloadedBytes,
                    $uploadTotal,
                    $uploadedBytes
                ){
                    // i need to detect which block the progress is for.
                    echo ("progress: download: {$downloadedBytes}/{$downloadTotal}, upload: {$uploadedBytes}/{$uploadTotal}");
                }
            )
        )
    ));
}
azure guzzle progress transfer flysystem
1个回答
0
投票

我有一个解决方案来使每个块都取得进展。

我需要为此使用异步功能。更新版本。

/**
 * Send the requests concurrently. Number of concurrency can be modified
 * by inserting a new key/value pair with the key 'number_of_concurrency'
 * into the $requestOptions of $serviceOptions. Return only the promise.
 *
 * @param  callable       $generator   the generator function to generate
 *                                     request upon fulfillment
 * @param  int            $statusCode  The expected status code for each of the
 *                                     request generated by generator.
 * @param  ServiceOptions $options     The service options for the concurrent
 *                                     requests.
 *
 * @return \GuzzleHttp\Promise\Promise|\GuzzleHttp\Promise\PromiseInterface
 */
protected function sendConcurrentAsync(
    callable $generator,
    $statusCode,
    ServiceOptions $options
) {
    $client = $this->client;
    $middlewareStack = $this->createMiddlewareStack($options);

    $progress = [];

    $sendAsync = function ($request, $options) use ($client, $progress) {
        if ($request->getMethod() == 'HEAD') {
            $options['decode_content'] = false;
        }

        $options["progress"] = function(
                $downloadTotal,
                $downloadedBytes,
                $uploadTotal,
                $uploadedBytes) use($request, $progress){
            // extract blockid from url
            $url = $request->getUri()->getQuery();
            parse_str($url, $array);

            // this array can be written to file or session etc
            $progress[$array["blockid"]] = ["download_total" => $downloadTotal, "downloaded_bytes" => $downloadedBytes, "upload_total" => $uploadTotal, "uploaded_bytes" => $uploadedBytes];

            };
        return $client->sendAsync($request, $options);
    };

    $handler = $middlewareStack->apply($sendAsync);

    $requestOptions = $this->generateRequestOptions($options, $handler);

    $promises = \call_user_func(
        function () use (
            $generator,
            $handler,
            $requestOptions
        ) {
            while (is_callable($generator) && ($request = $generator())) {
                yield \call_user_func($handler, $request, $requestOptions);
            }
        }
    );

    $eachPromise = new EachPromise($promises, [
        'concurrency' => $options->getNumberOfConcurrency(),
        'fulfilled' => function ($response, $index) use ($statusCode) {
            //the promise is fulfilled, evaluate the response
            self::throwIfError(
                $response,
                $statusCode
            );
        },
        'rejected' => function ($reason, $index) {
            //Still rejected even if the retry logic has been applied.
            //Throwing exception.
            throw $reason;
        }
    ]);

    return $eachPromise->promise();
}
© www.soinside.com 2019 - 2024. All rights reserved.