问题 迭代Python中相邻元素的“窗口”


这更像是一个优雅和性能的问题,而不是“如何做”,所以我只是展示代码:

def iterate_adjacencies(gen, fill=0, size=2, do_fill_left=True,
  do_fill_right=False):
    """ Iterates over a 'window' of `size` adjacent elements in the supploed
    `gen` generator, using `fill` to fill edge if `do_fill_left` is True
    (default), and fill the right edge (i.e.  last element and `size-1` of
    `fill` elements as the last item) if `do_fill_right` is True.  """
    fill_size = size - 1
    prev = [fill] * fill_size
    i = 1
    for item in gen:  # iterate over the supplied `whatever`.
        if not do_fill_left and i < size:
            i += 1
        else:
            yield prev + [item]
        prev = prev[1:] + [item]
    if do_fill_right:
        for i in range(fill_size):
            yield prev + [fill]
            prev = prev[1:] + [fill]

然后问:那已经有了功能吗?而且,如果没有,你能以更好的方式(即更整洁和/或更快)的方式做同样的事情吗?

编辑:

来自@ agf,@ FreeBird,@ senderle的答案的想法,一个看起来有点整洁的代码是:

def window(seq, size=2, fill=0, fill_left=True, fill_right=False):
    """ Returns a sliding window (of width n) over data from the iterable:
      s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
    """
    ssize = size - 1
    it = chain(
      repeat(fill, ssize * fill_left),
      iter(seq),
      repeat(fill, ssize * fill_right))
    result = tuple(islice(it, size))
    if len(result) == size:  # `<=` if okay to return seq if len(seq) < size
        yield result
    for elem in it:
        result = result[1:] + (elem,)
        yield result

8342
2017-08-09 14:57


起源

您可能想要更详细地解释这对单词的作用。 - agf
你能提供一个样本输入和预期输出的样本吗?你会更容易理解你的功能在做什么。 - mdeous
我运行代码并查看比从文档字符串中找出来更快,因为措辞不是那么清楚,所以我建议他更清楚地写出来。 - agf
@HoverHell - 非常,非常好。你应该把它作为答案发布并接受它。我将如何区分文档字符串?我会把它分解成几个简短的陈述句,描述它的作用和每个论点。 - agf
@HoverHell:1)是的,请将您的新代码作为答案发布。 2) 这个问题及其答案 解释如何在numpy中创建滚动窗口。 - senderle


答案:


此页面显示如何使用滑动窗口 itertoolshttp://docs.python.org/release/2.3.5/lib/itertools-example.html

def window(seq, n=2):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result    
    for elem in it:
        result = result[1:] + (elem,)
        yield result

输出示例:

>>> list(window(range(10)))
[(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9)]

如果需要,您需要将其更改为左右填充。


6
2017-08-09 15:05





这是我的版本填充,保持签名相同。我以前见过 itertools 食谱,但在写这篇文章之前没有看过它。

from itertools import chain
from collections import deque

def ia(gen, fill=0, size=2, fill_left=True, fill_right=False):
    gen, ssize = iter(gen), size - 1
    deq = deque(chain([fill] * ssize * fill_left,
                      (next(gen) for _ in xrange((not fill_left) * ssize))),
                maxlen = size)
    for item in chain(gen, [fill] * ssize * fill_right):
        deq.append(item)
        yield deq

编辑:在发布此问题之前,我也没有看到您对您的问题的评论。

编辑2:修复。我试过用一个 chain 但这个设计需要两个。

编辑3:正如@senderle所指出的那样,只能将它用作生成器,不要用它包装 list 或者累积输出,因为它会重复产生相同的可变项。


2
2017-08-09 15:20



