我有一个 TimeSeries 数据集,其中包含如下所示的图。我正在尝试找到对时间序列进行分割的最佳方法。我需要将时间序列分为三个区域 - 'RampUp'、'Plateua' 和 'CoolDown' 分别用于初始斜率上升部分、近似恒定部分和最终冷却部分。
对我将获得的未来系列进行此细分的最佳方法是什么?例如,如果我的模型给出这样的时间序列作为输入,它应该能够输出区域边界的索引?另外,这可以在无人监督的情况下进行吗?
谢谢您的宝贵时间。非常感谢任何建议。
正如 @mdgorgan 所提供的,除了其他方法(基于 ML 的方法/黑客)之外,您还可以使用这种基于信号处理的方法来执行分割任务。我根据这个文档尝试过:
#install library
#!python -m pip install ruptures
#import libraries
import matplotlib.pyplot as plt
import ruptures as rpt
#genearte sample data in form of a signal
n_samples, n_dims, sigma = 1000, 1, 1
n_bkps = 2 # number of breakpoints
signal, bkps = rpt.pw_constant(n_samples, n_dims, n_bkps, noise_std=sigma)
print(bkps) #[340, 671, 1000]
# detection
algo = rpt.Dynp(model="l2").fit(signal)
result = algo.predict(n_bkps=2)
print(result)
#[340, 670, 1000]
# display segmentation
rpt.display(signal, bkps, result)
plt.show()
它根据变化点检测进行分割工作。
我是 stackoverflow 新手;我很可能遇到了你的问题,我用动态编程(DP)解决了,但在那之后我需要实时进行,DP的问题是当你的数据集变大时变得非常慢,我有超过20000个值,并且完成该算法大约需要7:30小时。但它有效。 所以这是我用于分割的代码:
import csv
import numpy as np
import matplotlib.pyplot as plt
class sls:
"""
find best least squares fit of multiple segments to the input data
"""
def __init__(self, filename):
"""
Inputs:
filename - text file of pairs of x-y points, one pair per row,
separated by a space
"""
self.get_data(filename)
def get_data(self, filename):
"""
read the data and find the least squares coefficients and errors
for all possible pairs of start and end points
"""
with open(filename) as csvfile:
r = csv.reader(csvfile, delimiter = '\t')
data = np.array([[float(row[0]), float(row[1])] for row in r])
x = data[:,0] #data
y = data[:,1]
v = np.std(y)**2 #variance of y for caculating penalty
n = len(x)
err_arr = np.zeros((n,n))
a_arr = np.zeros((n,n))
b_arr = np.zeros((n,n))
#for j end indices of data, 0 to n-1
for j in range(n):
#for i starting indices of this data, 0 to j
for i in range(j+1):
#for this segment, i to j, get coefs and error of fit
a, b, e2 = self.lscoef(x[i:j+1],y[i:j+1])
#store error and coefs for segment i to j in array at [i,j]
err_arr[i,j] = e2
a_arr[i,j] = a
b_arr[i,j] = b
self.x = x
self.y = y
self.err_arr = err_arr
self.a_arr = a_arr
self.b_arr = b_arr
self.v = v
def lscoef(self, x, y):
"""
least squares fit
"""
n=len(x)
if (n == 1):
return (0.0, y[0], 0.0)
sx = np.sum(x)
sy = np.sum(y)
sx2 = np.sum(x ** 2)
sxy = np.sum(x * y)
a = (n*sxy-sx*sy)/(n*sx2-sx*sx)
b = (sy-a*sx)/n
e2 = np.sum((y-a*x-b)**2)
return (a, b, e2)
def find_segments(self, max_num_seg=None, desired_penalty=30.0,
penalty_start=0.15, penalty_inc=0.15):
"""
Find the segments and least squares segment coefficients.
If desired_penalty is used, find_opt() is called directly (so is faster),
otherwise there is a loop over penalty values to find solution that has less
than maximum number of segments.
If no parameters input, defaults to desired_penalty of 0.35.
To set a desired_penalty, do not input max_num_seg to set max_num_seg to None.
To limit the number of segments, set max_num_seg (desired_penalty ignored),
and to control the search for the penalty to get the number of segments,
set penalty_start and penalty_inc. These may depend on the data.
Inputs
max_num_seg - set if a limit to the number of segments is desired
desired_penalty - penalty term to add to error in optimization to all
for noise
penalty_start, penalty_inc - parameter for searching for penalty
term in case of limiting the number of
segments
Returns
penalty value used
number of segments used
"""
if max_num_seg is not None:
# setting max_num_seg, ignore desired_penalty and check other parameters
desired_penalty = None
assert max_num_seg >= 1, 'max_num_seg must be at least 1'
assert penalty_start is not None and penalty_start > 0, 'penalty_start must not None, > 0'
assert penalty_inc is not None and penalty_inc > 0, 'penalty_inc must be not None, > 0'
else:
# using desired_penalty, check it
assert desired_penalty is not None and desired_penalty > 0, 'desired_penalty must be not None, > 0'
if desired_penalty:
# call find_opt directly
penalty = desired_penalty
self.find_opt(penalty_factor=penalty)
num_seg = self.get_num_segments()
else:
# loop over penalties to find one that gives number of segments
# less than max_num_seg
penalty = penalty_start - penalty_inc
num_seg = np.inf
while num_seg > max_num_seg:
penalty += penalty_inc
self.find_opt(penalty_factor=penalty)
num_seg = self.get_num_segments()
return penalty, num_seg
def find_opt(self, penalty_factor):
"""
dynamic programming segmented least squares
"""
penalty = self.v * penalty_factor
n = self.err_arr.shape[0]
#for each end index, get minimum error over possible start indices
opt_arr = np.zeros(n)
for j in range(n):
#for this end index, use min error for previous end index and
#current error to get errors for all possible start indices
tmp_opt = np.zeros(j+1)
tmp_opt[0] = self.err_arr[0,j] + penalty
for i in range(1,j+1):
tmp_opt[i] = opt_arr[i-1] + self.err_arr[i,j] + penalty
opt_arr[j] = np.amin(tmp_opt)
#backtrack from opt_arr[n-1] to get segment and coefficients
opt_coefs = []
j = n-1
while j >= 0:
tmp_opt = np.zeros(j+1)
tmp_opt[0] = self.err_arr[0,j] + penalty
for i in range(1,j+1):
tmp_opt[i] = opt_arr[i-1] + self.err_arr[i,j] + penalty
i_opt = np.argmin(tmp_opt)
a_opt = self.a_arr[i_opt,j]
b_opt = self.b_arr[i_opt,j]
#set boundaries of interval for these coefs in terms of x
if i_opt <= 0:
xmin = np.NINF
else:
xmin = (self.x[i_opt-1] + self.x[i_opt])/2
if j >= n-1:
xmax = np.Inf
else:
xmax = (self.x[j] + self.x[j+1])/2
opt_coefs.insert(0, (xmin,xmax,a_opt,b_opt))
print(xmin)
j = i_opt-1
self.opt_coefs = opt_coefs
def get_num_segments(self):
return len(self.opt_coefs)
def get_fit(self,x):
#get fit using coefs
n = len(x)
yfit = np.zeros(n)
for opt_coef in self.opt_coefs:
ind = [i for i,elem in enumerate(x) \
if elem >= opt_coef[0] and elem <= opt_coef[1]]
yfit[ind] = x[ind] * opt_coef[2] + opt_coef[3]
return yfit
def plot_fit(self,xplt):
yfit = self.get_fit(xplt)
plt.plot(self.x, self.y, '.')
plt.plot(xplt, yfit)
plt.show()
然后您必须在其他模块中使用此模块才能进行操作:
from sls_class import sls
from itertools import count
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
data = pd.read_csv("norm1Ac2.txt",sep="\t")#The format of the data is index(0,1,2,3,...) value(y1,y2,y3,...)
x = data.index
y1 = data.values
y1 = pd.DataFrame(y1)
y1 = y1[1]
s = sls("norm1Ac2.txt")
penalty, num_segments = s.find_segments()
print("penalty = {0}, num segments = {1}".format(penalty, num_segments))
n = len(s.x)
minx = np.amin(s.x)
maxx = np.amax(s.x)
dx = 0.33*(maxx-minx)/(n-1)
xplt = np.arange(minx,maxx+dx,dx)
s.plot_fit(xplt)
plt.plot(x, data["y1"], label='Gas')
plt.plot(xplt,s.get_fit(xplt),"r")
plt.xticks(rotation=90)
plt.legend(loc='upper left')
plt.tight_layout()
plt.show()
我希望这可以帮助你。
我想知道在获得片段后是否有人有解决方案,我该如何存储它?,比如在该点切断数据集并从切割数据集的下一个点重新运行(认为它是真实运行的)时间)我使用 Animate 形式 matplotlib 来表示实时,但我感觉堆积了 tath 点。感谢您的阅读。
如果有人想找到贝叶斯替代方案,Python 中的一种可能是我的贝叶斯变点检测和时间序列分割/分解包:https://pypi.org/project/Rbeast。与非贝叶斯方法不同,它不仅告诉我们何时以及有多少个变化点/段,而且还告诉我们与变化点发生相关的概率(例如,有 3 个变化点的概率是多少……)。从统计上讲,它适合模型 Y = 季节性 + 趋势 + 误差(季节性分量是可选的),其中分段模型用于假定 idd 正态误差的季节性或趋势分量。可以通过
pip install Rbeast
安装。
下面是一个简单的例子:
import Rbeast as rb
nile, year = rb.load_example('nile') # an annual time series of streamflow of the River Nile
o = rb.beast( nile, start=1871, season='none')
rb.plot(o)
rb.print(o)
o # see a list of output fields in the output variable o