问题 添加numpy数组时避免溢出


我想用数据类型uint8添加numpy数组。我知道这些数组中的值可能足够大,可以发生溢出。所以我得到类似的东西:

a = np.array([100, 200, 250], dtype=np.uint8)
b = np.array([50, 50, 50], dtype=np.uint8)
a += b

现在,是 [150 250 44]。但是,我希望uint8的值太大而不是uint8允许的最大值,而不是溢出。所以我想要的结果就是 [150 250 255]

我可以使用以下代码获得此结果:

a = np.array([100, 200, 250], dtype=np.uint8)
b = np.array([50, 50, 50], dtype=np.uint8)
c = np.zeros((1,3), dtype=np.uint16)
c += a
c += b
c[c>255] = 255
a = np.array(c, dtype=np.uint8)

问题是,我的数组非常大,因此创建具有更大数据类型的第三个数组可能是内存问题。是否有一种快速且更有效的内存方式来实现所描述的结果?


12820
2018-04-13 17:15


起源



答案:


您可以通过创建第三个dtype uint8数组和一个bool数组来实现此目的 (它们一起比一个uint16阵列的内存效率更高)

np.putmask 对于避免临时数组很有用。

a = np.array([100, 200, 250], dtype=np.uint8)
b = np.array([50, 50, 50], dtype=np.uint8)
c = 255 - b  # a temp uint8 array here
np.putmask(a, c < a, c)  # a temp bool array here
a += b

但是,正如@moarningsun正确指出的那样,bool数组占用与uint8数组相同的内存量,因此这不一定有用。可以通过避免具有多个临时阵列来解决这个问题 在任何给定的时间

a = np.array([100, 200, 250], dtype=np.uint8)
b = np.array([50, 50, 50], dtype=np.uint8)
b = 255 - b  # old b is gone shortly after new array is created
np.putmask(a, b < a, b)  # a temp bool array here, then it's gone
a += 255 - b  # a temp array here, then it's gone

这种方法用于交换CPU的内存消耗。


另一种方法是 预先计算 所有可能的结果,即O(1)额外内存(即独立于数组的大小):

c = np.clip(np.arange(256) + np.arange(256)[..., np.newaxis], 0, 255).astype(np.uint8)
c
=> array([[  0,   1,   2, ..., 253, 254, 255],
          [  1,   2,   3, ..., 254, 255, 255],
          [  2,   3,   4, ..., 255, 255, 255],
          ..., 
          [253, 254, 255, ..., 255, 255, 255],
          [254, 255, 255, ..., 255, 255, 255],
          [255, 255, 255, ..., 255, 255, 255]], dtype=uint8)

c[a,b]
=> array([150, 250, 255], dtype=uint8)

如果阵列非常大,这种方法的内存效率最高。同样,它在处理时间上很昂贵,因为它用较慢的2dim-array索引替换了超快整数加法。

解释它的工作原理

建设 c 上面的数组使用了一个numpy广播技巧。添加一个形状数组 (N,) 和形状的数组 (1,N) 广播两者都是 (N,N)类似,因此结果是所有可能总和的NxN数组。然后,我们剪辑它。我们得到一个满足以下条件的2dim数组: c[i,j]=min(i+j,255) 对于每个我,j。

然后剩下的是使用花式索引抓取正确的值。使用您提供的输入,我们访问:

c[( [100, 200, 250] , [50, 50, 50] )]

第一个索引数组指第一个暗淡,第二个指向第二个暗淡。因此,结果是一个与索引数组形状相同的数组((N,)),由值组成 [ c[100,50] , c[200,50] , c[250,50] ]


7
2018-04-13 17:25



不知道 putmask, 感谢那!我认为使用该功能 a += b 其次是 np.putmask(a, a<b, 255) 还应该工作吗?
@moarningsun我认为你是对的。然而它依赖于溢出,我个人觉得不太满意...... - shx2
@moarningsun你为什么删除你的答案?我认为这是一个不错的答案,它的确有效 - shx2
那种方法需要 putmask 要有效率,这是你答案中的要点之一。所以我想我也可以在这里作为评论。
我尝试了你的第三种方法,它的效果非常好。但是,你能解释一下,它有什么作用?我比较了第三种方法('预先计算')的执行时间与我只是使用uint16数组的情况,做一个+ = b和一个[a> 255] = 255。 'Precalculate'大约需要两倍的时间。但是如果它的内存效率更高,我将能够使用uint16方法对大型数组进行计算。 - Thomas


答案:


您可以通过创建第三个dtype uint8数组和一个bool数组来实现此目的 (它们一起比一个uint16阵列的内存效率更高)

np.putmask 对于避免临时数组很有用。

a = np.array([100, 200, 250], dtype=np.uint8)
b = np.array([50, 50, 50], dtype=np.uint8)
c = 255 - b  # a temp uint8 array here
np.putmask(a, c < a, c)  # a temp bool array here
a += b

但是,正如@moarningsun正确指出的那样,bool数组占用与uint8数组相同的内存量,因此这不一定有用。可以通过避免具有多个临时阵列来解决这个问题 在任何给定的时间

a = np.array([100, 200, 250], dtype=np.uint8)
b = np.array([50, 50, 50], dtype=np.uint8)
b = 255 - b  # old b is gone shortly after new array is created
np.putmask(a, b < a, b)  # a temp bool array here, then it's gone
a += 255 - b  # a temp array here, then it's gone

