在每个时间步 t,我都会得到一个新的 x_t 和 y_t。一旦累计了 PERIOD 个 x 和 y,它就会执行普通最小二乘法(OLS)并输出系数。在代码中,“基准”(benchmark)是 x,“资产”(asset)是 y(这些是金融术语)。实际计算由 online_calc 方法完成。
class Beta():
    """Rolling OLS slope (beta) of asset returns on benchmark returns.

    The instance keeps the most recent ``PERIOD`` benchmark and asset
    observations.  Whenever exactly ``PERIOD`` observations are held, an
    update converts them to ``PERIOD - 1`` simple returns
    (``v[t] / v[t-1] - 1``) and regresses the asset returns (y) on the
    benchmark returns (x) with an intercept.  The slope is kept in ``stat``.
    """

    def __init__(self, PERIOD):
        """Create an empty window that holds at most ``PERIOD`` observations."""
        self.PERIOD = PERIOD
        self.stat = None  # latest slope; stays None until the window first fills
        self.benchmark_vec = []
        self.asset_vec = []

    def update(self, new_benchmark, new_asset):
        """Feed one (benchmark, asset) observation and return the latest slope.

        If either value is ``None`` the observation is ignored and the
        previously computed slope is returned unchanged.

        Returns:
            The most recently computed slope, or ``None`` if the window has
            never been full.
        """
        if new_benchmark is not None and new_asset is not None:
            self.benchmark_vec.append(new_benchmark)
            self.asset_vec.append(new_asset)
            # Drop the oldest observation in place instead of rebuilding the
            # list with a slice copy on every tick.
            if len(self.benchmark_vec) > self.PERIOD:
                del self.benchmark_vec[0]
            if len(self.asset_vec) > self.PERIOD:
                del self.asset_vec[0]
            if len(self.benchmark_vec) == self.PERIOD and len(self.asset_vec) == self.PERIOD:
                self.stat = self.online_calc()
        return self.stat

    def online_calc(self):
        """Return the OLS slope for the observations currently in the window.

        Relies on ``np`` (NumPy) being available in the enclosing module.
        """
        asset_ar = np.array(self.asset_vec)
        benchmark_ar = np.array(self.benchmark_vec)
        # Simple period-over-period returns: one fewer than the observations.
        y = asset_ar[1:] / asset_ar[:-1] - 1
        x = benchmark_ar[1:] / benchmark_ar[:-1] - 1
        # Design matrix [1, x]: the column of ones fits the intercept.
        X = x.reshape(-1, 1)
        X = np.concatenate([np.ones_like(X), X], axis=1)
        # Normal equations solved with a pseudo-inverse; X'X is singular when
        # all benchmark returns in the window are equal.
        b = np.linalg.pinv(X.T.dot(X)).dot(X.T).dot(y)
        # b[0] is the intercept, b[1] the slope.
        return b[1]
###### example usage
# Sample observations, fed to Beta one pair at a time (benchmark = x, asset = y).
asset_vec = [
    0.01568653, -0.00669479, 0.01140213, -0.00107317, -0.00131155, -0.00333463,
    -0.00114006, 0.00263075, -0.00507337, 0.00712401, 0.00388323,
]
benchmark_vec = [
    0.01150227, 0.00045742, 0.01114376, -0.00305367, 0.00388245, 0.00323491,
    -0.00449446, -0.00075698, -0.01114904, 0.0147878, 0.00528754,
]
beta = Beta(4)
output = []
# Collect the value returned after every update (None until the window fills).
for b, a in zip(benchmark_vec, asset_vec):
    output.append(beta.update(b, a))
