使用Python进行时间序列分割

问题描述 投票:0回答:4

我有一个 TimeSeries 数据集,其中包含如下所示的图。我正在尝试找到对时间序列进行分割的最佳方法。我需要将时间序列分为三个区域 - 'RampUp'、'Plateua' 和 'CoolDown' 分别用于初始斜率上升部分、近似恒定部分和最终冷却部分。

对我将获得的未来系列进行此细分的最佳方法是什么?例如,如果我的模型给出这样的时间序列作为输入,它应该能够输出区域边界的索引?另外,这可以在无人监督的情况下进行吗?

谢谢您的宝贵时间。非常感谢任何建议。

python time-series classification unsupervised-learning supervised-learning
4个回答
1
投票

这听起来像是可以通过变化点检测来解决的问题。

ruptures 库将是离线方法的一个很好的起点。

您将找到已知和未知数量的变化点的方法。


0
投票

正如 @mdgorgan 所提供的,除了其他方法(基于 ML 的方法/黑客)之外,您还可以使用这种基于信号处理的方法来执行分割任务。我根据这个文档尝试过:

#install library
#!python -m pip install ruptures

#import libraries
import matplotlib.pyplot as plt 
import ruptures as rpt  

#genearte sample data in form of a signal
n_samples, n_dims, sigma = 1000, 1, 1
n_bkps = 2  # number of breakpoints
signal, bkps = rpt.pw_constant(n_samples, n_dims, n_bkps, noise_std=sigma)
print(bkps) #[340, 671, 1000]

# detection
algo = rpt.Dynp(model="l2").fit(signal)
result = algo.predict(n_bkps=2)

print(result)
#[340, 670, 1000]

# display segmentation
rpt.display(signal, bkps, result)
plt.show()

它根据变化点检测进行分割工作。 img


0
投票

我是 stackoverflow 新手;我很可能遇到了你的问题,我用动态编程(DP)解决了,但在那之后我需要实时进行,DP的问题是当你的数据集变大时变得非常慢,我有超过20000个值,并且完成该算法大约需要7:30小时。但它有效。 所以这是我用于分割的代码:

import csv
import numpy as np
import matplotlib.pyplot as plt

class sls:
    """
    find best least squares fit of multiple segments to the input data
    """
    def __init__(self, filename):
        """
        Inputs:
        filename - text file of pairs of x-y points, one pair per row, 
                   separated by a space
        """
        self.get_data(filename)        
    
    def get_data(self, filename):
        """
        read the data and find the least squares coefficients and errors
        for all possible pairs of start and end points
        """
        with open(filename) as csvfile:
            r = csv.reader(csvfile, delimiter = '\t')
            data = np.array([[float(row[0]), float(row[1])] for row in r])
        
        x = data[:,0]      #data
        y = data[:,1]
        v = np.std(y)**2   #variance of y for caculating penalty
        n = len(x)
        err_arr = np.zeros((n,n))
        a_arr = np.zeros((n,n))
        b_arr = np.zeros((n,n))
    
    #for j  end indices of data, 0 to n-1
    for j in range(n):
        #for i starting indices of this data, 0 to j
        for i in range(j+1):
            #for this segment, i to j, get coefs and error of fit
            a, b, e2 = self.lscoef(x[i:j+1],y[i:j+1])
            
            #store error and coefs for segment i to j in array at [i,j]
            err_arr[i,j] = e2
            a_arr[i,j] = a
            b_arr[i,j] = b
    
    self.x = x
    self.y = y
    self.err_arr = err_arr
    self.a_arr = a_arr 
    self.b_arr = b_arr
    self.v = v

def lscoef(self, x, y):
    """
    least squares fit
    """
    n=len(x)
    if (n == 1):
        return (0.0, y[0], 0.0)
    
    sx = np.sum(x)
    sy = np.sum(y)
    sx2 = np.sum(x ** 2)
    sxy = np.sum(x * y)
    a = (n*sxy-sx*sy)/(n*sx2-sx*sx)    
    b = (sy-a*sx)/n
    e2 = np.sum((y-a*x-b)**2)

    return (a, b, e2)
    
def find_segments(self, max_num_seg=None, desired_penalty=30.0, 
                  penalty_start=0.15, penalty_inc=0.15):
    """
    Find the segments and least squares segment coefficients.
    If desired_penalty is used, find_opt() is called directly (so is faster), 
    otherwise there is a loop over penalty values to find solution that has less 
    than maximum number of segments.
    
    If no parameters input, defaults to desired_penalty of 0.35.
    To set a desired_penalty, do not input max_num_seg to set max_num_seg to None.
    To limit the number of segments, set max_num_seg (desired_penalty ignored),
    and to control the search for the penalty to get the number of segments,
    set penalty_start and penalty_inc. These may depend on the data.
    Inputs
    max_num_seg - set if a limit to the number of segments is desired
    desired_penalty - penalty term to add to error in optimization to all
                       for noise
    penalty_start, penalty_inc - parameter for searching for penalty 
                                 term in case of limiting the number of 
                                 segments
    Returns
    penalty value used
    number of segments used
    """
    if max_num_seg is not None:
        # setting max_num_seg, ignore desired_penalty and check other parameters
        desired_penalty = None
        assert max_num_seg >= 1, 'max_num_seg must be at least 1'
        assert penalty_start is not None and penalty_start > 0, 'penalty_start must not None, > 0'
        assert penalty_inc is not None and penalty_inc > 0, 'penalty_inc must be not None, > 0'
    else:
        # using desired_penalty, check it
        assert desired_penalty is not None and desired_penalty > 0, 'desired_penalty must be not None, > 0'
           
    if desired_penalty:
        # call find_opt directly
        penalty = desired_penalty
        self.find_opt(penalty_factor=penalty)
        num_seg = self.get_num_segments()
    else:
        # loop over penalties to find one that gives number of segments
        # less than max_num_seg
        penalty = penalty_start - penalty_inc
        num_seg = np.inf
        while num_seg > max_num_seg:
            penalty += penalty_inc
            self.find_opt(penalty_factor=penalty)
            num_seg = self.get_num_segments()
    return penalty, num_seg
    
