我看到有一个
array_split
和 split
methods 但是当你必须分割一个长度不是块大小整数倍的数组时,这些不是很方便。此外,这些方法的输入是切片数量而不是切片大小。我需要更像Matlab的buffer方法,它更适合信号处理。
例如,如果我想将信号缓冲到大小为 60 的块,我需要这样做:
np.vstack(np.hsplit(x.iloc[0:((len(x)//60)*60)], len(x)//60))
,这很麻烦。
我编写了以下例程来处理我需要的用例,但我尚未实现/测试“underlap”。
请随时提出改进建议。
def buffer(X, n, p=0, opt=None):
'''Mimic MATLAB routine to generate buffer array
MATLAB docs here: https://se.mathworks.com/help/signal/ref/buffer.html
Parameters
----------
x: ndarray
Signal array
n: int
Number of data segments
p: int
Number of values to overlap
opt: str
Initial condition options. default sets the first `p` values to zero,
while 'nodelay' begins filling the buffer immediately.
Returns
-------
result : (n,n) ndarray
Buffer array created from X
'''
import numpy as np
if opt not in [None, 'nodelay']:
raise ValueError('{} not implemented'.format(opt))
i = 0
first_iter = True
while i < len(X):
if first_iter:
if opt == 'nodelay':
# No zeros at array start
result = X[:n]
i = n
else:
# Start with `p` zeros
result = np.hstack([np.zeros(p), X[:n-p]])
i = n-p
# Make 2D array and pivot
result = np.expand_dims(result, axis=0).T
first_iter = False
continue
# Create next column, add `p` results from last col if given
col = X[i:i+(n-p)]
if p != 0:
col = np.hstack([result[:,-1][-p:], col])
i += n-p
# Append zeros if last row and not length `n`
if len(col) < n:
col = np.hstack([col, np.zeros(n-len(col))])
# Combine result with next row
result = np.hstack([result, np.expand_dims(col, axis=0).T])
return result
def buffer(X = np.array([]), n = 1, p = 0):
#buffers data vector X into length n column vectors with overlap p
#excess data at the end of X is discarded
n = int(n) #length of each data vector
p = int(p) #overlap of data vectors, 0 <= p < n-1
L = len(X) #length of data to be buffered
m = int(np.floor((L-n)/(n-p)) + 1) #number of sample vectors (no padding)
data = np.zeros([n,m]) #initialize data matrix
for startIndex,column in zip(range(0,L-n,n-p),range(0,m)):
data[:,column] = X[startIndex:startIndex + n] #fill in by column
return data
此 Keras 函数可被视为 MATLAB Buffer() 的 Python 等效项。
参见示例代码:
import numpy as np
S = np.arange(1,99) #A Demo Array
import tensorflow.keras.preprocessing as kp
list(kp.timeseries_dataset_from_array(S, targets = None,sequence_length=7,sequence_stride=7,batch_size=5))
参考:看这个
与其他答案相同,但速度更快。
def buffer(X, n, p=0):
'''
Parameters
----------
x: ndarray
Signal array
n: int
Number of data segments
p: int
Number of values to overlap
Returns
-------
result : (n,m) ndarray
Buffer array created from X
'''
import numpy as np
d = n - p
m = len(X)//d
if m * d != len(X):
m = m + 1
Xn = np.zeros(d*m)
Xn[:len(X)] = X
Xn = np.reshape(Xn,(m,d))
Xne = np.concatenate((Xn,np.zeros((1,d))))
Xn = np.concatenate((Xn,Xne[1:,0:p]), axis = 1)
return np.transpose(Xn[:-1])
ryanjdillon的答案,以显着提高性能;它附加到列表而不是连接数组,后者会迭代地复制数组并且速度要慢得多。
def buffer(x, n, p=0, opt=None):
if opt not in ('nodelay', None):
raise ValueError('{} not implemented'.format(opt))
i = 0
if opt == 'nodelay':
# No zeros at array start
result = x[:n]
i = n
else:
# Start with `p` zeros
result = np.hstack([np.zeros(p), x[:n-p]])
i = n-p
# Make 2D array, cast to list for .append()
result = list(np.expand_dims(result, axis=0))
while i < len(x):
# Create next column, add `p` results from last col if given
col = x[i:i+(n-p)]
if p != 0:
col = np.hstack([result[-1][-p:], col])
# Append zeros if last row and not length `n`
if len(col):
col = np.hstack([col, np.zeros(n - len(col))])
# Combine result with next row
result.append(np.array(col))
i += (n - p)
return np.vstack(result).T
def buffer(X, n, p=0):
'''
Parameters:
x: ndarray, Signal array, input a long vector as raw speech wav
n: int, frame length
p: int, Number of values to overlap
-----------
Returns:
result : (n,m) ndarray, Buffer array created from X
'''
import numpy as np
d = n - p
#print(d)
m = len(X)//d
c = n//d
#print(c)
if m * d != len(X):
m = m + 1
#print(m)
Xn = np.zeros(d*m)
Xn[:len(X)] = X
Xn = np.reshape(Xn,(m,d))
Xn_out = Xn
for i in range(c-1):
Xne = np.concatenate((Xn,np.zeros((i+1,d))))
Xn_out = np.concatenate((Xn_out, Xne[i+1:,:]),axis=1)
#print(Xn_out.shape)
if n-d*c>0:
Xne = np.concatenate((Xn, np.zeros((c,d))))
Xn_out = np.concatenate((Xn_out,Xne[c:,:n-p*c]),axis=1)
return np.transpose(Xn_out)
这里是 Ali Khodabakhsh 示例代码的改进代码,在我的情况下不起作用。欢迎评论并使用它。
通过运行来比较建议答案的执行时间
x = np.arange(1,200000)
start = timer()
y = buffer(x,60,20)
end = timer()
print(end-start)
结果是:
安杰伊·梅,0.005595300000095449
霸主金龙,0.06954789999986133
瑞安狄龙,2.427092700000003
Numpy 可能没有这样的内置函数。然而,Pytorch 和 Tensorflow 都有它。唯一的区别是输入参数是“step”而不是“overlap”。 (步长 = 窗口长度 - 重叠)
火炬
torch.unfold 有 3 个输入参数 (ref):
dimension (int) – 展开发生的维度。
size (int) – 展开的每个切片的大小(窗口长度)
step (int) – 每个切片之间的步长*(窗口长度 - 重叠)*
x = torch.arange(1., 8)
x.unfold(0, 2, 1)
然后您可以简单地将数据类型更改回 numpy:
x.numpy()
对于张量流等效项,您可以检查这里。