output  # bare expression: echoes the list in a REPL/notebook, no effect in a script
我之前使用 scipy.stats.linregress 来获取系数,但它比这段代码慢了大约 50%(我猜是因为它还会计算截距、p 值等)。所以我想知道是否还有进一步提速的空间。
编辑:已更新代码,加入示例用法。
这是改进后的版本。它复用之前已经计算出的值(滚动累加和),因此应该更快。如果内存是个问题,还可以进一步优化:即时计算这些值,而不是把它们存储起来再取用:
class Beta2():
    """Rolling OLS slope of asset returns on benchmark returns via running sums.

    Instead of re-running a regression on every tick, the instance maintains
    the sums of x, y, x*x and x*y over the last ``PERIOD - 1`` simple returns
    (x = benchmark return, y = asset return) and derives the slope from them.
    From the ``PERIOD``-th accepted observation onwards every update refreshes
    ``stat``.

    Memory is bounded: only the returns inside the window and the last
    ``PERIOD`` raw observations are retained.

    NOTE: the sums are maintained by adding the newest and subtracting the
    oldest term, so floating-point rounding error can accumulate over very
    long streams.
    """

    def __init__(self, PERIOD):
        """Create an empty estimator for a window of ``PERIOD`` observations."""
        from collections import deque

        self.PERIOD = PERIOD
        self.stat = None  # latest slope; stays None until PERIOD observations arrive
        self.benchmark_vec = []
        self.asset_vec = []
        # Per-return terms currently inside the window, oldest first.
        self._x, self._y, self._xx, self._xy = deque(), deque(), deque(), deque()
        self._sum_x, self._sum_y, self._sum_xx, self._sum_xy = 0, 0, 0, 0
        self._ln = 0  # number of observations accepted so far

    def update(self, new_benchmark, new_asset):
        """Feed one (benchmark, asset) observation and return the latest slope.

        If either value is ``None`` the observation is ignored and the
        previously computed slope is returned unchanged.  The first accepted
        observation only seeds the previous value, since no return can be
        formed from a single point.  A zero previous value makes the return
        undefined; that tick is recorded as a (0, 0) return.  Any other error
        raised by the arithmetic (e.g. a non-numeric input) propagates.

        Returns:
            The most recently computed slope, ``None`` before ``PERIOD``
            observations have been seen, or NaN when the slope is undefined
            (see ``online_calc``).
        """
        if new_benchmark is None or new_asset is None:
            return self.stat

        if self.benchmark_vec:
            try:
                x = new_benchmark / self.benchmark_vec[-1] - 1
                y = new_asset / self.asset_vec[-1] - 1
            except ZeroDivisionError:
                x, y = 0, 0
            xx, xy = x * x, x * y
            self._x.append(x)
            self._y.append(y)
            self._xx.append(xx)
            self._xy.append(xy)
            self._sum_x += x
            self._sum_xx += xx
            self._sum_y += y
            self._sum_xy += xy

        self.benchmark_vec.append(new_benchmark)
        self.asset_vec.append(new_asset)
        # Keep at most PERIOD raw observations (always at least the last one,
        # which the next return is computed against).
        if len(self.benchmark_vec) > max(self.PERIOD, 1):
            del self.benchmark_vec[0]
            del self.asset_vec[0]
        self._ln += 1

        if self._ln >= self.PERIOD:
            self.online_calc()
            # Retire the oldest return so that, after the next append, the
            # sums again cover exactly PERIOD - 1 returns.
            if self._x:
                self._sum_x -= self._x.popleft()
                self._sum_xx -= self._xx.popleft()
                self._sum_y -= self._y.popleft()
                self._sum_xy -= self._xy.popleft()
        return self.stat

    def online_calc(self):
        """Recompute ``stat`` from the running sums.

        Uses slope = (Sxy - Sx*Sy/n) / (Sxx - Sx*Sx/n) with n = PERIOD - 1
        returns.  ``stat`` is set to NaN when the slope is undefined: fewer
        than one return in the window (``PERIOD <= 1``) or a zero denominator
        (no variation in the benchmark returns).
        """
        n = self.PERIOD - 1
        if n <= 0:
            self.stat = float("nan")
            return
        numerator = self._sum_xy - self._sum_x * self._sum_y / n
        denominator = self._sum_xx - self._sum_x * self._sum_x / n
        self.stat = numerator / denominator if denominator != 0 else float("nan")
###### example usage
# Same sample observations as above, this time fed to the running-sum version.
beta2 = Beta2(4)
output2 = []
for b, a in zip(benchmark_vec, asset_vec):
    output2.append(beta2.update(b, a))