joblib.Parallel for nested list comprehensions

Time: 2021-10-28 00:14:03

I have a nested list comprehension that looks something like this:


>>> nested = [[1, 2], [3, 4, 5]]
>>> [[sqrt(i) for i in j] for j in nested]
[[1.0, 1.4142135623730951], [1.7320508075688772, 2.0, 2.23606797749979]]

Is it possible to parallelize this using the standard joblib approach for embarrassingly parallel for loops? If so, what is the proper syntax for delayed?


As far as I can tell, the docs don't mention or give any example of nested inputs. I've tried a few naive implementations, to no avail:


>>> #this syntax fails:
>>> Parallel(n_jobs = 2) (delayed(sqrt)(i for i in j) for j in nested)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Python27\lib\site-packages\joblib\parallel.py", line 660, in __call__
    self.retrieve()
  File "C:\Python27\lib\site-packages\joblib\parallel.py", line 512, in retrieve
    self._output.append(job.get())
  File "C:\Python27\lib\multiprocessing\pool.py", line 558, in get
    raise self._value
pickle.PicklingError: Can't pickle <type 'generator'>: it's not found as __builtin__.generator
>>> #this syntax doesn't fail, but gives the wrong output:
>>> Parallel(n_jobs = 2) (delayed(sqrt)(i) for i in j for j in nested)
[1.7320508075688772, 1.7320508075688772, 2.0, 2.0, 2.23606797749979, 2.23606797749979]

If this is impossible, I can obviously restructure the list before and after passing it to Parallel. However, my actual list is long and each item is enormous, so doing so isn't ideal.


2 solutions

#1


I'm not quite sure what's happening in your second attempt, but the first one is clear to me: the expression in brackets behind the sqrt, (i for i in j), results in a generator object, which is passed to the parallel processing pipeline. Unfortunately, the output of a generator may depend on previous calls. In your case it will provide the next element of j every time it's called, but it might just as well do some internal calculation, which would mean that the different processes depend on each other, and that your results could depend on the order in which the parallel processes are executed. Generators therefore can't be pickled and shipped to worker processes, which is why the multiprocessing library refuses to continue.

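A minimal illustration of why the first form fails (the exact exception type differs between Python versions; the traceback in the question is from Python 2, where it surfaces as pickle.PicklingError):

import pickle

gen = (i for i in [1, 2, 3])

try:
    pickle.dumps(gen)  # generators hold execution state and cannot be serialized
except Exception as e:
    print(type(e).__name__, e)  # TypeError: cannot pickle 'generator' object (Python 3)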

As I said, I'm not quite sure what's happening in the second example, but it might just be that you managed to accidentally trick multiprocessing into doing exactly what it tried to avoid in the first case. (Note that the two for clauses in that generator expression are in the wrong order, so i iterates over whatever j was left bound to by the earlier list comprehension, which is why each of sqrt(3), sqrt(4) and sqrt(5) appears twice in the output.)


Potential solutions:

1: Separate the levels of iteration


...for example, as j_n suggests, by defining a function that iterates over the low-level lists (see the sketch below). This is easy to implement, but may not give you as much benefit from parallelization, depending on how long the individual lists turn out to be. It might also be an option to use a non-parallel list comprehension for the outer loop but parallelize the inner one, or even both; whether this is useful pretty much depends on the structure of your data.

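A minimal sketch of both variants, assuming the nested list from the question (the helper name sqrt_list is chosen here for illustration):

from math import sqrt

from joblib import Parallel, delayed

nested = [[1, 2], [3, 4, 5]]

def sqrt_list(sub):
    # one worker handles a whole sub-list sequentially
    return [sqrt(i) for i in sub]

# variant A: parallelize the outer loop, one job per sub-list
outer = Parallel(n_jobs=2)(delayed(sqrt_list)(j) for j in nested)

# variant B: sequential outer loop, parallel inner loops
inner = [Parallel(n_jobs=2)(delayed(sqrt)(i) for i in j) for j in nested]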

2: Iterate on a linearised version of your nested list


That way, every single execution is done in parallel, but it means you need to "flatten" the list first and restructure it later.


This is easy if your nested list is regularly structured (i.e. if it contains n lists with m elements each):


Make a numpy array from the nested list, like so:


from math import sqrt

import numpy as np
from joblib import Parallel, delayed

# convert to array -- only works well if you have a regular structure!
nested_arr = np.array(nested)
# remember the shape of the array, for later
shape = nested_arr.shape

# generate an (n*m) linear array from an (n, m) 2D one
linear = nested_arr.ravel()

# run the parallel calculation
results_lin = Parallel(n_jobs=2)(delayed(sqrt)(e) for e in linear)

# get everything back into shape (Parallel returns a list, so convert first):
results = np.array(results_lin).reshape(shape)

Actually, this might be even simpler, because np.nditer() iterates element-wise over a multidimensional array. I'm not sure whether it will cooperate with joblib and multiprocessing, though. If you have regular data (and really want nothing more complicated than a square root), you should also consider simply using np.sqrt(nested_arr) -- this executes quicker than iterating over a list of numbers and taking their roots separately, by orders of magnitude!

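A quick sketch of that vectorised one-liner. Note that it needs a regularly structured list (n sub-lists of m elements each); the question's nested list is ragged, so a regular example is used here:

import numpy as np

regular = [[1, 2, 3], [4, 5, 6]]

# vectorised square root over the whole 2D array at once, no explicit loop
print(np.sqrt(np.array(regular)))
# [[1.         1.41421356 1.73205081]
#  [2.         2.23606798 2.44948975]]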

If your nested list is irregular, linearising becomes a bit more involved:


# store the lengths of the sub-lists
structure = [len(e) for e in nested]

# make one linear list
linlist = []
for l in nested:
    linlist.extend(l)

# finally run the parallel computation:
results_lin = Parallel(n_jobs=2)(delayed(sqrt)(e) for e in linlist)

# ...and bring it all back into shape:
results = []
i = 0

for n in structure:
    results.append(results_lin[i:i+n])
    i += n  # advance past the slice we just took
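
As an aside, a small sketch of an equivalent way to do the flattening step with the standard library:

from itertools import chain

# equivalent to the extend() loop above: one flat list of all elements
linlist = list(chain.from_iterable(nested))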

Whether all this makes sense again depends on the amount of data you're treating and the complexity of your list. With your simple example, the flattening and restructuring will obviously take longer than computing the square roots themselves.


Do you really need parallelisation, though?


If all you're doing is simple maths on a large collection of numbers, consider using np.array. You can put arrays into most equations as if they were numbers, and the calculations run much faster:


In [14]: time resl = [sqrt(e) for e in range(1000000)]
CPU times: user 2.1 s, sys: 194 ms, total: 2.29 s
Wall time: 2.19 s

In [15]: time res = np.sqrt(np.arange(1000000))
CPU times: user 10.4 ms, sys: 0 ns, total: 10.4 ms
Wall time: 10.1 ms

This is way faster than anything the list-based operation could be accelerated to, even running in parallel on 24 cores. (In fact, you'd need about 216 parallel processes to keep up with numpy, and I'm sure the overhead of multiprocessing distributing the load to that many processes would doom the attempt anyway.)


#2


As far as I understand it, you cannot put an expression (like (i for i in j)) at that position; that is where the argument to the function goes. You can achieve what you want by writing a function that does the unpacking of the list, like this:


from math import sqrt

def sqrt_n(j):
    # take the square root of every element of one sub-list
    return [sqrt(i) for i in j]

Parallel(n_jobs=2)(delayed(sqrt_n)(j) for j in nested)
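
With the question's nested list, this returns [[1.0, 1.4142135623730951], [1.7320508075688772, 2.0, 2.23606797749979]], matching the sequential list comprehension.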
