Scaled Baum-Welch 算法未收敛到合理值

问题描述

我正在按照 Rabiner 的更正说明 编写一个 HMM,我已经实现了

forward
backward
xi
gamma
概率:

def forward_probabilities(initial_state_matrix, transition_matrix, distributions, observations):
    """Scaled forward pass of an HMM (Rabiner's alpha-hat recursion).

    Parameters
    ----------
    initial_state_matrix : array-like, shape (n_states,)
        Initial state probabilities pi.
    transition_matrix : ndarray, shape (n_states, n_states)
        a[i, j] = P(state j at t+1 | state i at t).
    distributions : sequence of length n_states
        Emission distributions; each must expose .prob(obs) (e.g. tfd.Normal).
    observations : sequence of length n_obs
        Observed values (list, ndarray or pandas Series).

    Returns
    -------
    alpha : ndarray, shape (n_states, n_obs)
        Scaled forward probabilities; each column sums to 1.
    scale : ndarray, shape (n_obs,)
        Per-step scaling factors c_t (sum of the unscaled column).
    lik : float
        Sum of the last scaled column (1.0 by construction; kept for API compat).
    """
    n_states = len(initial_state_matrix)
    n_obs = len(observations)
    obs = np.asarray(observations)  # accept list/ndarray as well as pandas Series
    alpha = np.zeros((n_states, n_obs))
    scale = np.zeros(n_obs)

    # Initialisation: alpha_0(j) = pi_j * b_j(o_0).
    # FIX: the original hard-coded distributions[0]/distributions[1] (exactly two
    # states); this loop works for any number of states.
    emission_0 = np.array([d.prob(obs[0]) for d in distributions])
    alpha[:, 0] = np.multiply(initial_state_matrix, emission_0)
    scale[0] = np.sum(alpha[:, 0])
    alpha[:, 0] /= scale[0]

    # Induction: alpha_t(j) = b_j(o_t) * sum_k alpha_{t-1}(k) * a_{kj}, then rescale.
    for t in range(1, n_obs):
        for j in range(n_states):
            alpha[j, t] = distributions[j].prob(obs[t]) * sum(
                alpha[k, t - 1] * transition_matrix[k, j] for k in range(n_states)
            )
        scale[t] = np.sum(alpha[:, t])
        alpha[:, t] /= scale[t]

    lik = np.sum(alpha[:, -1])
    return alpha, scale, lik

def backward_probabilities(scale, initial_state_matrix, transition_matrix, distributions, observations):
    """Scaled backward pass of an HMM, reusing the forward scaling factors.

    Parameters
    ----------
    scale : ndarray, shape (n_obs,)
        Scaling factors c_t produced by forward_probabilities.
    initial_state_matrix : array-like, shape (n_states,)
        Initial state probabilities pi.
    transition_matrix : ndarray, shape (n_states, n_states)
        a[j, k] = P(state k at t+1 | state j at t).
    distributions : sequence of length n_states
        Emission distributions; each must expose .prob(obs).
    observations : sequence of length n_obs
        Observed values (list, ndarray or pandas Series).

    Returns
    -------
    beta : ndarray, shape (n_states, n_obs)
        Scaled backward probabilities.
    start_state_val : float
        P(O | model) recombined at t = 0 (sanity check against the forward pass).
    """
    n_states = len(initial_state_matrix)
    n_obs = len(observations)
    obs = np.asarray(observations)  # generalises the original's pandas-only .iat indexing
    beta = np.zeros((n_states, n_obs))

    # Initialisation: beta_T(j) = 1, rescaled by the last scale factor.
    # FIX: the original hard-coded [1, 1] (two states); np.ones works for any n_states.
    beta[:, -1] = np.ones(n_states) / scale[-1]

    # Induction (backwards): beta_t(j) = sum_k a_{jk} b_k(o_{t+1}) beta_{t+1}(k), rescaled.
    for i in range(2, n_obs + 1):
        for j in range(n_states):
            beta[j, -i] = sum(
                beta[k, -i + 1] * distributions[k].prob(obs[-i + 1]) * transition_matrix[j, k]
                for k in range(n_states)
            )
        beta[:, -i] /= scale[-i]

    # Recombine at t = 0: sum_k pi_k * b_k(o_0) * beta_0(k).
    start_state = [beta[k, 0] * distributions[k].prob(obs[0]) for k in range(n_states)]
    start_state_val = np.sum(np.multiply(start_state, initial_state_matrix))
    return beta, start_state_val

#Probability of moving from state j to state k in time i
def xi_probabilities(forward, backward, transition_matrix, distributions, observations):
    """Joint posterior of consecutive states: xi[j, t, k] ~ P(q_t=j, q_{t+1}=k | O).

    With Rabiner's scaled alpha-hat/beta-hat, the product below already carries the
    right normalisation, so no extra division by P(O) is needed.

    Parameters
    ----------
    forward : ndarray, shape (n_states, n_obs)   -- scaled forward probabilities
    backward : ndarray, shape (n_states, n_obs)  -- scaled backward probabilities
    transition_matrix : ndarray, shape (n_states, n_states)
    distributions : sequence of emission distributions exposing .prob(obs)
    observations : sequence of length n_obs (list, ndarray or pandas Series)

    Returns
    -------
    xi : ndarray, shape (n_states, n_obs - 1, n_states)
    """
    n_states = forward.shape[0]
    n_observations = forward.shape[1]
    obs = np.asarray(observations)  # generalises the original's pandas-only .iat indexing
    xi = np.zeros((n_states, n_observations - 1, n_states))

    for t in range(n_observations - 1):
        for j in range(n_states):
            for k in range(n_states):
                xi[j, t, k] = (forward[j, t] * backward[k, t + 1] * transition_matrix[j, k]
                               * distributions[k].prob(obs[t + 1]))
    return xi

