Reputation: 137
I'm playing around with a toy discrete HMM model in PyMC3 and I'm running into some issues.
My initial code look like this:
# Known transition and emission
with pymc3.Model() as hmm1:
T = tt.as_tensor_variable(Tr)
E = tt.as_tensor_variable(Er)
# State models
p0 = np.ones(num_states) / num_states
# No shape, so each state is a scalar tensor
states = [pymc3.Categorical('s0', p=p0)]
emissions = [pymc3.Categorical('z0', p=E[:,states[0]], observed=Zr[:,0])]
for i in range(1, num_times):
states.append(pymc3.Categorical('s{0}'.format(i), p=T[:,states[i-1]]))
emissions.append(pymc3.Categorical('z{0}'.format(i), p=E[:,states[i]], observed=Zr[:,i]))
Here Tr
and Er
are the real transition and emission matrices.
My issues are:
This model does not seems to explore the full values for the states, and it stays on a single value for each state (see notebook).
I haven't found a way to define states
and emissions
in a more pymc-tonic way, e.g. using shape=...
.
Furthermore, when I extend the model to account for unknown an transition or emission matrix, I ran into an indexing problem, and I'm forced to used theano.tensor.clip
, as in the next code:
# Unknown transitions and emissions
with pymc3.Model() as hmm3:
# Transition "matrix"
a_t = np.ones((num_states,num_states))
T = pymc3.Dirichlet('T', a=a_t, shape=(num_states,num_states))
# Emission "matrix"
a_e = np.ones((num_emissions, num_states))
E = pymc3.Dirichlet('E', a=a_e, shape=(num_emissions, num_states))
# State models
p0 = np.ones(num_states) / num_states
# No shape, so each state is a scalar tensor
states = [pymc3.Categorical('s0', p=p0)]
clp_state = tt.clip(states[0], 0, num_states-1)
emissions = [pymc3.Categorical('z0', p=E[:,clp_state], observed=Zr[0])]
for i in range(1, num_times):
clp_prev_state = tt.clip(states[i-1], 0, num_states-1)
states.append(pymc3.Categorical('s{0}'.format(i), p=T[:,clp_prev_state]))
clp_state = tt.clip(states[i], 0, num_states-1)
emissions.append(pymc3.Categorical('z{0}'.format(i), p=E[:,clp_state], observed=Zr[i]))
See the following notebook with a complete code.
Upvotes: 1
Views: 2226
Reputation: 100
A more pymc3 way (or alternatively, a more theano way) to define a hidden-markov states model is as follows:
class HMMStatesN(pm.Categorical):
"""
Hidden Markov Model States
Parameters
----------
P : tensor
transition probability
shape = (N_states,N_states)
PA : tensor
equilibrium probabilities
shape = (N_states)
"""
def __init__(self, PA=None, P=None,
*args, **kwargs):
super(pm.Categorical, self).__init__(*args, **kwargs)
self.P = P
self.PA = PA
self.k = N_states
self.mode = tt.cast(0,dtype='int64')
def logp(self, x):
P = self.P
PA = self.PA
# calculate equilibrium
# now we need to create an array with probabilities
# so that for x=A: PA=P1, PB=(1-P1)
# and for x=B: PA=P2, PB=(1-P2)
#P = tt.switch(x[:-1],P1,P2)
PS = P[x[:-1]]
x_i = x[1:]
ou_like = pm.Categorical.dist(PS).logp(x_i)
return pm.Categorical.dist(PA).logp(x[0]) + tt.sum(ou_like)
I also created a similar Gaussian emission class:
class HMMGaussianEmmisionsN(pm.Continuous):
"""
Hidden Markov Model Gaussian Emmisions
Parameters
----------
A : tensor
prior for Gaussian emmision mu
shape = (2,N_states)
S : tensor
prior for Gaussian emmision width
shape = (2,N_states)
states : tensor
equilibrium probabilities
shape = (N_states)
"""
def __init__(self, A=None, S=None, states=None,
*args, **kwargs):
super(HMMGaussianEmmisionsN, self).__init__(*args, **kwargs)
self.A = A
self.S = S
self.states = states
self.mean = 0.
def logp(self, x):
A = self.A
S = self.S
states = self.states
AS = A[states]
SS = S[states]
ou_like = pm.Normal.dist(mu=AS,sd=SS).logp(x)
return tt.sum(ou_like)
That can be called in the following way:
from scipy import optimize
with pm.Model() as model:
# N_states state model
P = pm.Dirichlet('P', a=np.ones((N_states,N_states)), shape=(N_states,N_states))
A = pm.InverseGamma('A',alpha=alphaA, beta=betaA, shape=(N_states))
S = pm.InverseGamma('S', alpha=alphaS, beta=betaS, shape=(N_states))
AA = tt.dmatrix('AA')
AA = tt.eye(N_states) - P + tt.ones(shape=(N_states,N_states))
PA = pm.Deterministic('PA',sla.solve(AA.T,tt.ones(shape=(N_states))))
states = HMMStatesN('states',PA=PA, P=P, shape=(len(measurement)))
emmision = HMMGaussianEmmisionsN('emmision', A=A, S=S, states=states, observed=measurement)
start = pm.find_MAP(fmin=optimize.fmin_powell)
step1 = pm.Metropolis(vars=[P,A,S,PA,emmision])
step2 = pm.CategoricalGibbsMetropolis(vars=[states])
trace = pm.sample(10000, start=start, step=[step1,step2])
I would think that your model could be coded in a similar fashion. It is important that you use the CategoricalGibbsMetropolis stepper (see this question for more detail) for the HMMstates. See a more detailed code example here.
Upvotes: 1