Is it possible to vectorize min and max over slices?

Date: 2022-08-09 18:55:58

Suppose that I have a NumPy array of integers.

arr = np.random.randint(0, 1000, 1000)

And I have two arrays, lowers and uppers, which represent lower and upper bounds respectively on slices of arr. The intervals overlap and are of variable length; however, lowers and uppers are both guaranteed to be non-decreasing.

lowers = np.array([0, 5, 132, 358, 566, 822])
uppers = np.array([45, 93, 189, 533, 800, 923])

I want to find the min and the max of each slice of arr defined by lowers and uppers, and store these in another array.

我想找到由降低器和鞋面定义的每个arr切片的最小值和最大值,并将它们存储在另一个数组中。

out_arr = np.empty((lowers.size, 2))
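
A small aside (my note, not part of the original question): np.empty defaults to float64, which is why the integer results print as floats further down. An integer output could be requested explicitly:

out_arr = np.empty((lowers.size, 2), dtype=arr.dtype)  # match arr's integer dtype instead of the float64 default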

What is the most efficient way to do this? I'm worried there is no vectorized approach, as I can't see how to get around indexing in a loop.

My current approach is just the straightforward

for i in range(lowers.size):
    arr_v = arr[lowers[i]:uppers[i]]
    out_arr[i,0] = np.amin(arr_v)
    out_arr[i,1] = np.amax(arr_v)

which leaves me with the desired result, like

In [304]: out_arr
Out[304]: 

array([[  26.,  908.],
       [  18.,  993.],
       [   0.,  968.],
       [   3.,  999.],
       [   1.,  998.],
       [   0.,  994.]])

but this is far too slow on my actual data.

3 Answers

#1


3  

Ok, here is how to at least downsize the original problem using np.minimum.reduceat:

lu = np.r_[lowers, uppers]                 # all cut points, lowers first
so = np.argsort(lu)                        # order that sorts the cut points
iso = np.empty_like(so)
iso[so] = np.arange(len(so))               # inverse permutation: rank of each cut point
cut = len(lowers)
lmin = np.minimum.reduceat(arr, lu[so])    # min of arr between consecutive sorted cut points
for i in range(cut):
    # the min over [lowers[i], uppers[i]) equals the min over the pre-reduced segments
    print(min(lmin[iso[i]:iso[cut+i]]), min(arr[lowers[i]:uppers[i]]))

# 33 33
# 7 7
# 5 5
# 0 0
# 3 3
# 7 7

What this doesn't achieve is getting rid of the main loop, but at least the data were reduced from a 1000-element array to a 12-element one.
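
For reference, here is a minimal illustration of reduceat's semantics (my own toy example, not from the original answer): each output element is the reduction over a[idx[k]:idx[k+1]], with the last segment running to the end of the array.

a = np.array([5, 3, 8, 2, 9, 1])
idx = np.array([0, 2, 4])
print(np.minimum.reduceat(a, idx))   # [3 2 1]: minima of a[0:2], a[2:4], a[4:]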

Update:

With small overlaps, @Eric Hansen's own solutions are hard to beat. I'd still like to point out that if there are substantial overlaps, it may even be worthwhile to combine both methods. I don't have numba, so below is just a twopass version that combines my preprocessing with Eric's pure numpy solution, which also serves as a benchmark in the form of onepass:

import numpy as np
from timeit import timeit

def twopass(lowers, uppers, arr):
    # first pass: reduce arr to per-segment minima between sorted cut points
    lu = np.r_[lowers, uppers]
    so = np.argsort(lu)
    iso = np.empty_like(so)
    iso[so] = np.arange(len(so))
    cut = len(lowers)
    lmin = np.minimum.reduceat(arr, lu[so])
    # second pass: reduce the segment minima over each (lower, upper) rank pair
    return np.minimum.reduceat(lmin, iso.reshape(2, -1).T.ravel())[::2]

def onepass(lowers, uppers, arr):
    # interleave the bounds (l0, u0, l1, u1, ...); every even segment is [lowers[i], uppers[i])
    mixture = np.empty((lowers.size*2,), dtype=lowers.dtype)
    mixture[::2] = lowers; mixture[1::2] = uppers
    return np.minimum.reduceat(arr, mixture)[::2]

arr = np.random.randint(0, 1000, 1000)
lowers = np.array([0, 5, 132, 358, 566, 822])
uppers = np.array([45, 93, 189, 533, 800, 923])

print('small')
for f in twopass, onepass:
    print('{:18s} {:9.6f} ms'.format(f.__name__, 
                                     timeit(lambda: f(lowers, uppers, arr),
                                            number=10)*100))

arr = np.random.randint(0, 1000, 10**6)
lowers = np.random.randint(0, 8*10**5, 10**4)
uppers = np.random.randint(2*10**5, 10**6, 10**4)
swap = lowers > uppers                       # ensure lowers[i] <= uppers[i]
lowers[swap], uppers[swap] = uppers[swap], lowers[swap]

print('large')
for f in twopass, onepass:
    print('{:18s} {:10.4f} ms'.format(f.__name__, 
                                     timeit(lambda: f(lowers, uppers, arr),
                                            number=10)*100))

Sample run:

small
twopass             0.030880 ms
onepass             0.005723 ms
large
twopass               74.4962 ms
onepass             3153.1575 ms
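
The large case presumably goes against onepass because it rescans every interval in full, so its work grows with the total length of all the (heavily overlapping) slices, whereas twopass only reduces the precomputed segment minima. To get the (min, max) pairs the question actually asks for, the same two-pass idea extends to maxima; below is a sketch of my own (the name twopass_minmax is hypothetical):

def twopass_minmax(lowers, uppers, arr):
    # same preprocessing as twopass, applied with both np.minimum and np.maximum
    lu = np.r_[lowers, uppers]
    so = np.argsort(lu)
    iso = np.empty_like(so)
    iso[so] = np.arange(len(so))
    order = iso.reshape(2, -1).T.ravel()
    lmin = np.minimum.reduceat(arr, lu[so])
    lmax = np.maximum.reduceat(arr, lu[so])
    return np.column_stack((np.minimum.reduceat(lmin, order)[::2],
                            np.maximum.reduceat(lmax, order)[::2]))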

#2


1  

An improved version of my original attempt, which I came up with based on Paul Panzer's suggestion of reduceat, is

mixture = np.empty((lowers.size*2,), dtype=lowers.dtype)
mixture[::2] = lowers; mixture[1::2] = uppers   # interleave: l0, u0, l1, u1, ...

np.column_stack((np.minimum.reduceat(arr, mixture)[::2],
                 np.maximum.reduceat(arr, mixture)[::2]))
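
As a quick sanity check (my own addition; the names result and expected are mine), this can be compared against the loop baseline:

result = np.column_stack((np.minimum.reduceat(arr, mixture)[::2],
                          np.maximum.reduceat(arr, mixture)[::2]))
expected = np.array([[arr[lo:hi].min(), arr[lo:hi].max()]
                     for lo, hi in zip(lowers, uppers)])
assert np.array_equal(result, expected)   # both give the per-slice (min, max) pairs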

On a sample size comparable to my actual data, this runs in 4.22 ms on my machine, compared to 73 ms for my original solution.

Even faster, though, is to just use Numba with my original solution:

from numba import jit

@jit
def get_res():
    # the original loop, compiled by numba (arr, lowers, uppers are read as globals)
    out_arr = np.empty((lowers.size, 2))
    for i in range(lowers.size):
        arr_v = arr[lowers[i]:uppers[i]]
        out_arr[i,0] = np.amin(arr_v)
        out_arr[i,1] = np.amax(arr_v)
    return out_arr

which runs in 100 microseconds on my machine.
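
Since @jit without arguments could silently fall back to object mode on older numba versions, and nopython mode treats global arrays as compile-time constants, a variant of my own that passes the arrays as arguments may be more robust (the name get_res_args is hypothetical):

from numba import njit

@njit
def get_res_args(arr, lowers, uppers):
    # same loop, but with explicit arguments so the compiled function
    # is not tied to the module-level arrays
    out_arr = np.empty((lowers.size, 2))
    for i in range(lowers.size):
        arr_v = arr[lowers[i]:uppers[i]]
        out_arr[i, 0] = arr_v.min()
        out_arr[i, 1] = arr_v.max()
    return out_arr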

#3


-1  

The execution will be slow since, inside the loop, the sub-array is copied to an array and then the operations are carried out. You can avoid the entire loop with a single line of code:

out_array = np.array([(np.amin(arr[lowers[i]:uppers[i]]),
                       np.amax(arr[lowers[i]:uppers[i]]))
                      for i in range(lowers.size)])