#Probability of being in state j in time i
def gamma_probabilities(xi):
    """State posterior gamma[j, t] = sum_k xi[j, t, k].

    Marginalising xi over its destination-state axis (axis 2) gives the
    probability of being in state j at time t; the explicit double loop of
    the original is exactly a sum over that axis.
    """
    return xi.sum(axis=2)

然后我运行 Baum-Welch 算法,对描述数据 train["Change"] 的正态分布参数进行重估计(期望最大化)。

# Baum-Welch (EM) re-estimation loop.
# Relies on names defined earlier in the notebook/script: train, initial_state_matrix,
# transition_matrix, prior_1, prior_2, tfd (tensorflow_probability.distributions), create_HMM.
n_states = len(initial_state_matrix)
n_obs = len(train["Change"])
log_verosim = np.zeros(100)  # log-likelihood trace (loop below runs at most 15 iterations)
for iteration in range(15):
    print('\nIteration No: ', iteration + 1)

    # E-step: forward/backward passes and the xi / gamma posteriors.
    alf, scale, lik_alpha = forward_probabilities(initial_state_matrix, transition_matrix, [prior_1, prior_2], train["Change"])
    beta, lik_beta = backward_probabilities(scale, initial_state_matrix, transition_matrix, [prior_1, prior_2], train["Change"])
    # With Rabiner's scaling, log P(O | model) = -sum_t log c_t.
    log_verosim[iteration] = - np.sum(np.log(scale))
    xi = xi_probabilities(alf, beta, transition_matrix, [prior_1, prior_2], train["Change"])
    gamma = gamma_probabilities(xi)

    a = np.zeros((n_states, n_states))

    # Re-estimate 'pi': gamma at t = 0, normalised.
    for j in range(n_states):
        initial_state_matrix[j] = gamma[j, 0] / np.sum(gamma[:, 0])

    # Re-estimate 'a': expected number of j -> i transitions over expected visits to j.
    for j in range(n_states):
        for i in range(n_states):
            for t in range(n_obs - 1):
                a[j, i] += xi[j, t, i]

            denominator_a = sum(xi[j, t_x, i_x] for t_x in range(n_obs - 1) for i_x in range(n_states))

            if denominator_a == 0:
                a[j, i] = 0
            else:
                a[j, i] = a[j, i] / denominator_a

    # Re-estimate the Gaussian emission parameters ('b').
    mu = np.zeros(n_states)
    sigma = np.zeros(n_states)

    # mu_i = sum_t gamma[i, t] * o_t / sum_t gamma[i, t]
    for i in range(n_states):
        num = 0
        den = 0
        for t in range(n_obs - 1):
            num += gamma[i, t] * train["Change"][t]
            den += gamma[i, t]
        mu[i] = num / den

    # sigma_i = sqrt( sum_t gamma[i, t] * (o_t - mu_i)^2 / sum_t gamma[i, t] )
    # BUG FIX: the original assigned with '=' instead of accumulating with '+=',
    # so num/den held only the LAST term of each sum. The re-estimated sigma was
    # therefore meaningless, which corrupts every following iteration and is the
    # likely cause of the degenerate transition matrix.
    for i in range(n_states):
        num = 0
        den = 0
        for t in range(n_obs - 1):
            num += gamma[i, t] * ((train["Change"][t] - mu[i]) ** 2)
            den += gamma[i, t]
        sigma[i] = np.sqrt(num / den)

    # M-step results become the model for the next iteration.
    transition_matrix = a
    prior_1 = tfd.Normal(loc=mu[0], scale=sigma[0])
    prior_2 = tfd.Normal(loc=mu[1], scale=sigma[1])

    # Convergence check: change in log-likelihood under the updated model.
    new_alf, new_scale, new_lik_alpha = forward_probabilities(initial_state_matrix, transition_matrix, [prior_1, prior_2], train["Change"])
    new_log_verosim = - np.sum(np.log(new_scale))
    better_new_log = create_HMM(initial_state_matrix, transition_matrix, [prior_1, prior_2]).log_prob(train["Change"]).numpy()
    diff = np.abs(log_verosim[iteration] - new_log_verosim)
    print('Difference in forward probability: ', diff)

    if diff < 0.0000001:
        break

虽然算法很快收敛到一个解,但是得到的transition_matrix是完全错误的:

array([[0.98909289, 0.01090711],
       [1.        , 0.        ]])

证明这一点的是使用相同数据集获得的糟糕的 F 分数。

F1-score: 0.0851063829787234
F2-score: 0.056818181818181816

我真的不知道我做错了什么,但我猜问题出在参数的重估计上,因为我把自己计算的 forward、backward、xi 和 gamma 概率与 Tensorflow 的 HMM 进行了比较,得到的结果是一样的。Tensorflow 没有实现 fit 方法,所以我无法用它来验证我的 Baum-Welch 算法。你可以在 github 上找到完整的代码。我也尝试将我的实现与其他库(例如 pomegranate)进行比较,但我并不完全理解它。

tensorflow2.0 hidden-markov-models expectation-maximization pomegranate
© www.soinside.com 2019 - 2024. All rights reserved.