返回一个元组会不会更好?的结果 list(ia(range(10), 'a', fill_left=True, fill_right=True)) 是同一个deque的引用列表,都在同一个状态。此外,你可以使用 islice 构造deque然后遍历gen的其余部分,而不是两次调用链。 - senderle
这只是itertools中给出的解决方案。我对复制它不感兴趣。 - agf
它是?我没有看到 itertools 任何地方。让我重新说一下。反复 yield同一个可变对象会产生意想不到的结果。例如,输出 list(ia(range(2), 'x', 2, True, True)) 是 [deque([1, 'x'], maxlen=2), deque([1, 'x'], maxlen=2), deque([1, 'x'], maxlen=2)]。我认为返回deque内容的副本而不是仅返回deque是有意义的。最快的方法可能是 tuple。 - senderle
我明白。但是,如果你要屈服 tuple和使用 islice 无论如何,那么@HoverHell基于其中的解决方案给出了解决方案 itertools 同 chain 和 repeat 填补更好。不值得改变这个更像那样;最好留下它,当你真的想用它作为发电机。 - agf
是的,好的,我现在看到你在说什么。事实上,在做完之后 一些时间,我必须承认你在所有方面都是完全正确的。 - senderle


好了,在我意识到这之后,这是一个非荒谬的版本 window_iter_fill。我之前的版本(在编辑中可见)很糟糕,因为我忘了使用它 izip。不知道我在想什么。使用izip,这是有效的,事实上,它是小输入的最快选择!

def window_iter_fill(gen, size=2, fill=None):
    gens = (chain(repeat(fill, size - i - 1), gen, repeat(fill, i))
            for i, gen in enumerate(tee(gen, size)))
    return izip(*gens)

这个对于元组屈服也很好,但不是那么快。

def window_iter_deque(it, size=2, fill=None, fill_left=False, fill_right=False):
    lfill = repeat(fill, size - 1 if fill_left else 0)
    rfill = repeat(fill, size - 1 if fill_right else 0)
    it = chain(lfill, it, rfill)
    d = deque(islice(it, 0, size - 1), maxlen=size)
    for item in it:
        d.append(item)
        yield tuple(d)

HoverHell最新的解决方案仍然是高输入的最佳元组解决方案。

一些时间:

Arguments: [xrange(1000), 5, 'x', True, True]

==============================================================================
  window               HoverHell's frankeniter           :  0.2670ms [1.91x]
  window_itertools     from old itertools docs           :  0.2811ms [2.02x]
  window_iter_fill     extended `pairwise` with izip     :  0.1394ms [1.00x]
  window_iter_deque    deque-based, copying              :  0.4910ms [3.52x]
  ia_with_copy         deque-based, copying v2           :  0.4892ms [3.51x]
  ia                   deque-based, no copy              :  0.2224ms [1.60x]
==============================================================================

缩放行为:

Arguments: [xrange(10000), 50, 'x', True, True]

==============================================================================
  window               HoverHell's frankeniter           :  9.4897ms [4.61x]
  window_itertools     from old itertools docs           :  9.4406ms [4.59x]
  window_iter_fill     extended `pairwise` with izip     :  11.5223ms [5.60x]
  window_iter_deque    deque-based, copying              :  12.7657ms [6.21x]
  ia_with_copy         deque-based, copying v2           :  13.0213ms [6.33x]
  ia                   deque-based, no copy              :  2.0566ms [1.00x]
==============================================================================

agf的deque-yield脚本对于大输入来说非常快 - 看似O(n)而不是O(n,m)就像其他的一样,其中n是iter的长度,m是窗口的大小 - 因为它不必迭代每个窗口。但我仍然认为在一般情况下产生元组更有意义,因为调用函数可能只是迭代遍历deque;这只是计算负担的转变。较大程序的渐近行为应保持不变。

不过,在某些特殊情况下, deque - 容易版本可能会更快。

基于HoverHell测试结构的更多时序。

>>> import testmodule
>>> kwa = dict(gen=xrange(1000), size=4, fill=-1, fill_left=True, fill_right=True)
>>> %timeit -n 1000 [a + b + c + d for a, b, c, d in testmodule.window(**kwa)]
1000 loops, best of 3: 462 us per loop
>>> %timeit -n 1000 [a + b + c + d for a, b, c, d in testmodule.ia(**kwa)]
1000 loops, best of 3: 463 us per loop
>>> %timeit -n 1000 [a + b + c + d for a, b, c, d in testmodule.window_iter_fill(**kwa)]
1000 loops, best of 3: 251 us per loop
>>> %timeit -n 1000 [sum(x) for x in testmodule.window(**kwa)]
1000 loops, best of 3: 525 us per loop
>>> %timeit -n 1000 [sum(x) for x in testmodule.ia(**kwa)]
1000 loops, best of 3: 462 us per loop
>>> %timeit -n 1000 [sum(x) for x in testmodule.window_iter_fill(**kwa)]
1000 loops, best of 3: 333 us per loop

