Downloading files from Amazon S3 in parallel batches with the AWS S3 SDK for .NET

Time: 2023-02-06 00:05:04

Problem: I would like to download 100 files in parallel from AWS S3 using their .NET SDK. The downloaded content should be stored in 100 memory streams (the files are small enough, and I can take it from there). I am getting confused between Task, IAsyncResult, Parallel.*, and the other approaches available in .NET 4.0.

If I try to solve the problem myself, off the top of my head I imagine something like the following pseudocode (edited to add types to some variables):

using System.Collections.Generic;
using System.IO;
using System.Linq;
using Amazon;
using Amazon.S3;
using Amazon.S3.Model;

AmazonS3 _s3 = ...;
IEnumerable<GetObjectRequest> requestObjects = ...;


// Prepare to launch requests
var asyncRequests = from rq in requestObjects 
    select _s3.BeginGetObject(rq,null,null);

// Launch requests
var asyncRequestsLaunched = asyncRequests.ToList();

// Prepare to finish requests
var responses = from rq in asyncRequestsLaunched 
    select _s3.EndGetObject(rq);

// Finish requests
var actualResponses = responses.ToList();

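// Fetch data (ToList() forces the copies to run; each CopyTo blocks,
// so the files are copied one after another)
var data = actualResponses.Select(rp => {
    var ms = new MemoryStream(); 
    rp.ResponseStream.CopyTo(ms); 
    return ms;
}).ToList();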

This code launches 100 requests in parallel, which is good. However, there are two problems:

  1. The last statement will download the files serially, not in parallel. There doesn't seem to be a BeginCopyTo()/EndCopyTo() method on Stream...
  2. The preceding statement (responses.ToList()) blocks until every request has responded. In other words, none of the files start downloading until all of the requests have returned a response.
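On point 1: .NET 4.0's Stream has no asynchronous copy method (Stream.CopyToAsync only arrived in .NET Framework 4.5), but one can be composed from the BeginRead/EndRead pair with Task<int>.Factory.FromAsync. This is a minimal sketch that assumes only .NET 4.0-era APIs and a source stream that supports BeginRead; StreamCopyHelpers and CopyToAsync40 are hypothetical names, not SDK or BCL members.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class StreamCopyHelpers
{
    // Hypothetical helper (not part of the AWS SDK or the BCL): copies source
    // to destination by wrapping BeginRead/EndRead with FromAsync, so no
    // thread sits blocked waiting for a read to finish.
    public static Task CopyToAsync40(Stream source, Stream destination, int bufferSize = 81920)
    {
        var buffer = new byte[bufferSize];
        var tcs = new TaskCompletionSource<object>();

        Action readNext = null;
        readNext = () =>
        {
            Task<int>.Factory
                .FromAsync(source.BeginRead, source.EndRead, buffer, 0, buffer.Length, null)
                .ContinueWith(t =>
                {
                    if (t.IsFaulted)
                    {
                        tcs.TrySetException(t.Exception.InnerExceptions);
                        return;
                    }

                    int bytesRead = t.Result;
                    if (bytesRead == 0)
                    {
                        tcs.TrySetResult(null); // end of stream reached
                        return;
                    }

                    try
                    {
                        // The write finishes before the next read starts,
                        // so reusing one buffer is safe.
                        destination.Write(buffer, 0, bytesRead);
                        readNext(); // queue the next read as a new continuation
                    }
                    catch (Exception ex)
                    {
                        tcs.TrySetException(ex);
                    }
                });
        };

        readNext();
        return tcs.Task;
    }
}
```

Each read is scheduled as a separate continuation rather than run inline, so a long stream does not grow the call stack. With a GetObjectResponse you would pass its ResponseStream as the source and dispose the response only after the returned task completes.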

So here I start thinking I am heading down the wrong path...

Help?

1 Answer

#1

It's probably easier if you break the operation down into a method that will handle one request asynchronously and then call it 100 times.

To start, let's identify the final result you want. Since the end product is a MemoryStream, your method should return a Task<MemoryStream>. The signature will look something like this:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)

Because your AmazonS3 object implements the Asynchronous Programming Model (the BeginXxx/EndXxx pattern), you can use the FromAsync method on the TaskFactory class to wrap a Begin/End pair in a Task<T>, like so:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)
{
    Task<GetObjectResponse> response = 
        Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
            s3.BeginGetObject, s3.EndGetObject, request, null);

    // But what goes here?
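The FromAsync call above needs a live S3 client to run. To see the same wrapping in isolation, here is a minimal sketch built around a hypothetical FakeObjectStore class (not an AWS type) that exposes an APM-style BeginGetObject/EndGetObject pair on top of TaskCompletionSource. The wrapping call has the same shape as the S3 version: begin method, end method, argument, state.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;

// Hypothetical stand-in for an APM-style client such as the SDK's
// BeginGetObject/EndGetObject pair. It is NOT an AWS type.
public class FakeObjectStore
{
    public IAsyncResult BeginGetObject(string key, AsyncCallback callback, object state)
    {
        // Task implements IAsyncResult, so a TaskCompletionSource can back an APM pair.
        var tcs = new TaskCompletionSource<byte[]>(state);
        Task.Factory.StartNew(() =>
        {
            // Simulate the "download" completing on another thread.
            tcs.SetResult(Encoding.UTF8.GetBytes("contents of " + key));
            if (callback != null) callback(tcs.Task);
        });
        return tcs.Task;
    }

    public byte[] EndGetObject(IAsyncResult asyncResult)
    {
        // Result rethrows any failure wrapped in an AggregateException.
        return ((Task<byte[]>)asyncResult).Result;
    }
}

public static class FromAsyncDemo
{
    public static Task<byte[]> GetObjectAsync(FakeObjectStore store, string key)
    {
        // Same pattern as the S3 call: begin method, end method, argument, state.
        return Task.Factory.FromAsync<string, byte[]>(
            store.BeginGetObject, store.EndGetObject, key, null);
    }
}
```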

So you're already in a good place: you have a Task<T> that you can wait on, or attach a callback to, when the call completes. However, you still need to translate the GetObjectResponse produced by the Task<GetObjectResponse> into a MemoryStream.

To that end, use the ContinueWith method on the Task<T> class. Think of it as an asynchronous version of the Select method on the Enumerable class: it projects one Task<T> into another, except that each call to ContinueWith potentially creates a new task that runs the projection code.
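The Select analogy can be seen without S3 at all. In this minimal sketch (the class and method names are illustrative), each ContinueWith projects the antecedent's result into a new task, and the projections chain the way successive Select calls do.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinueWithDemo
{
    public static Task<string> DescribeAsync(int value)
    {
        // Antecedent: some asynchronous work that produces an int.
        Task<int> antecedent = Task.Factory.StartNew(() => value * 2);

        // Projection: like Enumerable.Select, but the lambda runs when the
        // antecedent finishes and receives the Task itself, not the bare value.
        Task<string> projected = antecedent.ContinueWith(t => "doubled: " + t.Result);

        // Projections compose: each ContinueWith returns a new task.
        return projected.ContinueWith(t => t.Result.ToUpperInvariant());
    }
}
```

For example, DescribeAsync(21).Result is "DOUBLED: 42". One difference from Select: because the lambda receives the antecedent Task, reading t.Result rethrows any failure from the antecedent wrapped in an AggregateException.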

With that, your method looks like the following:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request)
{
    // Start the task of downloading.
    Task<GetObjectResponse> response = 
        Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
            s3.BeginGetObject, s3.EndGetObject, request, null
        );

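    // Translate the response into a MemoryStream once the download completes.
    Task<MemoryStream> translation = response.ContinueWith(t => {
        // Dispose the response (not the task) after copying its stream.
        using (GetObjectResponse resp = t.Result) {
            var ms = new MemoryStream(); 
            resp.ResponseStream.CopyTo(ms); 
            // Rewind so callers can read from the beginning.
            ms.Position = 0;
            return ms;
        } 
    });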

    // Return the full task chain.
    return translation;
}

Note that in the above you could call the overload of ContinueWith that takes TaskContinuationOptions.ExecuteSynchronously, since it appears you are doing minimal work (I can't tell for sure; the responses might be huge). When the continuation does so little work that scheduling a separate task for it would cost more than the work itself, pass TaskContinuationOptions.ExecuteSynchronously so that you don't waste time creating new tasks for trivial operations.
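A minimal sketch of that overload, assuming the downloaded payload is already available as a Task<byte[]> (a stand-in for Task<GetObjectResponse>; the class and method names are hypothetical):

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class ExecuteSynchronouslyDemo
{
    // The byte[] antecedent stands in for Task<GetObjectResponse>. The copy is
    // cheap, so running it inline on the thread that completes the antecedent
    // avoids scheduling a separate work item.
    public static Task<MemoryStream> ToMemoryStreamAsync(Task<byte[]> download)
    {
        return download.ContinueWith(t =>
        {
            var ms = new MemoryStream();
            ms.Write(t.Result, 0, t.Result.Length);
            ms.Position = 0; // rewind so the caller can read from the start
            return ms;
        }, TaskContinuationOptions.ExecuteSynchronously);
    }
}
```

With ExecuteSynchronously the continuation runs on whichever thread completes the antecedent, which for an APM call is often an I/O callback thread, so it suits short work like this copy; a very large response would tie that thread up.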

Now that you have the method that can translate one request into a Task<MemoryStream>, creating a wrapper that will process any number of them is simple:

static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
    IEnumerable<GetObjectRequest> requests)
{
    // Just call Select on the requests, passing our translation into
    // a Task<MemoryStream>.
    // Also, materialize here, so that the tasks are "hot" when
    // returned.
    return requests.Select(r => GetMemoryStreamAsync(s3, r)).
        ToArray();
}

The method above takes a sequence of GetObjectRequest instances and returns an array of Task<MemoryStream>. The fact that it returns a materialized sequence is important: if you don't materialize it before returning, the tasks won't be created until the sequence is iterated.

Of course, if you want that deferred behavior, remove the call to .ToArray() and have the method return IEnumerable<Task<MemoryStream>>; the requests will then be made as you iterate through the sequence, and made again on every subsequent iteration.
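The difference between the materialized and deferred versions is easy to observe with a counter. This sketch uses a hypothetical StartRequest method in place of GetMemoryStreamAsync; calling it starts a task and bumps the counter.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class MaterializationDemo
{
    // Counts how many "requests" have been started.
    public static int Started;

    // Hypothetical stand-in for GetMemoryStreamAsync: starting the task is the side effect.
    static Task<int> StartRequest(int id)
    {
        Interlocked.Increment(ref Started);
        return Task.Factory.StartNew(() => id);
    }

    // Deferred: nothing starts until the sequence is enumerated, and each
    // enumeration starts a fresh set of tasks.
    public static IEnumerable<Task<int>> Deferred(IEnumerable<int> ids)
    {
        return ids.Select(id => StartRequest(id));
    }

    // Hot: every task has been started by the time the method returns.
    public static Task<int>[] Materialized(IEnumerable<int> ids)
    {
        return ids.Select(id => StartRequest(id)).ToArray();
    }
}
```

Enumerating the deferred sequence twice (for example, once for Task.WaitAll and again to read the results) starts every request twice, while the materialized array starts each request exactly once.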

From there, you can process them one at a time (using the Task.WaitAny method in a loop) or wait for all of them to be completed (by calling the Task.WaitAll method). An example of the latter would be:

static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3, 
    IEnumerable<GetObjectRequest> requests)
{
    Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
    Task.WaitAll(tasks);
    return tasks.Select(t => t.Result).ToList();
}
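The one-at-a-time alternative mentioned above can be sketched with Task.WaitAny in a loop. ProcessAsCompleted and its onCompleted parameter are hypothetical names; onCompleted stands in for whatever you do with each MemoryStream.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WaitAnyDemo
{
    // Handles each task as soon as it finishes instead of waiting for all of them.
    public static void ProcessAsCompleted<T>(IEnumerable<Task<T>> tasks, Action<T> onCompleted)
    {
        List<Task<T>> pending = tasks.ToList();
        while (pending.Count > 0)
        {
            // WaitAny blocks until at least one task finishes and returns its index.
            int index = Task.WaitAny(pending.ToArray());
            Task<T> finished = pending[index];
            pending.RemoveAt(index);

            // Result rethrows a failure as AggregateException; catch here to handle per task.
            onCompleted(finished.Result);
        }
    }
}
```

Passing the array from GetMemoryStreamsAsync(s3, requests) here would let you handle each file as soon as it arrives. Rebuilding the array on every pass costs O(n²) overall, which is negligible for 100 downloads.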

Also, it's worth mentioning that this is a good fit for the Reactive Extensions (Rx) framework, as the scenario is well suited to an IObservable<T> implementation.