这种方法用于交换CPU的内存消耗。


另一种方法是 预先计算 所有可能的结果,即O(1)额外内存(即独立于数组的大小):

c = np.clip(np.arange(256) + np.arange(256)[..., np.newaxis], 0, 255).astype(np.uint8)
c
=> array([[  0,   1,   2, ..., 253, 254, 255],
          [  1,   2,   3, ..., 254, 255, 255],
          [  2,   3,   4, ..., 255, 255, 255],
          ..., 
          [253, 254, 255, ..., 255, 255, 255],
          [254, 255, 255, ..., 255, 255, 255],
          [255, 255, 255, ..., 255, 255, 255]], dtype=uint8)

c[a,b]
=> array([150, 250, 255], dtype=uint8)

如果阵列非常大,这种方法的内存效率最高。同样,它在处理时间上很昂贵,因为它用较慢的2dim-array索引替换了超快整数加法。

解释它的工作原理

建设 c 上面的数组使用了一个numpy广播技巧。添加一个形状数组 (N,) 和形状的数组 (1,N) 广播两者都是 (N,N)类似,因此结果是所有可能总和的NxN数组。然后,我们剪辑它。我们得到一个满足以下条件的2dim数组: c[i,j]=min(i+j,255) 对于每个我,j。

然后剩下的是使用花式索引抓取正确的值。使用您提供的输入,我们访问:

c[( [100, 200, 250] , [50, 50, 50] )]

第一个索引数组指第一个暗淡,第二个指向第二个暗淡。因此,结果是一个与索引数组形状相同的数组((N,)),由值组成 [ c[100,50] , c[200,50] , c[250,50] ]


7
2018-04-13 17:25



不知道 putmask, 感谢那!我认为使用该功能 a += b 其次是 np.putmask(a, a<b, 255) 还应该工作吗?
@moarningsun我认为你是对的。然而它依赖于溢出,我个人觉得不太满意...... - shx2
@moarningsun你为什么删除你的答案?我认为这是一个不错的答案,它的确有效 - shx2
那种方法需要 putmask 要有效率,这是你答案中的要点之一。所以我想我也可以在这里作为评论。
我尝试了你的第三种方法,它的效果非常好。但是,你能解释一下,它有什么作用?我比较了第三种方法('预先计算')的执行时间与我只是使用uint16数组的情况,做一个+ = b和一个[a> 255] = 255。 'Precalculate'大约需要两倍的时间。但是如果它的内存效率更高,我将能够使用uint16方法对大型数组进行计算。 - Thomas


这是一种方式:

>>> a = np.array([100, 200, 250], dtype=np.uint8)
>>> b = np.array([50, 50, 50], dtype=np.uint8)
>>> a+=b; a[a<b]=255
>>> a
array([150, 250, 255], dtype=uint8)

1
2018-04-13 18:32





您可以使用Numba真正实现它,例如:

import numba

@numba.jit('void(u1[:],u1[:])', locals={'temp': numba.uint16})
def add_uint8_inplace_clip(a, b):
    for i in range(a.shape[0]):
        temp = a[i] + b[i]
        a[i] = temp if temp<256 else 255

add_uint8_inplace_clip(a, b)

或者使用Numexpr,例如:

import numexpr

numexpr.evaluate('where((a+b)>255, 255, a+b)', out=a, casting='unsafe')

Numexpr upcasts  uint8 至 int32 在内部,然后把它放回去 uint8 阵列。


1
2018-04-13 19:30





def non_overflowing_sum(a, b)
    c = np.uint16(a)+b
    c[np.where(c>255)] = 255
    return np.uint8( c )

它也交换内存,但我发现更优雅,临时uint16在返回后转换后被释放


1
2018-06-02 15:11





这样做

>>> a + np.minimum(255 - a, b)
array([150, 250, 255], dtype=uint8)

一般来说,获取数据类型的最大值

np.iinfo(np.uint8).max

0
2018-04-13 17:23



@PadraicCunningham确实如此,但dint uint8,而不是uint16。然而,它创造了 三 temp uint8数组.. - shx2
@ shx2:我只统计两个。 255 - a,和 np.minimum(255 - a, b)。第三个是什么? - user2357112
@ user2357112 ,. a + ...。如果OP希望结果​​代替 a 数组,有可能避免这种情况。 - shx2


numpy中的一个函数 为了这:

numpy.nan_to_num(x)[source]

将nan替换为零,使用有限数字替换inf。

返回一个数组或标量替换非数字(NaN),零,(正)无穷大,数字非常大,负无穷大,数字非常小(或负数)。

新的数组与x的形状相同,x中的元素的dtype具有最高的精度。

如果x不精确,则NaN被零替换,并且无穷大(-infinity)被适合输出dtype的最大(最小或最负)浮点值替换。如果x不精确,则返回x的副本。

我不确定它是否适用于uint8,因为在输出中提到浮点,但对于其他读者,它可能是有用的


0
2018-02-28 16:05



我不明白这对这个问题有什么帮助。在任何要添加的数组中都没有NaN或无限值。所以也许我错过了你的回答? - Thomas
@Thomas嗯,也许它对于整数类型是不同的,但是当我遇到浮点数的问题时,溢出出现为+/-无穷大 - Toshinou Kyouko
@ToshinouKyouko是的,它确实与整数不同,它们只是溢出,如OP的例子。 - luator