Javier C.

Reputation: 137

Problems building a discrete HMM in PyMC3

I'm playing around with a toy discrete HMM model in PyMC3 and I'm running into some issues.

My initial code looks like this:

import numpy as np
import pymc3
import theano.tensor as tt

# Known transition and emission
with pymc3.Model() as hmm1:
    T = tt.as_tensor_variable(Tr)
    E = tt.as_tensor_variable(Er)
    # Uniform prior over the initial state
    p0 = np.ones(num_states) / num_states
    # No shape, so each state is a scalar tensor
    states = [pymc3.Categorical('s0', p=p0)]
    emissions = [pymc3.Categorical('z0', p=E[:, states[0]], observed=Zr[:, 0])]
    for i in range(1, num_times):
        states.append(pymc3.Categorical('s{0}'.format(i), p=T[:, states[i-1]]))
        emissions.append(pymc3.Categorical('z{0}'.format(i), p=E[:, states[i]], observed=Zr[:, i]))

Here Tr and Er are the real transition and emission matrices.
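
For reference, here is a minimal sketch of how toy data with these shapes might be simulated (the sizes, the column-stochastic convention for Tr and Er, and the final reshape of Zr are assumptions inferred from the indexing in the model code above, not from the original notebook):

import numpy as np

num_states, num_emissions, num_times = 3, 4, 50
rng = np.random.RandomState(0)

# Columns sum to 1: Tr[:, j] is the next-state distribution from state j,
# Er[:, j] is the emission distribution in state j
Tr = rng.dirichlet(np.ones(num_states), size=num_states).T
Er = rng.dirichlet(np.ones(num_emissions), size=num_states).T

s = np.zeros(num_times, dtype=int)
z = np.zeros(num_times, dtype=int)
s[0] = rng.choice(num_states)
z[0] = rng.choice(num_emissions, p=Er[:, s[0]])
for t in range(1, num_times):
    s[t] = rng.choice(num_states, p=Tr[:, s[t - 1]])
    z[t] = rng.choice(num_emissions, p=Er[:, s[t]])

Zr = z[None, :]  # shape (1, num_times), matching the Zr[:, i] indexing above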

My issues are:

  1. This model does not seem to explore the full range of values for the states; it stays on a single value for each state (see notebook).

  2. I haven't found a way to define states and emissions in a more PyMC3-idiomatic way, e.g. using shape=....

  3. Furthermore, when I extend the model to account for an unknown transition or emission matrix, I run into an indexing problem and am forced to use theano.tensor.clip, as in the following code:

    # Unknown transitions and emissions
    with pymc3.Model() as hmm3:
        # Transition "matrix"
        a_t = np.ones((num_states, num_states))
        T = pymc3.Dirichlet('T', a=a_t, shape=(num_states, num_states))
        # Emission "matrix"
        a_e = np.ones((num_emissions, num_states))
        E = pymc3.Dirichlet('E', a=a_e, shape=(num_emissions, num_states))
        # State models
        p0 = np.ones(num_states) / num_states
        # No shape, so each state is a scalar tensor
        states = [pymc3.Categorical('s0', p=p0)]
        # Clip each sampled state into [0, num_states-1] before using it as an index
        clp_state = tt.clip(states[0], 0, num_states - 1)
        emissions = [pymc3.Categorical('z0', p=E[:, clp_state], observed=Zr[0])]
        for i in range(1, num_times):
            clp_prev_state = tt.clip(states[i-1], 0, num_states - 1)
            states.append(pymc3.Categorical('s{0}'.format(i), p=T[:, clp_prev_state]))
            clp_state = tt.clip(states[i], 0, num_states - 1)
            emissions.append(pymc3.Categorical('z{0}'.format(i), p=E[:, clp_state], observed=Zr[i]))
    

See the following notebook for the complete code.

Upvotes: 1

Views: 2226

Answers (1)

Helmut Strey

Reputation: 100

A more PyMC3 way (or alternatively, a more Theano way) to define a hidden Markov states model is as follows:

import numpy as np
import pymc3 as pm
import theano.tensor as tt

class HMMStatesN(pm.Categorical):
    """
    Hidden Markov Model States

    Parameters
    ----------
    P : tensor
        transition probabilities
        shape = (N_states, N_states)

    PA : tensor
        equilibrium probabilities
        shape = (N_states,)
    """

    def __init__(self, PA=None, P=None,
                 *args, **kwargs):
        # Skip pm.Categorical.__init__ (we supply our own parameters)
        super(pm.Categorical, self).__init__(*args, **kwargs)
        self.P = P
        self.PA = PA
        self.k = N_states  # relies on a global N_states
        self.mode = tt.cast(0, dtype='int64')

    def logp(self, x):
        P = self.P
        PA = self.PA

        # Gather, for each step, the transition distribution out of the
        # previous state: PS[t] = P[x[t]] (rows of P indexed by current state)
        PS = P[x[:-1]]
        x_i = x[1:]

        # Initial state under the equilibrium distribution, plus the
        # transition terms for the rest of the chain
        ou_like = pm.Categorical.dist(PS).logp(x_i)
        return pm.Categorical.dist(PA).logp(x[0]) + tt.sum(ou_like)
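
To see what the gather in logp does, here is a small numpy sketch (illustrative values only; the emission class below uses the same pattern with A[states] and S[states]):

import numpy as np

# Row convention: P[s] is the distribution over the next state given state s
P = np.array([[0.9, 0.1],
              [0.2, 0.8]])
x = np.array([0, 0, 1, 1, 0])  # a hypothetical state sequence

PS = P[x[:-1]]  # one transition distribution per step
print(PS)
# [[0.9 0.1]
#  [0.9 0.1]
#  [0.2 0.8]
#  [0.2 0.8]]
# The likelihood of step t is then PS[t][x[t+1]], summed over t in logp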

I also created a similar Gaussian emission class:

class HMMGaussianEmmisionsN(pm.Continuous):
    """
    Hidden Markov Model Gaussian Emissions

    Parameters
    ----------
    A : tensor
        Gaussian emission means, one per state
        shape = (N_states,)

    S : tensor
        Gaussian emission widths, one per state
        shape = (N_states,)

    states : tensor
        sequence of hidden states
        shape = (N,)
    """

    def __init__(self, A=None, S=None, states=None,
                 *args, **kwargs):
        super(HMMGaussianEmmisionsN, self).__init__(*args, **kwargs)
        self.A = A
        self.S = S
        self.states = states
        self.mean = 0.

    def logp(self, x):
        A = self.A
        S = self.S
        states = self.states

        # Gather the emission parameters for the state active at each sample
        AS = A[states]
        SS = S[states]

        ou_like = pm.Normal.dist(mu=AS, sd=SS).logp(x)
        return tt.sum(ou_like)

That can be called in the following way:

from scipy import optimize
import theano.tensor.slinalg as sla  # Theano's linear-algebra solve

with pm.Model() as model:
    # N_states state model

    P = pm.Dirichlet('P', a=np.ones((N_states, N_states)), shape=(N_states, N_states))
    A = pm.InverseGamma('A', alpha=alphaA, beta=betaA, shape=N_states)
    S = pm.InverseGamma('S', alpha=alphaS, beta=betaS, shape=N_states)

    # Equilibrium distribution PA of the chain: solving (I - P + J)^T PA = 1
    # (J the all-ones matrix) enforces PA P = PA together with sum(PA) = 1
    AA = tt.eye(N_states) - P + tt.ones(shape=(N_states, N_states))
    PA = pm.Deterministic('PA', sla.solve(AA.T, tt.ones(shape=(N_states))))

    states = HMMStatesN('states', PA=PA, P=P, shape=len(measurement))

    emmision = HMMGaussianEmmisionsN('emmision', A=A, S=S, states=states, observed=measurement)

    start = pm.find_MAP(fmin=optimize.fmin_powell)
    step1 = pm.Metropolis(vars=[P, A, S, PA, emmision])
    step2 = pm.CategoricalGibbsMetropolis(vars=[states])
    trace = pm.sample(10000, start=start, step=[step1, step2])

I would think that your model could be coded in a similar fashion. It is important that you use the CategoricalGibbsMetropolis stepper (see this question for more detail) for the HMMStatesN states. See a more detailed code example here.
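
As a rough starting point, here is a minimal, untested sketch of how the question's known-matrix model might look with HMMStatesN. It assumes Zr is a 1-D observation sequence, that a global N_states = num_states is defined for the class, that the installed pymc3 version accepts a 2-D p in Categorical, and that Tr must be transposed because HMMStatesN indexes transition rows by the current state, whereas the question indexes columns:

with pm.Model() as hmm1_vec:
    PA0 = np.ones(num_states) / num_states  # uniform initial-state distribution
    states = HMMStatesN('states', PA=PA0, P=tt.as_tensor_variable(Tr.T),
                        shape=len(Zr))
    # Er.T[states] gathers one emission distribution per time step
    emissions = pm.Categorical('emissions', p=tt.as_tensor_variable(Er.T)[states],
                               observed=Zr)
    step = pm.CategoricalGibbsMetropolis(vars=[states])
    trace = pm.sample(5000, step=step)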

Upvotes: 1
