NumPy:Vectorize为另一个数组中的每个元素在数组中查找最接近的值

时间:2022-08-22 13:50:11

Input

known_array : numpy array; consisting of scalar values only; shape: (m, 1)

known_array:numpy数组;仅由标量值组成;形状:(m,1)

test_array : numpy array; consisting of scalar values only; shape: (n, 1)

test_array:numpy数组;仅由标量值组成;形状:(n,1)

Output

indices : numpy array; shape: (n, 1); For each value in test_array finds the index of the closest value in known_array

指数:numpy数组;形状:(n,1);对于test_array中的每个值,找到known_array中最接近值的索引

residual : numpy array; shape: (n, 1); For each value in test_array finds the difference from the closest value in known_array

残余:numpy数组;形状:(n,1);对于test_array中的每个值,都会找到与known_array中最接近的值的差异

Example

In [17]: known_array = np.array([random.randint(-30,30) for i in range(5)])

In [18]: known_array
Out[18]: array([-24, -18, -13, -30,  29])

In [19]: test_array = np.array([random.randint(-10,10) for i in range(10)])

In [20]: test_array
Out[20]: array([-6,  4, -6,  4,  8, -4,  8, -6,  2,  8])

Sample Implementation (Not fully vectorized)

def find_nearest(known_array, value):
    idx = (np.abs(known_array - value)).argmin()
    diff = known_array[idx] - value
    return [idx, -diff]

In [22]: indices = np.zeros(len(test_array))

In [23]: residual = np.zeros(len(test_array))

In [24]: for i in range(len(test_array)):
   ....:     [indices[i], residual[i]] =  find_nearest(known_array, test_array[i])
   ....:     

In [25]: indices
Out[25]: array([ 2.,  2.,  2.,  2.,  2.,  2.,  2.,  2.,  2.,  2.])

In [26]: residual
Out[26]: array([  7.,  17.,   7.,  17.,  21.,   9.,  21.,   7.,  15.,  21.])

What is the best way to speed up this task? Cython is an option, but, I would always prefer to be able to remove the for loop and let the code remain are pure NumPy.

加快这项任务的最佳方法是什么? Cython是一个选项,但是,我总是希望能够删除for循环并让代码保持纯粹的NumPy。


NB: Following Stack Overflow questions were consulted

注意:请参阅Stack Overflow问题

  1. Python/Numpy - Quickly Find the Index in an Array Closest to Some Value
  2. Python / Numpy - 快速查找最接近某些值的数组中的索引

  3. Find the index of numerically closest value
  4. 找到数值最接近的值的索引

  5. Find nearest value in numpy array
  6. 在numpy数组中查找最接近的值

  7. Finding the nearest value and return the index of array in Python
  8. 查找最近的值并返回Python中的数组索引

  9. finding nearest items across two lists/arrays in Python
  10. 在Python中查找两个列表/数组中最近的项目


Updates

I did some small benchmarks for comparing the non-vectorized and vectorized solution (accepted answer).

我做了一些小基准来比较非矢量化和矢量化解决方案(接受的答案)。

In [48]: [indices1, residual1] = find_nearest_vectorized(known_array, test_array)

In [53]: [indices2, residual2] = find_nearest_non_vectorized(known_array, test_array)

In [54]: indices1==indices2
Out[54]: array([ True,  True,  True,  True,  True,  True,  True,  True,  True,  True],   dtype=bool)

In [55]: residual1==residual2
Out[55]: array([ True,  True,  True,  True,  True,  True,  True,  True,  True,  True], dtype=bool)

In [56]: %timeit [indices2, residual2] = find_nearest_non_vectorized(known_array, test_array)
10000 loops, best of 3: 173 µs per loop

In [57]: %timeit [indices1, residual1] = find_nearest_vectorized(known_array, test_array)
100000 loops, best of 3: 16.8 µs per loop

About a 10-fold speedup!

加速大约10倍!

Clarification

known_array is not sorted.

known_array未排序。

I ran the benchmarks as given in the answer by @cyborg below.

我在下面的@cyborg的答案中给出了基准测试。

Case 1: If known_array were sorted

案例1:如果已对known_array进行了排序

known_array = np.arange(0,1000)
test_array = np.random.randint(0, 100, 10000)
print('Speedups:')
base_time = time_f('base')
for func_name in ['diffs', 'searchsorted1', 'searchsorted2']:
    print func_name + ' is x%.1f faster than base.' % (base_time / time_f(func_name))
    assert np.allclose(base(known_array, test_array), eval(func_name+'(known_array, test_array)'))

Speedups:
diffs is x0.4 faster than base.
searchsorted1 is x81.3 faster than base.
searchsorted2 is x107.6 faster than base.

Firstly, for large arrays diffs method is actually slower, it also eats up a lot of RAM and my system hanged when I ran it on actual data.

