我正在按照 Rabiner 的教程(含勘误)编写一个 HMM,已经实现了 forward、backward、xi 和 gamma 概率的计算:
def forward_probabilities(initial_state_matrix, transition_matrix, distributions, observations):
    """Scaled forward pass of an HMM (Rabiner's scaling).

    Parameters:
        initial_state_matrix: length-n array, pi_j = P(state j at t=0).
        transition_matrix: (n, n) array, [k, j] = P(j at t | k at t-1).
        distributions: per-state emission objects exposing .prob(obs).
        observations: sequence supporting integer indexing with [].
            NOTE(review): the backward pass indexes observations with
            pandas .iat while this one uses plain [] — confirm both work
            for the object actually passed in.

    Returns:
        alpha: (n_states, n_obs) scaled forward probabilities
               (each column sums to 1 after scaling).
        scale: length-n_obs array; scale[t] is the column sum of the
               unscaled alphas, so log P(O | model) = sum(log(scale)).
        lik:   sum of the last scaled column — always 1.0 after scaling;
               kept only for interface compatibility.
    """
    n_states = len(initial_state_matrix)
    n_obs = len(observations)
    alpha = np.zeros((n_states, n_obs))
    scale = np.zeros(n_obs)
    # Initialization: alpha_0(j) = pi_j * b_j(o_0).
    # FIX: the emission vector was hard-coded for exactly two states
    # (distributions[0], distributions[1]); build it for any n_states.
    emission = np.array([distributions[j].prob(observations[0]) for j in range(n_states)])
    alpha[:, 0] = np.multiply(initial_state_matrix, emission)
    scale[0] = sum(alpha[:, 0])
    alpha[:, 0] = alpha[:, 0] / scale[0]
    # Induction: alpha_t(j) = b_j(o_t) * sum_k alpha_{t-1}(k) * A[k, j],
    # rescaled per time step so the column sums to 1.
    for i in range(1, n_obs):
        for j in range(n_states):
            alpha_aux = [alpha[k, i - 1] * distributions[j].prob(observations[i]) * transition_matrix[k, j]
                         for k in range(n_states)]
            alpha[j, i] = sum(alpha_aux)
            scale[i] += alpha[j, i]
        alpha[:, i] = [alpha[k, i] / scale[i] for k in range(n_states)]
    # Because each column was just normalized, this is identically 1.0;
    # the actual log-likelihood is np.sum(np.log(scale)).
    lik = sum(alpha[:, -1])
    return alpha, scale, lik
def backward_probabilities(scale, initial_state_matrix, transition_matrix, distributions, observations):
    """Scaled backward pass of an HMM, using the forward pass's scale factors.

    Parameters:
        scale: length-n_obs scale factors returned by forward_probabilities.
        initial_state_matrix: length-n array of initial state probabilities.
        transition_matrix: (n, n) array, [j, k] = P(k at t+1 | j at t).
        distributions: per-state emission objects exposing .prob(obs).
        observations: pandas Series (indexed positionally via .iat).

    Returns:
        beta: (n_states, n_obs) scaled backward probabilities.
        start_state_val: sum_k pi_k * b_k(o_0) * beta_0(k); with Rabiner's
            scaling this equals 1.0 and serves as a sanity check.
    """
    n_states = len(initial_state_matrix)
    n_obs = len(observations)
    beta = np.zeros((n_states, n_obs))
    # Initialization: beta_{T-1}(j) = 1, rescaled by the last scale factor.
    # FIX: was np.divide([1, 1], scale[-1]) — hard-coded for two states.
    beta[:, -1] = np.ones(n_states) / scale[-1]
    # Induction (backwards): beta_t(j) = sum_k A[j,k] * b_k(o_{t+1}) * beta_{t+1}(k),
    # rescaled with the same factors as the forward pass.
    for i in range(2, n_obs + 1):
        for j in range(n_states):
            beta_aux = [beta[k, -i + 1] * distributions[k].prob(observations.iat[-i + 1]) * transition_matrix[j, k]
                        for k in range(n_states)]
            beta[j, -i] = sum(beta_aux)
        beta[:, -i] = np.divide(beta[:, -i], scale[-i])
    # Fold in the t=0 term: pi_k * b_k(o_0) * beta_0(k).
    start_state = [beta[k, 0] * distributions[k].prob(observations.iat[0]) for k in range(n_states)]
    start_state = np.multiply(start_state, initial_state_matrix)
    start_state_val = sum(start_state)
    return beta, start_state_val
def xi_probabilities(forward, backward, transition_matrix, distributions, observations):
    """Joint posterior xi[src, t, dest] of transitioning src -> dest at step t.

    With Rabiner-scaled forward/backward matrices this is already a
    normalized probability; no further division by the likelihood is needed.

    Parameters:
        forward, backward: (n_states, n_obs) scaled alpha / beta matrices.
        transition_matrix: (n_states, n_states) transition probabilities.
        distributions: per-state emission objects exposing .prob(obs).
        observations: pandas Series (indexed positionally via .iat).

    Returns:
        xi: array of shape (n_states, n_obs - 1, n_states).
    """
    n_states, n_observations = forward.shape
    xi = np.zeros((n_states, n_observations - 1, n_states))
    for t in range(n_observations - 1):
        # Emission probabilities for the arrival state depend only on t,
        # so evaluate them once per time step instead of inside the inner loop.
        emission = [distributions[dest].prob(observations.iat[t + 1]) for dest in range(n_states)]
        for src in range(n_states):
            for dest in range(n_states):
                xi[src, t, dest] = (forward[src, t] * backward[dest, t + 1]
                                    * transition_matrix[src, dest] * emission[dest])
    return xi
