我正在尝试估计一个自定义的似然函数
import numpy as np
import pandas as pd
from scipy.optimize import minimize
def lik(params):
p_s = params[0]
lamda_s=params[1]
lamda_bl=params[2]
phi_bl=params[3]
p_hat=params[4]
a1_bl = 4 * df["s1"] + 1 * df["s2"] + 6 * df["s3"]
a2_bl = 0 * df["s1"] + 2 * df["s2"] + 8 * df["s3"]
a3_bl = 10 * df["s1"] + 4 * df["s2"] + 0 * df["s3"]
p1_bl = np.exp(lamda_bl*(a1_bl-a2_bl))/(np.exp(lamda_bl * (a1_bl - a2_bl))+1 + np.exp(lamda_bl*(a3_bl-a2_bl)))
p2_bl = 1 / (np.exp(lamda_bl*(a1_bl-a2_bl))+1+np.exp(lamda_bl * (a3_bl-a2_bl)))
p3_bl = np.exp(lamda_bl*(a3_bl-a2_bl))/(np.exp(lamda_bl * (a1_bl - a2_bl))+1 + np.exp(lamda_bl*(a3_bl-a2_bl)))
df['f_bl'] = p1_bl*y1+p2_bl*y2+p3_bl*y3
s1_s = p1_s * p_hat + p1_bl * (1- p_hat)
s2_s = p2_s * p_hat + p2_bl * (1 - p_hat)
s3_s = p3_s * p_hat + p3_bl * (1 - p_hat)
a1_s = 4 * s1_s + 1 * s2_s + 6 * s3_s
a2_s = 0 * s1_s + 2 * s2_s + 8 * s3_s
a3_s = 10 * s1_s + 4 * s2_s + 0 * s3_s
p1_s = np.exp(lamda_s*(a1_s-a2_s))/(np.exp(lamda_s * (a1_s - a2_s)) + 1 + np.exp(lamda_s*(a3_s-a2_s)))
p2_s = 1 / (np.exp(lamda_s*(a1_s-a2_s))+1+np.exp(lamda_s * (a3_s-a2_s)))
p3_s = np.exp(lamda_s*(a3_s-a2_s))/(np.exp(lamda_s * (a1_s - a2_s))+1 + np.exp(lamda_s*(a3_s-a2_s)))
df['f_s'] = p1_s*y1+p2_s*y2+p3_s*y3
pcodes = df["pcode"].unique()
ff_bl = np.array([np.prod(df['f_bl'][df['pcode'] == i]) for i in pcodes])
ff_s = np.array([np.prod(df['f_s'][df['pcode'] == i]) for i in pcodes])
pp = p_s * ff_s + (1-p_s)*ff_bl
li=np.log(pp)
LL=np.sum(li)
return -LL
results = minimize(lik,np.array([0.6, 0.6, 0.1, 0.3,0.2]),method= 'L-BFGS-B')
print(results)
s1_s
、a1_s
和p1_s
是相互依赖的,所以我得到了一个错误UnboundLocalError: local variable 'p1_s' referenced before assignment
。
有没有办法写出这样的似然函数而不需要自己求解?
您需要在循环依赖中添加一个变量系列作为自由度,然后添加一个非线性约束,告诉求解器在保持等式为真的情况下进行优化。
你的程序中有很多代码要么向量化不足,要么向量化错误,但我无法帮助解决其中的大部分,因为你的问题中缺少一堆变量,所以我不得不填写大量变量(“假”)间隙使其可重现。添加合理的界限至关重要。
from functools import partial
import numpy as np
from scipy.optimize import minimize, NonlinearConstraint, Bounds
def unpack_params(
params: np.ndarray,
s1: np.ndarray, s2: np.ndarray, s3: np.ndarray,
) -> tuple[float, ...]:
(
(p_s, lamda_s, lamda_bl, phi_bl, p_hat), p1_s, p2_s, p3_s,
) = np.split(params, (5, 5 + len(s1), 5 + 2*len(s1)))
# inefficient
a1_bl = 4*s1 + 1*s2 + 6*s3
a2_bl = 0*s1 + 2*s2 + 8*s3
a3_bl = 10*s1 + 4*s2 + 0*s3
# inefficient
denb = np.exp(lamda_bl*(a1_bl - a2_bl)) + 1 + np.exp(lamda_bl*(a3_bl - a2_bl))
p1_bl = np.exp(lamda_bl*(a1_bl - a2_bl)) / denb
p2_bl = 1 / denb
p3_bl = np.exp(lamda_bl*(a3_bl - a2_bl)) / denb
return p_s, lamda_s, lamda_bl, phi_bl, p_hat, p1_s, p2_s, p3_s, p1_bl, p2_bl, p3_bl
def likelihood(
params: np.ndarray,
s1: np.ndarray, s2: np.ndarray, s3: np.ndarray,
y1: np.ndarray, y2: np.ndarray, y3: np.ndarray,
pcode: np.ndarray,
) -> float:
(
p_s, lamda_s, lamda_bl, phi_bl, p_hat, p1_s, p2_s, p3_s, p1_bl, p2_bl, p3_bl,
) = unpack_params(params, s1, s2, s3)
f_bl = p1_bl*y1 + p2_bl*y2 + p3_bl*y3
f_s = p1_s *y1 + p2_s *y2 + p3_s *y3
# inefficient
pcodes = np.unique(pcode)
ff_bl = np.array([np.prod(f_bl[pcode == i]) for i in pcodes])
ff_s = np.array([np.prod(f_s [pcode == i]) for i in pcodes])
pp = p_s*ff_s + (1 - p_s)*ff_bl
li = np.log(pp)
ll = li.sum()
return -ll
def constrain_ps(
s1: np.ndarray, s2: np.ndarray, s3: np.ndarray,
params: np.ndarray,
) -> np.ndarray:
(
p_s, lamda_s, lamda_bl, phi_bl, p_hat, p1_s, p2_s, p3_s, p1_bl, p2_bl, p3_bl,
) = unpack_params(params, s1, s2, s3)
# inefficient
s1_s = p1_s*p_hat + p1_bl*(1 - p_hat)
s2_s = p2_s*p_hat + p2_bl*(1 - p_hat)
s3_s = p3_s*p_hat + p3_bl*(1 - p_hat)
# inefficient
a1_s = 4*s1_s + 1*s2_s + 6*s3_s
a2_s = 0*s1_s + 2*s2_s + 8*s3_s
a3_s = 10*s1_s + 4*s2_s + 0*s3_s
# inefficient
dens = np.exp(lamda_s * (a1_s - a2_s)) + 1 + np.exp(lamda_s*(a3_s-a2_s))
p1_sv = np.exp(lamda_s*(a1_s - a2_s))/dens
p2_sv = 1 / dens
p3_sv = np.exp(lamda_s*(a3_s - a2_s))/dens
# inefficient
return np.concatenate((
p1_sv - p1_s,
p2_sv - p2_s,
p3_sv - p3_s,
))
def solve(*args: np.ndarray) -> np.ndarray:
s1, s2, s3, *_ = args
result = minimize(
fun=likelihood, # v BOGUS v
x0=(0.6, 0.6, 0.1, 0.3, 0.2) + (0.5,)*(3*len(s1)),
args=args,
constraints=NonlinearConstraint(
fun=partial(constrain_ps, s1, s2, s3),
lb=0, ub=0,
),
options={'maxiter': 1_000},
bounds=Bounds(lb=0.1, ub=5), # < BOGUS
)
assert result.success, result.message
return result.x
def main() -> None:
rand = np.random.default_rng(seed=0)
s1, s2, s3, y1, y2, y3 = rand.random(size=(6, 10)) # < BOGUS
pcode = rand.integers(low=0, high=10, size=10) # < BOGUS
params = solve(s1, s2, s3, y1, y2, y3, pcode)
(
p_s, lamda_s, lamda_bl, phi_bl, p_hat, p1_s, p2_s, p3_s, p1_bl, p2_bl, p3_bl,
) = unpack_params(params, s1, s2, s3)
print(' p_s =', p_s)
print(' lamda_s =', lamda_s)
print('lamda_bl =', lamda_bl)
print(' phi_bl =', phi_bl)
print(' p_hat =', p_hat)
print('pjs equation errors:')
print(constrain_ps(s1, s2, s3, params))
if __name__ == '__main__':
main()
p_s = 2.8526260821832463
lamda_s = 0.1937634525070506
lamda_bl = 4.999999999999996
phi_bl = 0.29999999999975147
p_hat = 0.10000000000000005
pjs equation errors:
[-4.52609218e-08 -4.37312120e-08 7.77375703e-09 7.74417697e-09
-4.52615372e-08 -4.52612817e-08 1.63845859e-09 -1.49695251e-09
3.55849794e-10 -4.52621541e-08 -3.45946843e-08 -3.28769829e-08
-1.17397136e-09 -1.14484833e-09 -3.45941857e-08 -3.45951420e-08
1.15751003e-09 -3.98884203e-09 -8.80438666e-10 -3.45948470e-08
7.98552173e-08 7.66103062e-08 -6.59930843e-09 -6.59914307e-09
7.98556435e-08 7.98554262e-08 -2.79849960e-09 5.48236018e-09
5.24669364e-10 7.98545330e-08]