首先,对于大型数组,diffs方法实际上较慢,它也占用了大量RAM,当我在实际数据上运行时,我的系统被挂起。

Case 2 : When known_array is not sorted; which represents actual scenario

情况2:未对known_array进行排序时;这代表实际情况

known_array = np.random.randint(0,100,100)
test_array = np.random.randint(0, 100, 100)

Speedups:
diffs is x8.9 faster than base.
AssertionError                            Traceback (most recent call last)
<ipython-input-26-3170078c217a> in <module>()
      5 for func_name in ['diffs', 'searchsorted1', 'searchsorted2']:
      6     print func_name + ' is x%.1f faster than base.' % (base_time /  time_f(func_name))
----> 7     assert np.allclose(base(known_array, test_array),  eval(func_name+'(known_array, test_array)'))

AssertionError: 


searchsorted1 is x14.8 faster than base.

I must also comment that the approach should also be memory efficient. Otherwise my 8 GB of RAM is not sufficient. In the base case, it is easily sufficient.

我还必须评论该方法也应该是内存有效的。否则我的8 GB RAM是不够的。在基本情况下,它很容易就足够了。

3 个解决方案

#1


1  

For example, you can compute all the differences in on go with:

例如,您可以使用以下方式计算所有差异:

differences = (test_array.reshape(1,-1) - known_array.reshape(-1,1))

And use argmin and fancy indexing along with np.diagonal to get desired indices and differences:

并使用argmin和fancy索引以及np.diagonal来获得所需的索引和差异:

indices = np.abs(differences).argmin(axis=0)
residual = np.diagonal(differences[indices,])

So for

>>> known_array = np.array([-24, -18, -13, -30,  29])
>>> test_array = np.array([-6,  4, -6,  4,  8, -4,  8, -6,  2,  8])

One get

>>> indices
array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
>>> residual
array([ 7, 17,  7, 17, 21,  9, 21,  7, 15, 21])

#2


6  

If the array is large, you should use searchsorted:

如果数组很大,则应使用searchsorted:

import numpy as np
np.random.seed(0)
known_array = np.random.rand(1000)
test_array = np.random.rand(400)

%%time
differences = (test_array.reshape(1,-1) - known_array.reshape(-1,1))
indices = np.abs(differences).argmin(axis=0)
residual = np.diagonal(differences[indices,])

output:

CPU times: user 11 ms, sys: 15 ms, total: 26 ms
Wall time: 26.4 ms

searchsorted version:

%%time

index_sorted = np.argsort(known_array)
known_array_sorted = known_array[index_sorted]

idx1 = np.searchsorted(known_array_sorted, test_array)
idx2 = np.clip(idx1 - 1, 0, len(known_array_sorted)-1)

diff1 = known_array_sorted[idx1] - test_array
diff2 = test_array - known_array_sorted[idx2]

indices2 = index_sorted[np.where(diff1 <= diff2, idx1, idx2)]
residual2 = test_array - known_array[indices]

output:

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 311 µs

We can check that the results is the same:

我们可以检查结果是否相同:

assert np.all(residual == residual2)
assert np.all(indices == indices2)

#3


5  

TL;DR: use numpy.searchsorted().

TL; DR:使用numpy.searchsorted()。

import inspect
from timeit import timeit
import numpy as np

known_array = np.arange(-10, 10)
test_array = np.random.randint(-10, 10, 1000)
number = 1000

def base(known_array, test_array):
    def find_nearest(known_array, value):
        idx = (np.abs(known_array - value)).argmin()
        return idx
    indices = np.zeros_like(test_array, dtype=known_array.dtype)
    for i in range(len(test_array)):
        indices[i] =  find_nearest(known_array, test_array[i])
    return indices

def diffs(known_array, test_array):
    differences = (test_array.reshape(1,-1) - known_array.reshape(-1,1))
    indices = np.abs(differences).argmin(axis=0)
    return indices

def searchsorted1(known_array, test_array):
    index_sorted = np.argsort(known_array)
    known_array_sorted = known_array[index_sorted]
    idx1 = np.searchsorted(known_array_sorted, test_array)
    idx1[idx1 == len(known_array)] = len(known_array)-1
    idx2 = np.clip(idx1 - 1, 0, len(known_array_sorted)-1)
    diff1 = known_array_sorted[idx1] - test_array
    diff2 = test_array - known_array_sorted[idx2]
    indices2 = index_sorted[np.where(diff1 <= diff2, idx1, idx2)]
    return indices2

def searchsorted2(known_array, test_array):
    index_sorted = np.argsort(known_array)
    known_array_sorted = known_array[index_sorted]
    known_array_middles = known_array_sorted[1:] - np.diff(known_array_sorted.astype('f'))/2
    idx1 = np.searchsorted(known_array_middles, test_array)
    indices = index_sorted[idx1]
    return indices

