Interleaving two numpy index arrays, one item from each array

Date: 2022-05-01 12:23:06

I have two ordered numpy arrays and I want to interleave them: take one item from the first array, then one from the second, then back to the first, taking the next item that is larger than the one I just took from the second, and so on. These are actually arrays of indices into other arrays, and I'd be OK with operating on the original arrays instead, as long as the operation is vectorized (though a vectorized operation on the index arrays themselves would be ideal).


Example (it's OK to assume that the intersection of the arrays is empty):


import numpy as np

a = np.array([1, 2, 3, 4, 7, 8, 9, 10, 17])
b = np.array([5, 6, 13, 14, 15, 19, 21, 23])

I would like to get [1,5,7,13,17,19]

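Since these are really index arrays, here is a minimal sketch of how the interleaved result would typically be used; the data array below is hypothetical, purely to illustrate the fancy indexing:

import numpy as np

data = np.arange(100) * 10                 # hypothetical array that a and b index into
picked = np.array([1, 5, 7, 13, 17, 19])   # the desired interleaved index array
print(data[picked])                        # [ 10  50  70 130 170 190]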

5 Answers

#1


13  

Vectorised solution (pedagogical style, easily understandable)

We can vectorise this by augmenting the arrays with a discriminator index, such that a is tagged 0 and b is tagged 1:


a_t = np.vstack((a, np.zeros_like(a)))
b_t = np.vstack((b, np.ones_like(b)))

Now, let's combine and sort:


c = np.hstack((a_t, b_t))[:, np.argsort(np.hstack((a, b)))]
array([[ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 13, 14, 15, 17, 19, 21, 23],
       [ 0,  0,  0,  0,  1,  1,  0,  0,  0,  0,  1,  1,  1,  0,  1,  1,  1]])

You can see that the elements are now in order but retain their tags, so we can tell which elements came from a and which from b.


So, let's select the first element and each element where the tag changes from 0 (for a) to 1 (for b) and back again:


c[:, np.concatenate(([True], c[1, 1:] != c[1, :-1]))][0]
array([ 1,  5,  7, 13, 17, 19])

Efficient vectorised solution

You can do this slightly more efficiently by keeping the items and their tags in separate (but parallel) arrays:


ab = np.hstack((a, b))
s = np.argsort(ab)
t = np.hstack((np.zeros_like(a), np.ones_like(b)))[s]
ab[s][np.concatenate(([True], t[1:] != t[:-1]))]
array([ 1,  5,  7, 13, 17, 19])

This is slightly more efficient than the solution above; I get an average of about 45 microseconds as opposed to 90, although your results may vary.

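For convenience, the efficient approach above can be packaged as a small function. A minimal sketch, assuming numpy is imported as np and using the example arrays from the question (the function name is just for illustration):

import numpy as np

def interleave_alternating(a, b):
    ab = np.hstack((a, b))                                   # combine both arrays
    s = np.argsort(ab)                                       # order that sorts the combined values
    t = np.hstack((np.zeros_like(a), np.ones_like(b)))[s]    # source tag (0 = a, 1 = b) in sorted order
    return ab[s][np.concatenate(([True], t[1:] != t[:-1]))]  # keep the first element and every tag change

a = np.array([1, 2, 3, 4, 7, 8, 9, 10, 17])
b = np.array([5, 6, 13, 14, 15, 19, 21, 23])
print(interleave_alternating(a, b))  # [ 1  5  7 13 17 19]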

#2


3  

You could break the problem into two parts:


  1. Make a and b into iterators that yield a value only if it is larger than some threshold.
  2. Use itertools (essentially George Sakkis' roundrobin recipe) to alternate between the two iterators.

import itertools
import numpy as np

a = np.array([1, 2, 3, 4, 7, 8, 9, 10, 17])
b = np.array([5, 6, 13, 14, 15, 19, 21, 23])

def takelarger(iterable):
    # Yield only values above the shared threshold, raising the threshold as we go.
    # The threshold lives on the function so both iterators see the same value;
    # initialising it to 0 assumes the inputs are positive.
    for x in iterable:
        if x > takelarger.threshold:
            takelarger.threshold = x
            yield x
takelarger.threshold = 0

def alternate(*iterables):
    # Modified version of George Sakkis' roundrobin recipe
    # http://docs.python.org/library/itertools.html#recipes
    pending = len(iterables)
    nexts = itertools.cycle(iter(it).__next__ for it in iterables)
    while pending:
        try:
            for n in nexts:
                yield n()
        except StopIteration:
            # Unlike roundrobin, stop when any iterable has been consumed
            break

def interleave(a, b):
    takelarger.threshold = 0
    return list(alternate(takelarger(a), takelarger(b)))

print(interleave(a, b))

yields


[1, 5, 7, 13, 17, 19]

#3


1  

a = [1, 2, 3, 4, 7, 8, 9, 10, 17]
b = [5, 6, 13, 14, 15, 19, 21, 23]
c = []
c.append(a[0])   # add the first element to c
running = True   # breaking condition
loop = 2         # initially we choose from b
while running:
    if loop == 2:
        val = c[-1]
        temp_lis = [x - val for x in b]   # difference between every element of b and
                                          # the last element of c; the first positive
                                          # difference is the smallest, since b is sorted
        for k, y in enumerate(temp_lis):
            if y > 0:
                c.append(b[k])
                break
        else:
            running = False   # for-else determines the breaking condition
        loop = 1              # switch to a next time
    elif loop == 1:
        val = c[-1]
        temp_lis = [x - val for x in a]
        for k, y in enumerate(temp_lis):
            if y > 0:
                c.append(a[k])
                break
        else:
            running = False
        loop = 2

print(c)

output:


[1, 5, 7, 13, 17, 19]

#4


1  

Inspired by the other posts, I had another idea: pure Python code that requires the inputs to be sorted beforehand but is symmetric in a and b.


def myIterator4(a, b):
    cur = iter(a)
    other = iter(b)
    try:
        largest = next(cur)
        tmp = next(other)   # empty iterator if b is empty
        while True:
            yield largest
            while tmp <= largest:      # advance the other iterator past the last value yielded
                tmp = next(other)
            largest = tmp
            cur, other = other, cur    # swap roles, so the two inputs are treated symmetrically
    except StopIteration:
        return

It returns an empty iterator if a or b is empty.

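A quick usage sketch (plain lists are used here, but the generator works the same on the numpy arrays from the question):

a = [1, 2, 3, 4, 7, 8, 9, 10, 17]
b = [5, 6, 13, 14, 15, 19, 21, 23]
print(list(myIterator4(a, b)))   # [1, 5, 7, 13, 17, 19]
print(list(myIterator4(b, a)))   # [5, 7, 13, 17, 19] -- symmetric, starts from b's first element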

#5


1  

I think you'll have a hard time applying numpy vectorization to this problem while maintaining linear performance. It requires storing a fair bit of state: the current index in a, the current index in b, and the current threshold. numpy doesn't provide many stateful vectorized functions. In fact, the only one I can think of off the top of my head is ufunc.reduce, which isn't well suited to this problem -- though of course it could be shoehorned into working.


In fact, just after I posted this, my browser updated with an excellent vectorized solution. But that solution requires sorting, which is O(n log n); and indeed, after some testing, I see that the pure Python solution below is faster for all inputs!


def interleave_monotonic(a, b):
    try:
        a = iter(a)
        threshold = next(a)
        yield threshold
        for current in b:
            if current > threshold:
                threshold = current
                yield threshold
                while current <= threshold:
                    current = next(a)
                threshold = current
                yield threshold
    except StopIteration:
        return

Note that if a is empty, this returns an empty iterable, but if b is empty, it returns a one-item iterable containing the first value of a. I think that's consistent with your example, but it's an edge case I'm not certain about. Also, the numpy-based solution exhibits slightly different behavior: it always begins with the lesser of a[0] and b[0], while the above always begins with the first value of a, regardless. I modified the above to account for that in the following tests, which show pretty clearly that the pure Python solution is the fastest.


Definitions:


import random

import numpy as np

def interleave_monotonic_np(a, b):
    ab = np.hstack((a, b))
    s = np.argsort(ab)
    t = np.hstack((np.zeros_like(a), np.ones_like(b)))[s]
    return ab[s][np.concatenate(([True], t[1:] != t[:-1]))]

def interleave_monotonic(a, b):
    # start from whichever array has the smaller first element, matching the numpy version
    a, b = (a, b) if a[0] <= b[0] else (b, a)
    try:
        a = iter(a)
        threshold = next(a)
        yield threshold
        for current in b:
            if current > threshold:
                threshold = current
                yield threshold
                while current <= threshold:
                    current = next(a)
                threshold = current
                yield threshold
    except StopIteration:
        return

def interleave_monotonic_py(a, b):
    return np.fromiter(interleave_monotonic(a, b), dtype='int64')

def get_a_b(n):
    rnd = random.sample(range(10 ** 10), n * 2)
    return sorted(rnd[0::2]), sorted(rnd[1::2])

Tests:


>>> for e in range(7):
...         a, b = get_a_b(10 ** e)
...         print((interleave_monotonic_np(a, b) ==
...                interleave_monotonic_py(a, b)).all())
...         %timeit interleave_monotonic_np(a, b)
...         %timeit interleave_monotonic_py(a, b)
...

True
10000 loops, best of 3: 85.6 us per loop
100000 loops, best of 3: 5.53 us per loop
True
10000 loops, best of 3: 91.7 us per loop
100000 loops, best of 3: 9.19 us per loop
True
10000 loops, best of 3: 144 us per loop
10000 loops, best of 3: 46.5 us per loop
True
1000 loops, best of 3: 711 us per loop
1000 loops, best of 3: 445 us per loop
True
100 loops, best of 3: 6.67 ms per loop
100 loops, best of 3: 4.42 ms per loop
True
10 loops, best of 3: 135 ms per loop
10 loops, best of 3: 55.7 ms per loop
True
1 loops, best of 3: 1.58 s per loop
1 loops, best of 3: 654 ms per loop