总的来说,一旦你使用 izipwindow_iter_fill 结果很快 - 特别是对于小窗户。


2
2017-08-10 01:21



但是迭代一个双端队列比迭代一个元组要快,所以如果你要迭代就返回一个双端队列是有意义的。 这是最近的答案 我在其中定时了几个窗口迭代器并考虑了deque与元组/列表问题。 - kindall
有趣的是window_itertools有点慢。此外,如果您要进行迭代,deque会更快,但如果您正在拆包,则元组似乎更快。 - HoverHell
呃,实际上只是元组 似乎 打开包装时速度更快 - 在这种情况下deque几乎一样快。查看我的答案了解一些时间。 - HoverHell
@kindall,此外,如果你返回一个元组,你可能会迭代两次值。但我并不清楚 - 我的观点只是因为它的渐近行为 ia  容貌 在上述时间中更好(只有~10倍而不是像所有其他时间一样慢50倍)使用它的程序的渐近行为可能不会更好。所以你真的只是通过一个恒定因素获得加速。我不认为支付更通用的迭代器是不可接受的代价。当然,出于特殊目的,返回一个双端队列是一个很好的策略。 - senderle
@kindall,@ HoverHell,看来我是傻瓜!我忘了用 izip - 事实上,与 izip,产生元组是自动的,并为小输入产生最快的结果(在我的计算机上)。 - senderle


产生的功能(来自问题的编辑),

frankeniter与来自@ agf,@ FreeBird,@ senderle的答案的想法,一个看起来有点整洁的代码片段是:

from itertools import chain, repeat, islice

def window(seq, size=2, fill=0, fill_left=True, fill_right=False):
    """ Returns a sliding window (of width n) over data from the iterable:
      s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
    """
    ssize = size - 1
    it = chain(
      repeat(fill, ssize * fill_left),
      iter(seq),
      repeat(fill, ssize * fill_right))
    result = tuple(islice(it, size))
    if len(result) == size:  # `<=` if okay to return seq if len(seq) < size
        yield result
    for elem in it:
        result = result[1:] + (elem,)
        yield result

并且,有关deque / tuple的一些性能信息:

In [32]: kwa = dict(gen=xrange(1000), size=4, fill=-1, fill_left=True, fill_right=True)
In [33]: %timeit -n 10000 [a+b+c+d for a,b,c,d in tmpf5.ia(**kwa)]
10000 loops, best of 3: 358 us per loop
In [34]: %timeit -n 10000 [a+b+c+d for a,b,c,d in tmpf5.window(**kwa)]
10000 loops, best of 3: 368 us per loop
In [36]: %timeit -n 10000 [sum(x) for x in tmpf5.ia(**kwa)]
10000 loops, best of 3: 340 us per loop
In [37]: %timeit -n 10000 [sum(x) for x in tmpf5.window(**kwa)]
10000 loops, best of 3: 432 us per loop

但无论如何,如果它是数字,那么numpy可能更可取。


1
2017-08-11 07:29





我很惊讶没有人采取简单的协程方法。

from collections import deque


def window(n, initial_data=None):
    if initial_data:
        win = deque(initial_data, n)
    else:
        win = deque(((yield) for _ in range(n)), n)
    while 1:
        side, val = (yield win)
        if side == 'left':
            win.appendleft(val)
        else:
            win.append(val)

win = window(4)
win.next()

print(win.send(('left', 1)))
print(win.send(('left', 2)))
print(win.send(('left', 3)))
print(win.send(('left', 4)))
print(win.send(('right', 5)))

## -- Results of print statements --
deque([1, None, None, None], maxlen=4)
deque([2, 1, None, None], maxlen=4)
deque([3, 2, 1, None], maxlen=4)
deque([4, 3, 2, 1], maxlen=4)
deque([3, 2, 1, 5], maxlen=4)

0
2017-11-11 01:30