def time_f(func_name):
    return timeit(func_name+"(known_array, test_array)",
        'from __main__ import known_array, test_array, ' + func_name, number=number)

print('Speedups:')
base_time = time_f('base')
for func_name in ['diffs', 'searchsorted1', 'searchsorted2']:
    print func_name + ' is x%.1f faster than base.' % (base_time / time_f(func_name))

Output:

Speedups:
diffs is x29.9 faster than base.
searchsorted1 is x37.4 faster than base.
searchsorted2 is x64.3 faster than base.

#1


1  

For example, you can compute all the differences in on go with:

例如,您可以使用以下方式计算所有差异:

differences = (test_array.reshape(1,-1) - known_array.reshape(-1,1))

And use argmin and fancy indexing along with np.diagonal to get desired indices and differences:

并使用argmin和fancy索引以及np.diagonal来获得所需的索引和差异:

indices = np.abs(differences).argmin(axis=0)
residual = np.diagonal(differences[indices,])

So for

>>> known_array = np.array([-24, -18, -13, -30,  29])
>>> test_array = np.array([-6,  4, -6,  4,  8, -4,  8, -6,  2,  8])

One get

>>> indices
array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2])
>>> residual
array([ 7, 17,  7, 17, 21,  9, 21,  7, 15, 21])

#2


6  

If the array is large, you should use searchsorted:

如果数组很大,则应使用searchsorted:

import numpy as np
np.random.seed(0)
known_array = np.random.rand(1000)
test_array = np.random.rand(400)

%%time
differences = (test_array.reshape(1,-1) - known_array.reshape(-1,1))
indices = np.abs(differences).argmin(axis=0)
residual = np.diagonal(differences[indices,])

output:

CPU times: user 11 ms, sys: 15 ms, total: 26 ms
Wall time: 26.4 ms

searchsorted version:

%%time

index_sorted = np.argsort(known_array)
known_array_sorted = known_array[index_sorted]

idx1 = np.searchsorted(known_array_sorted, test_array)
idx2 = np.clip(idx1 - 1, 0, len(known_array_sorted)-1)

diff1 = known_array_sorted[idx1] - test_array
diff2 = test_array - known_array_sorted[idx2]

indices2 = index_sorted[np.where(diff1 <= diff2, idx1, idx2)]
residual2 = test_array - known_array[indices]

output:

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 311 µs

We can check that the results is the same:

我们可以检查结果是否相同:

assert np.all(residual == residual2)
assert np.all(indices == indices2)

#3


5  

TL;DR: use numpy.searchsorted().

TL; DR:使用numpy.searchsorted()。

import inspect
from timeit import timeit
import numpy as np

known_array = np.arange(-10, 10)
test_array = np.random.randint(-10, 10, 1000)
number = 1000

def base(known_array, test_array):
    def find_nearest(known_array, value):
        idx = (np.abs(known_array - value)).argmin()
        return idx
    indices = np.zeros_like(test_array, dtype=known_array.dtype)
    for i in range(len(test_array)):
        indices[i] =  find_nearest(known_array, test_array[i])
    return indices

def diffs(known_array, test_array):
    differences = (test_array.reshape(1,-1) - known_array.reshape(-1,1))
    indices = np.abs(differences).argmin(axis=0)
    return indices

def searchsorted1(known_array, test_array):
    index_sorted = np.argsort(known_array)
    known_array_sorted = known_array[index_sorted]
    idx1 = np.searchsorted(known_array_sorted, test_array)
    idx1[idx1 == len(known_array)] = len(known_array)-1
    idx2 = np.clip(idx1 - 1, 0, len(known_array_sorted)-1)
    diff1 = known_array_sorted[idx1] - test_array
    diff2 = test_array - known_array_sorted[idx2]
    indices2 = index_sorted[np.where(diff1 <= diff2, idx1, idx2)]
    return indices2

def searchsorted2(known_array, test_array):
    index_sorted = np.argsort(known_array)
    known_array_sorted = known_array[index_sorted]
    known_array_middles = known_array_sorted[1:] - np.diff(known_array_sorted.astype('f'))/2
    idx1 = np.searchsorted(known_array_middles, test_array)
    indices = index_sorted[idx1]
    return indices

def time_f(func_name):
    return timeit(func_name+"(known_array, test_array)",
        'from __main__ import known_array, test_array, ' + func_name, number=number)

print('Speedups:')
base_time = time_f('base')
for func_name in ['diffs', 'searchsorted1', 'searchsorted2']:
    print func_name + ' is x%.1f faster than base.' % (base_time / time_f(func_name))

Output:

Speedups:
diffs is x29.9 faster than base.
searchsorted1 is x37.4 faster than base.
searchsorted2 is x64.3 faster than base.