How to iterate over a Collection in Java using multiple threads, where no two threads iterate over the same part of the Collection?

Time: 2022-08-03 20:55:38

I need to iterate over a large ArrayList (~50,000 entries) and I need to use multiple threads to do this fairly quickly.

But I need each thread to start at a unique index so that no two threads ever iterate over the same part of the list. There will be a batchSize of 100 so each thread will loop from its startIndex to startIndex + 100.

Is there any way to achieve this? Note that I am only performing read operations here, no writes. Each entry in the list is just a String which is actually an SQL query which I am then executing against a DB via JDBC.

3 solutions

#1


If you only intend to read the List, not mutate it, you can simply define your Runnable to take the List and a startIndex as constructor arguments. There's no danger to concurrently reading an ArrayList (even the same indices) as long as no threads modify it at the same time.

To be safe, be sure to wrap your ArrayList in a call to Collections.unmodifiableList() and pass that List to your Runnables. That way you can be confident the threads will not modify the backing ArrayList.
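
As a rough sketch of the Runnable described above: the class name `BatchReader`, the `execute` placeholder, and the `AtomicInteger` counter are assumptions made for illustration, not part of the original answer. Each worker gets the same unmodifiable view plus its own start index, so no two workers touch the same range.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical worker: reads one batch of the shared list, starting at startIndex.
final class BatchReader implements Runnable {
    static final int BATCH_SIZE = 100;

    private final List<String> queries;    // shared, read-only view
    private final int startIndex;          // unique per worker
    private final AtomicInteger processed; // only here to count the work done

    BatchReader(List<String> queries, int startIndex, AtomicInteger processed) {
        this.queries = queries;
        this.startIndex = startIndex;
        this.processed = processed;
    }

    @Override
    public void run() {
        // Clamp the end so the last batch does not run past the list.
        int end = Math.min(startIndex + BATCH_SIZE, queries.size());
        for (int i = startIndex; i < end; i++) {
            execute(queries.get(i));
        }
    }

    // Placeholder for the real JDBC call; here it only counts the entry.
    private void execute(String sql) {
        processed.incrementAndGet();
    }

    // Starts one thread per batch and returns how many entries were read.
    static int readAll(List<String> source) {
        List<String> readOnly = Collections.unmodifiableList(source);
        AtomicInteger processed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int start = 0; start < readOnly.size(); start += BATCH_SIZE) {
            Thread t = new Thread(new BatchReader(readOnly, start, processed));
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return processed.get();
    }
}
```

With ~50,000 entries and a batch size of 100 this would start about 500 threads, so in practice a bounded thread pool is probably a better fit than one thread per batch.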

Alternatively, you can construct sublists in your main thread (with List.subList()) so that you don't need to pass the startIndex to each thread. However, you would still want to make the sublists unmodifiable before handing them to the threads. Six of one, half a dozen of the other.

Even better would be to use Guava's ImmutableList; it's naturally thread-safe.

There are also parallel streams in Java 8, but take care with this solution; they're powerful, but easy to get wrong.

#2


If you use Java 8, look at list.stream().parallel()
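
A minimal sketch of what that could look like, assuming a hypothetical `execute` method that stands in for the real JDBC call (here it just returns the query length so there is something to collect):

```java
import java.util.List;
import java.util.stream.Collectors;

final class ParallelStreamDemo {
    // Hypothetical stand-in for running one query; returns a value derived from it.
    static int execute(String sql) {
        return sql.length();
    }

    // Processes the entries in parallel; the stream splits the list internally,
    // so each entry is handled exactly once.
    static List<Integer> runAll(List<String> queries) {
        return queries.stream()
                .parallel()
                .map(ParallelStreamDemo::execute)
                .collect(Collectors.toList()); // keeps the list's encounter order
    }
}
```

By default a parallel stream runs on the common ForkJoinPool, which is sized for CPU-bound work. Blocking JDBC calls may not be a good match for it, so it is worth measuring before relying on this.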

For Java 7, use subList() outside of the threads to split the work into pieces. Each thread should then operate only on its own sub-list. For most lists, subList() is a very efficient operation that doesn't copy the data; it returns a view of the backing list. If the backing list is structurally modified afterwards, using the sub-list will typically throw a ConcurrentModificationException.
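
To illustrate the view behaviour, here is a small sketch (class and method names are made up for the example). The List Javadoc only says a sub-list's semantics become undefined after a structural change to the backing list; the exception shown here is what OpenJDK's ArrayList does in practice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

final class SubListViewDemo {
    // A sub-list is a view: a non-structural change (set) shows through.
    static boolean seesBackingUpdates() {
        List<String> backing = new ArrayList<>(Arrays.asList("q1", "q2", "q3"));
        List<String> firstTwo = backing.subList(0, 2); // no copy is made
        backing.set(0, "changed");
        return firstTwo.get(0).equals("changed");
    }

    // Returns true if reading the sub-list fails after the backing list grows.
    static boolean failsAfterBackingListChanges() {
        List<String> backing = new ArrayList<>(Arrays.asList("q1", "q2", "q3"));
        List<String> firstTwo = backing.subList(0, 2);
        backing.add("q4"); // structural modification of the backing list
        try {
            firstTwo.get(0);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```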

As for getting the data to the threads, I suggest looking at the Executor API and queues. Just put all the work pieces in the queue and let the executor figure everything out.
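
One way this might look, as a sketch only: the names `ExecutorBatches`, `processBatch`, and `processAll` are invented for illustration, and the JDBC call is left as a placeholder. The main thread cuts the list into sub-lists, and a fixed-size pool works through them from its internal queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ExecutorBatches {
    // Hypothetical per-batch work; returns how many entries it handled.
    static int processBatch(List<String> batch) {
        int done = 0;
        for (String sql : batch) {
            // Placeholder: execute sql via JDBC here.
            if (sql != null) {
                done++;
            }
        }
        return done;
    }

    // Splits the list into batches and lets a fixed pool work through them.
    static int processAll(List<String> queries, int batchSize, int threads) {
        List<String> readOnly = Collections.unmodifiableList(queries);
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int start = 0; start < readOnly.size(); start += batchSize) {
            int end = Math.min(start + batchSize, readOnly.size());
            List<String> batch = readOnly.subList(start, end); // view, no copy
            tasks.add(() -> processBatch(batch));
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            int total = 0;
            // invokeAll queues every task and waits until all have finished.
            for (Future<Integer> result : pool.invokeAll(tasks)) {
                total += result.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for batches", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a batch failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

The lambda needs Java 8; on Java 7 the same task can be written as an anonymous Callable. The pool size is independent of the number of batches, so 500 batches can be served by a handful of threads.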

#3


Keep a shared counter that is only accessed through a synchronized method:

int nextBatch = 0;

Increment it every time a thread consumes a new batch:

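public synchronized int getNextBatch() {
    if (nextBatch >= arraylist.size()) {
        // The end was reached
        return -1;
    }
    // Hand out the current start index first, so the batch at index 0 is not skipped
    int start = nextBatch;
    nextBatch += batchSize;
    return start;
}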

Each thread calls this method to get the start of the range it needs to work on:

int start = getNextBatch();
if (start == -1) {
    // The end was reached, nothing left for this thread
    // (assumes this code lives in a void method such as run())
    return;
}
int end = Math.min(start + batchSize, arraylist.size());

// Iterate over this thread's own range
for (int i = start; i < end; i++) {
    Object obj = arraylist.get(i);
    // Do something with obj
}
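
If a truly atomic variable is preferred over a synchronized method, the same idea can be sketched with AtomicInteger. This is a variation on the answer rather than what it shows, and the class name `AtomicBatchClaimer` and the helper `countWithThreads` are assumptions for the example. Each thread keeps claiming batches in a loop until none are left.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class AtomicBatchClaimer {
    private final AtomicInteger nextBatch = new AtomicInteger(0);
    private final int listSize;
    private final int batchSize;

    AtomicBatchClaimer(int listSize, int batchSize) {
        this.listSize = listSize;
        this.batchSize = batchSize;
    }

    // Returns the start index of a batch no other thread holds, or -1 at the end.
    int getNextBatch() {
        int start = nextBatch.getAndAdd(batchSize); // atomic read-then-advance
        return start < listSize ? start : -1;
    }

    // Runs threadCount workers over the list and returns how many entries were visited.
    static int countWithThreads(List<String> list, int batchSize, int threadCount) {
        AtomicBatchClaimer claimer = new AtomicBatchClaimer(list.size(), batchSize);
        AtomicInteger visited = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            Thread worker = new Thread(() -> {
                int start;
                while ((start = claimer.getNextBatch()) != -1) {
                    int end = Math.min(start + batchSize, list.size());
                    for (int i = start; i < end; i++) {
                        list.get(i); // placeholder: do something with the entry
                        visited.incrementAndGet();
                    }
                }
            });
            threads.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : threads) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return visited.get();
    }
}
```

Because getAndAdd returns the old value and advances the counter in one step, two threads can never receive the same start index.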