def find_opt(self, penalty_factor):
    """
    dynamic programming segmented least squares
    """
    penalty = self.v * penalty_factor
    n = self.err_arr.shape[0]
    
    #for each end index, get minimum error over possible start indices
    opt_arr = np.zeros(n)
    for j in range(n):
        #for this end index, use min error for previous end index and
        #current error to get errors for all possible start indices
        tmp_opt = np.zeros(j+1)
        tmp_opt[0] = self.err_arr[0,j] + penalty
        for i in range(1,j+1):
            tmp_opt[i] = opt_arr[i-1] + self.err_arr[i,j] + penalty
        opt_arr[j] = np.amin(tmp_opt)
    
    #backtrack from opt_arr[n-1] to get segment and coefficients
    opt_coefs = []
    j = n-1
    while j >= 0:
        tmp_opt = np.zeros(j+1)
        tmp_opt[0] = self.err_arr[0,j] + penalty
        for i in range(1,j+1):
            tmp_opt[i] = opt_arr[i-1] + self.err_arr[i,j] + penalty
        i_opt = np.argmin(tmp_opt)
        a_opt = self.a_arr[i_opt,j]
        b_opt = self.b_arr[i_opt,j]
        
        #set boundaries of interval for these coefs in terms of x
        if i_opt <= 0:
            xmin = np.NINF
        else:
            xmin = (self.x[i_opt-1] + self.x[i_opt])/2
        if j >= n-1:
            xmax = np.Inf
        else:
            xmax = (self.x[j] + self.x[j+1])/2     
        opt_coefs.insert(0, (xmin,xmax,a_opt,b_opt))
        print(xmin)
        j = i_opt-1
        
    self.opt_coefs = opt_coefs
    
def get_num_segments(self):
    return len(self.opt_coefs)
    
def get_fit(self,x):
    #get fit using coefs
    n = len(x)
    yfit = np.zeros(n)
    for opt_coef in self.opt_coefs:
        ind = [i for i,elem in enumerate(x) \
               if elem >= opt_coef[0] and elem <= opt_coef[1]]
        yfit[ind] = x[ind] * opt_coef[2] + opt_coef[3]

    return yfit
    
def plot_fit(self,xplt):
    yfit = self.get_fit(xplt)
    plt.plot(self.x, self.y, '.')
    plt.plot(xplt, yfit)
    plt.show()

然后您必须在其他模块中使用此模块才能进行操作:

from sls_class import sls
from itertools import count
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

data = pd.read_csv("norm1Ac2.txt",sep="\t")#The format of the data is index(0,1,2,3,...)    value(y1,y2,y3,...)
x = data.index
y1 = data.values
y1 = pd.DataFrame(y1)
y1 = y1[1]
s = sls("norm1Ac2.txt")
penalty, num_segments = s.find_segments()
print("penalty = {0}, num segments = {1}".format(penalty, num_segments))
n = len(s.x)
minx = np.amin(s.x)
maxx = np.amax(s.x)
dx = 0.33*(maxx-minx)/(n-1)
xplt = np.arange(minx,maxx+dx,dx)
s.plot_fit(xplt)
plt.plot(x, data["y1"], label='Gas')
plt.plot(xplt,s.get_fit(xplt),"r")    
plt.xticks(rotation=90)
plt.legend(loc='upper left')
plt.tight_layout()
plt.show()

我希望这可以帮助你。

我想知道在获得片段后是否有人有解决方案,我该如何存储它?,比如在该点切断数据集并从切割数据集的下一个点重新运行(认为它是真实运行的)时间)我使用 Animate 形式 matplotlib 来表示实时,但我感觉堆积了 tath 点。感谢您的阅读。


0
投票

如果有人想找到贝叶斯替代方案,Python 中的一种可能是我的贝叶斯变点检测和时间序列分割/分解包:https://pypi.org/project/Rbeast。与非贝叶斯方法不同,它不仅告诉我们何时以及有多少个变化点/段,而且还告诉我们与变化点发生相关的概率(例如,有 3 个变化点的概率是多少……)。从统计上讲,它适合模型 Y = 季节性 + 趋势 + 误差(季节性分量是可选的),其中分段模型用于假定 idd 正态误差的季节性或趋势分量。可以通过

pip install Rbeast
安装。

下面是一个简单的例子:

import Rbeast as rb

nile, year = rb.load_example('nile')                  # an annual time series of streamflow of the River Nile
o          = rb.beast( nile, start=1871, season='none')
rb.plot(o)
rb.print(o)
o                                                     # see a list of output fields in the output variable o

© www.soinside.com 2019 - 2024. All rights reserved.