def gamma_probabilities(xi):
    """Posterior state probabilities gamma[j, t] = P(state j at time t).

    Obtained by marginalizing xi over the arrival state:
    gamma[j, t] = sum_k xi[j, t, k]. Note this yields only n_obs - 1
    columns, since xi is defined over transitions.

    Parameters:
        xi: array of shape (n_states, n_obs - 1, n_states).

    Returns:
        gamma: array of shape (n_states, n_obs - 1).
    """
    n_states, n_steps, _ = xi.shape
    gamma = np.zeros((n_states, n_steps))
    for state in range(n_states):
        gamma[state, :] = [sum(xi[state, step, :]) for step in range(n_steps)]
    return gamma
然后我运行 Baum-Welch 算法,尝试重新估计描述数据 (train["Change"]) 的正态分布的参数。
# Baum-Welch re-estimation loop (Rabiner's EM for a Gaussian-emission HMM).
# Expects the following names in scope: initial_state_matrix, transition_matrix,
# prior_1/prior_2 (tfd.Normal emissions), train["Change"], tfd, create_HMM.
n_states = len(initial_state_matrix)
n_obs = len(train["Change"])
log_verosim = np.zeros(100)
for iteration in range(15):
    print('\nIteration No: ', iteration + 1)
    # --- E-step: run forward/backward and compute the posteriors. ---
    alf, scale, lik_alpha = forward_probabilities(
        initial_state_matrix, transition_matrix, [prior_1, prior_2], train["Change"])
    beta, lik_beta = backward_probabilities(
        scale, initial_state_matrix, transition_matrix, [prior_1, prior_2], train["Change"])
    # NOTE(review): with scale[t] being the column sum of the unscaled alphas,
    # log P(O|model) = +sum(log(scale)); the minus sign makes this the NEGATIVE
    # log-likelihood. It is only consumed through np.abs() below, so the
    # convergence test is unaffected — confirm the sign before reporting it.
    log_verosim[iteration] = - np.sum(np.log(scale))
    xi = xi_probabilities(alf, beta, transition_matrix, [prior_1, prior_2], train["Change"])
    gamma = gamma_probabilities(xi)
    # --- M-step: re-estimate pi, A, and the Gaussian emission parameters. ---
    a = np.zeros((n_states, n_states))
    # 'pi' vector: posterior of being in each state at t = 0.
    for j in range(n_states):
        initial_state_matrix[j] = gamma[j, 0] / np.sum(gamma[:, 0])
    # 'a' matrix: a[j, i] = sum_t xi[j, t, i] / sum_t gamma[j, t].
    for j in range(n_states):
        # Denominator depends only on j — hoisted out of the i-loop
        # (it was recomputed for every i, and in the last t-iteration only).
        denominator_a = sum(xi[j, t_x, i_x]
                            for t_x in range(n_obs - 1)
                            for i_x in range(n_states))
        for i in range(n_states):
            numerator = sum(xi[j, t, i] for t in range(n_obs - 1))
            a[j, i] = 0 if denominator_a == 0 else numerator / denominator_a
    # 'b' (emission) parameters: gamma-weighted mean and std per state.
    # gamma only covers n_obs - 1 steps because it is derived from xi.
    mu = np.zeros(n_states)
    sigma = np.zeros(n_states)
    for i in range(n_states):
        num = 0
        den = 0
        for t in range(n_obs - 1):
            num += gamma[i, t] * train["Change"][t]
            den += gamma[i, t]
        mu[i] = num / den
    for i in range(n_states):
        num = 0
        den = 0
        for t in range(n_obs - 1):
            # BUG FIX: these were plain assignments (num = ..., den = ...),
            # so sigma was computed from the LAST time step only instead of
            # the gamma-weighted sum over all steps — this breaks the
            # re-estimation and can drive the model to a degenerate solution.
            num += gamma[i, t] * ((train["Change"][t] - mu[i]) ** 2)
            den += gamma[i, t]
        sigma[i] = np.sqrt(num / den)
    transition_matrix = a
    prior_1 = tfd.Normal(loc=mu[0], scale=sigma[0])
    prior_2 = tfd.Normal(loc=mu[1], scale=sigma[1])
    # Re-run the forward pass with the updated model to measure improvement.
    new_alf, new_scale, new_lik_alpha = forward_probabilities(
        initial_state_matrix, transition_matrix, [prior_1, prior_2], train["Change"])
    new_log_verosim = - np.sum(np.log(new_scale))
    better_new_log = create_HMM(
        initial_state_matrix, transition_matrix, [prior_1, prior_2]).log_prob(train["Change"]).numpy()
    diff = np.abs(log_verosim[iteration] - new_log_verosim)
    print('Difference in forward probability: ', diff)
    if diff < 0.0000001:
        break
虽然算法很快收敛到一个解,但是得到的transition_matrix是完全错误的:
array([[0.98909289, 0.01090711],
[1. , 0. ]])
证明这一点的是使用相同数据集获得的糟糕的 F 分数。
F1-score: 0.0851063829787234
F2-score: 0.056818181818181816
我真的不知道哪里做错了,但我猜想问题出在参数的重新估计上,因为我把 forward、backward、xi 和 gamma 概率的值与 TensorFlow 的 HMM 做了对比,结果是一致的。TensorFlow 没有实现 fit 方法,所以我无法用它来验证我的 Baum-Welch 算法。完整代码可以在 GitHub 上找到。我也尝试将我的方法与其他实现(例如 pomegranate)进行比较,但我并不完全理解它。