# -*- coding: utf-8 -*-
"""
Created on Sat Dec 29 18:29:54 2018
@author: Kedar
"""
import numpy as np
from functools import reduce
from itertools import product
from matplotlib import pyplot as plt
import webbrowser
from datetime import datetime
plt.ioff()
# -----------------------------------------------------------------------------#
def print_matrix(A, name='A', n_indents=1):
"""
given a numpy matrix, A, print it to the screen with nice indentation
"""
# convert input to a numpy array, in case it isn't already
A = np.array(A)
# format and print the input, depending on whether it's an array or scalar
if len(A.shape) >= 2:
print('\n' + n_indents * '\t' + name + ' =\n' + (n_indents + 1) * '\t',
str(A).replace('\n', '\n' + (n_indents + 1) * '\t' + ' '))
else:
print('\n' + n_indents * '\t' + name + ' =', str(A))
# -----------------------------------------------------------------------------#
def extended_exp(x):
'''
    this subroutine makes a simple extension to the exp function: whenever a
    nan is passed in as an argument, the function returns zero instead of nan
    (an inf produced by overflow is likewise replaced with zero). the
    assumption here is that a nan in the argument is due to a ln(0) value.
    see mann, algorithm 1
'''
# use the standard exp function first
exp_x = np.exp(x)
# run through the array and figure out where there are nans or infs
problem_locs = np.argwhere((np.isnan(exp_x)) | (abs(exp_x) == np.inf))
# run through these locations and replace the values with zeros
for loc_array in problem_locs:
# if we're just working with a scalar that's a nan, then just reassign
# the value computed by np.exp directly
if type(exp_x) == np.float64:
exp_x = 0.0
else:
# otherwise, convert the individual arrays to tuples in order to
# use the output of argwhere as array indices (numpy accepts tuples
# as indices)
exp_x[tuple(loc_array)] = 0.0
# return the exponentiated array
return exp_x
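# a minimal usage sketch of extended_exp (the helper below and its toy values
# are illustrative, not part of any model used elsewhere in this file): a nan
# entry stands in for ln(0), so it exponentiates back to zero
def _demo_extended_exp():
    x = np.array([0.0, np.log(2.0), np.nan])
    return extended_exp(x)    # expected: array([1., 2., 0.])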
# -----------------------------------------------------------------------------#
def extended_ln(x):
'''
this subroutine makes a simple extension to the natural-log function:
whenever a zero is passed in, nan is returned (instead of -inf) without a
    runtime warning. whenever a negative number is passed in, an error is
    raised, as opposed to returning nan with a runtime warning. see mann,
    algorithm 2
'''
# convert the input to a numpy array
x = np.array(x)
# add a dimension, if a scalar has been passed in
if x.size == 1:
x = np.expand_dims(x, axis=0)
# initialize the return array with nans
ln_x = np.nan * np.ones_like(x)
# pull out a tuple with the dimensions of the input matrices
dims = x.shape
# we want to run through the array here, element-by-element. we'll do this
# by first making a list of all the indices that need to be visited, i.e.
# all the indices encompassed by an array with dimensions in dims. this can
# be done in a single line: first, use the map function to create a list of
# range objects corresponding to each of the dimensions found; then, take
# this list of range objects and apply it as an argument to the itertools
# product function. the result is a list of tuples of the indices
indices = list(product(*map(range, dims)))
# run through the arrays, element-by-element
for index in indices:
# pull out the value at this index
x_val = x[index]
# if the element is zero, then return nan
if x_val == 0:
ln_x[index] = np.nan
# if the element is positive, then compute the log as usual
elif x_val > 0:
ln_x[index] = np.log(x_val)
else:
# if the input is negative, then raise an error
if x_val < 0:
raise ValueError('\n\n\tcannot take natural log of a ' + \
'negative value!')
else:
# if it's a nan or an inf or something, raise another error
raise ValueError('\n\n\tcannot take the natural log of ' + \
str(x_val))
# return the scalar or array of logarithms
if x.size == 1:
return float(ln_x.squeeze())
else:
return ln_x
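# a minimal usage sketch of extended_ln (illustrative helper with toy values):
# zeros map to nan, positive entries map to their ordinary natural log, and a
# negative entry would raise a ValueError
def _demo_extended_ln():
    x = np.array([1.0, np.e, 0.0])
    return extended_ln(x)    # expected: array([0., 1., nan])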
# -----------------------------------------------------------------------------#
def ln_sum(ln_x, ln_y):
'''
    given ln(x) and ln(y), this function returns ln(x+y) in a way that avoids
    numerical underflow/overflow issues. if either ln_x or ln_y is nan, then
    the non-nan input is returned, i.e. ln(x+y) = ln(y) if ln(x) = nan.
    if neither ln_x nor ln_y is nan, then the log-sum-exp trick is used to
    compute ln(x+y), i.e. ln(x+y) = ln(x) + ln(1+e^(ln(y)-ln(x))), where ln(x)
    is the larger of the two. see mann, algorithm 3
'''
# convert the inputs to a numpy arrays
ln_x = np.array(ln_x)
ln_y = np.array(ln_y)
# to each input, add a dimension, if a scalar has been passed in
if ln_x.size == 1:
ln_x = np.expand_dims(ln_x, axis=0)
if ln_y.size == 1:
ln_y = np.expand_dims(ln_y, axis=0)
# make sure the two inputs are the same shape
if not ln_x.shape == ln_y.shape:
        raise ValueError('\n\n\tcannot take a sum of two arrays of ' + \
                         'different shapes: \n\tln_x.shape = ' + \
                         str(ln_x.shape) + '\n\tln_y.shape = ' + str(
                         ln_y.shape))
# create an array to store the product, initialize it with nans
ln_x_plus_y = np.nan * np.ones_like(ln_x)
# pull out a tuple with the dimensions of the input matrices
dims = ln_x.shape
# we want to run through the arrays here, element-by-element. we'll do this
# by first making a list of all the indices that need to be visited, i.e.
# all the indices encompassed by an array with dimensions in dims. this can
# be done in a single line: first, use the map function to create a list of
# range objects corresponding to each of the dimensions found; then, take
# this list of range objects and apply it as an argument to the itertools
# product function. the result is a list of tuples of the indices
indices = list(product(*map(range, dims)))
# run through the arrays, element-by-element
for index in indices:
# pull out and store the values from each array at this index
ln_x_val = ln_x[index]
ln_y_val = ln_y[index]
# if both inputs are nan, return nan
if np.isnan(ln_x_val) and np.isnan(ln_y_val):
ln_x_plus_y_val = np.nan
        # if one of the inputs is nan, then return the other
elif np.isnan(ln_x_val):
ln_x_plus_y_val = ln_y_val
elif np.isnan(ln_y_val):
ln_x_plus_y_val = ln_x_val
# in the case where both arguments aren't nans, then figure out which
# one is larger and then use the log-sum-exp trick
elif ln_x_val > ln_y_val:
ln_x_plus_y_val = ln_x_val + extended_ln(
1 + np.exp(ln_y_val - ln_x_val))
else:
ln_x_plus_y_val = ln_y_val + extended_ln(
1 + np.exp(ln_x_val - ln_y_val))
# record the sum value found
ln_x_plus_y[index] = ln_x_plus_y_val
# return either the resulting array or scalar
if ln_x_plus_y.size == 1:
return float(ln_x_plus_y.squeeze())
else:
return ln_x_plus_y
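# a minimal usage sketch of ln_sum (illustrative helper with toy values):
# ln_sum(ln(x), ln(y)) recovers ln(x + y), and a nan argument is treated as
# ln(0), so it simply drops out of the sum
def _demo_ln_sum():
    ln_five = ln_sum(np.log(2.0), np.log(3.0))    # expected: ln(5) ~ 1.609
    ln_three = ln_sum(np.nan, np.log(3.0))        # expected: ln(3), nan ignored
    return ln_five, ln_three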
# -----------------------------------------------------------------------------#
def ln_product(ln_x, ln_y):
'''
    given ln(x) and ln(y), this function returns ln(xy). if either of the
    inputs is nan, then the returned value is nan. see mann, algorithm 4
'''
# convert the inputs to a numpy arrays
ln_x = np.array(ln_x)
ln_y = np.array(ln_y)
# to each input, add a dimension, if a scalar has been passed in
if ln_x.size == 1:
ln_x = np.expand_dims(ln_x, axis=0)
if ln_y.size == 1:
ln_y = np.expand_dims(ln_y, axis=0)
# make sure the two inputs are the same shape
if not ln_x.shape == ln_y.shape:
raise ValueError('\n\n\tcannot take a product of two arrays of ' + \
'different shapes: \n\tln_x.shape = ' + \
str(ln_x.shape) + '\n\tln_y.shape =' + str(
ln_y.shape))
# create an array to store the product, initialize it with nans
ln_xy = np.nan * np.ones_like(ln_x)
# pull out a tuple with the dimensions of the input matrices
dims = ln_x.shape
# we want to run through the arrays here, element-by-element. we'll do this
# by first making a list of all the indices that need to be visited, i.e.
# all the indices encompassed by an array with dimensions in dims. this can
# be done in a single line: first, use the map function to create a list of
# range objects corresponding to each of the dimensions found; then, take
# this list of range objects and apply it as an argument to the itertools
# product function. the result is a list of tuples of the indices
indices = list(product(*map(range, dims)))
# run through the arrays, element-by-element
for index in indices:
# if both inputs are valid, i.e. not nans, then compute the product
# using standard logarithm rules
if not np.isnan(ln_x[index]) and not np.isnan(ln_y[index]):
ln_xy[index] = ln_x[index] + ln_y[index]
# return either the resulting array or scalar
if ln_xy.size == 1:
return float(ln_xy.squeeze())
else:
return ln_xy
# -----------------------------------------------------------------------------#
def forward_basic(O, A, B, pi, V, return_matrix=False):
'''
compute the likelihood (i.e. the probability) of the given sequence, O,
with respect to the hidden markov model defined by the transition
    probabilities in A and the emission probabilities in B. do so using the
forward algorithm. return the forward-path probabilities, if desired
- input:
- O: the observation sequence under consideration
- A: the matrix of transition probabilities
- B: the matrix of emission probabilities
- pi: the vector of the initial-state probabilities
- V: the vocabulary of ordered possible observations
- return_matrix: return the matrix of forward-path probabilities (alpha)
- output:
- P_O: the likelihood of the given sequence of observations
- alpha: N-by-T matrix of forward-path probabilities
'''
# extract the number of states and the number of time steps
N = A.shape[0]
T = len(O)
# create a matrix to hold the forward path probabilities
alpha = np.zeros((N, T))
# fill in the first column of matrix with the initial values for the
    # forward path probabilities
first_o_index = V.index(O[0])
alpha[:, 0] = np.multiply(pi.reshape((1, N)), B[:, first_o_index])
# using recursion, fill in the rest of the forward-path probs
for j in range(1, T):
# find the index of the observation at time step j
o_index = V.index(O[j])
# walk down the j-th column
for i in range(N):
# compute alpha value by summing over all possible previous states
# N.B. alpha[i,j] is the probability of being in the i-th state at
# the j-th time step and seeing the first j observations
for i_previous in range(N):
alpha[i, j] += alpha[i_previous, j - 1] * A[i_previous, i] * B[
i, o_index]
# the probability of the entire sequence is found by summing down the last
# column of the matrix of forward-path probabilities
P_O = np.sum(alpha[:, -1])
# return the probability of the observation sequence and, if desired, the
# matrix of forward-path probabilities
if return_matrix:
return P_O, alpha
else:
return P_O
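# a small worked example of forward_basic on a toy two-state model (all of the
# numbers and the helper name are assumed here purely for illustration)
def _demo_forward_basic():
    V = [1, 2, 3]                      # vocabulary of possible observations
    pi = np.array([0.8, 0.2])          # initial-state probabilities
    A = np.array([[0.7, 0.3],          # transition probabilities (row = from)
                  [0.4, 0.6]])
    B = np.array([[0.2, 0.4, 0.4],     # emission probabilities (col order = V)
                  [0.5, 0.4, 0.1]])
    O = [3, 1, 3]                      # observation sequence
    # carrying out the recursion by hand gives P(O) = 0.026264 for this model
    return forward_basic(O, A, B, pi, V)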
# --------------------------------------------------------------------------- #
def forward_log(O, A, B, pi, V, return_matrix=False):
'''
compute the log likelihood (i.e. the natural log of the probability) of the
given sequence, O, with respect to the hidden markov model defined by the
    transition probabilities in A and the emission probabilities in B. do so
using the forward algorithm. return the matrix of the log of the forward-
path probabilities, if desired
- input:
- O: the observation sequence under consideration
- A: the matrix of transition probabilities
- B: the matrix of emission probabilities
- pi: the vector of the initial-state probabilities
- V: the vocabulary of ordered possible observations
- return_matrix: return the matrix of forward-path probabilities (ln_alpha)
- output:
        - ln_P_O: the log likelihood of the given sequence of observations
        - ln_alpha: N-by-T matrix of the logs of the forward-path probs
'''
# extract the number of states and the number of time steps
N = A.shape[0]
T = len(O)
# create a matrix to hold the logs of the forward path probabilities
ln_alpha = np.zeros((N, T))
# reshape the initial-state probabilities if it's not a 1D array
if len(pi.shape) > 1:
pi = pi.squeeze()
# fill in the first column of matrix with the initial values for the
# forward path probabilities
first_o_index = V.index(O[0])
ln_alpha[:, 0] = np.log(pi) + np.log(B[:, first_o_index])
# using recursion, fill in the rest of the logs of the forward-path probs
for j in range(1, T):
# find the index of the observation at time step j
o_index = V.index(O[j])
# walk down the j-th column
for i in range(N):
# compute and store the logs of the probabilities of getting to
# state i at time j from each possible state at time j-1 (ignore
# the probability of seeing the observation at time j, for now, but
# do consider the probability associated with seeing the sequence
# of observations up until time j-1)
ln_prob_from_previous = ln_alpha[:, j - 1] + np.log(A[:, i])
# pull out the maximum value of these log probabilities
max_ln_prob = np.max(ln_prob_from_previous)
# apply the log-sum-exp trick. ln_alpha[i,j] is the natural log of
# the joint probability of being in the i-th state at the j-th time
# step and seeing the first j observations
ln_alpha[i, j] = np.log(B[i, o_index]) + max_ln_prob + \
np.log(np.sum(
np.exp(ln_prob_from_previous - max_ln_prob)))
# the natural log of the probability of the entire sequence is found by
# exponentiating and then summing down the last column of the matrix of the
# natural logs of the forward-path probabilities
ln_P_O = np.log(np.sum(np.exp(ln_alpha[:, -1])))
# return the log of the probability of the observation sequence and, if
# desired, the matrix of natural logs of the forward-path probabilities
if return_matrix:
return ln_P_O, ln_alpha
else:
return ln_P_O
# -----------------------------------------------------------------------------#
def forward(O, A, B, pi, V, return_matrix=False):
'''
compute the log likelihood (i.e. the natural log of the probability) of the
given sequence, O, with respect to the hidden markov model defined by the
    transition probabilities in A and the emission probabilities in B. do so
using the forward algorithm and the extended subroutines suggested by mann,
algorithm 5. return the matrix of the log of the forward-path
probabilities, if desired
- input:
- O: the observation sequence under consideration
- A: the matrix of transition probabilities
- B: the matrix of emission probabilities
- pi: the vector of the initial-state probabilities
- V: the vocabulary of ordered possible observations
- return_matrix: return matrix of forward-path probabilities (ln_alpha)
- output:
        - ln_P_O: the log likelihood of the given sequence of observations
        - ln_alpha: N-by-T matrix of the logs of the forward-path probs
'''
# extract the number of states and the number of time steps
N = A.shape[0]
T = len(O)
# create a matrix to hold the logs of the forward path probabilities,
# initialize the matrix to hold all nans
ln_alpha = np.nan * np.ones((N, T))
# reshape the initial-state probabilities if it's not a 1D array
if len(pi.shape) > 1:
pi = pi.squeeze()
# fill in the first column of matrix with the initial values for the
    # forward path probabilities
first_o_index = V.index(O[0])
ln_alpha[:, 0] = ln_product(extended_ln(pi),
extended_ln(B[:, first_o_index]))
# using recursion, fill in the rest of the logs of the forward-path probs
for j in range(1, T):
# find the index of the observation at time step j
o_index = V.index(O[j])
# walk down the j-th column
for i in range(N):
# partially compute the log of the alpha value by summing over all
# possible previous states. N.B. ln_alpha[i,j] is the log
# probability of being in the i-th state at the j-th time step and
# seeing the first j observations. the summation only runs over the
# transition probabilities, so start with that
for i_previous in range(N):
ln_alpha[i, j] = ln_sum(ln_alpha[i, j],
ln_product(ln_alpha[i_previous, j - 1],
extended_ln(
A[i_previous, i])))
            # now, include the probability of state i emitting the j-th observation
ln_alpha[i, j] = ln_product(ln_alpha[i, j],
extended_ln(B[i, o_index]))
# the natural log of the probability of the entire sequence is found by
# exponentiating and then summing down the last column of the matrix of the
# natural logs of the forward-path probabilities
ln_P_O = extended_ln(np.sum(extended_exp(ln_alpha[:, -1])))
# return the log of the probability of the observation sequence and, if
# desired, the matrix of natural logs of the forward-path probabilities
if return_matrix:
return ln_P_O, ln_alpha
else:
return ln_P_O
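# a quick consistency sketch (toy inputs like those in _demo_forward_basic are
# assumed): the three forward variants should agree with one another, i.e.
# exp(forward(...)) == exp(forward_log(...)) == forward_basic(...)
def _demo_forward_variants_agree(O, A, B, pi, V):
    P_basic = forward_basic(O, A, B, pi, V)
    ln_P_log = forward_log(O, A, B, pi, V)
    ln_P_ext = forward(O, A, B, pi, V)
    return np.allclose([np.exp(ln_P_log), np.exp(ln_P_ext)], P_basic)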
# -----------------------------------------------------------------------------#
def viterbi_basic(O, A, B, pi, V, Q):
'''
given a sequence of observations, O, this subroutine finds the most
probable sequence of hidden states that could have generated it, based on
    the hidden markov model defined by the transition probabilities, A, the
    emission probabilities, B, and the initial-state probabilities, pi. the
best path is found using the viterbi algorithm.
- input:
- O: the observation sequence under consideration
- A: the matrix of transition probabilities
- B: the matrix of emission probabilities
- pi: the vector of the initial-state probabilities
- V: the vocabulary of ordered possible observations
- Q: set of possible states
- output:
- best_path: the most likely sequence of hidden states
- P_best_path: the probability of the best path
'''
# extract the number of states and the number of time steps
N = A.shape[0]
T = len(O)
# initialize matrices to hold viterbi probabilities and backpointers
viterbi_probs = np.zeros((N, T))
backpointer = np.zeros((N, T))
# fill in the first column of the viterbi matrix
first_o_index = V.index(O[0])
viterbi_probs[:, 0] = np.multiply(pi, B[:, first_o_index])
# the first column of the backpointer matrix is not defined, since there is
# no previous hidden state. fill with nans
backpointer[:, 0] = np.nan * np.ones(N)
# using recursion, fill in the rest of the previous viterbi probabilities,
# along the way, record the corresponding backpointers
for j in range(1, T):
# find the index of the observation at time step j
o_index = V.index(O[j])
# walk down the j-th column
for i in range(N):
            # compute the viterbi probability by first computing the probability
# of having come from each of the previous states, then taking the
# max
possible_path_probs = np.zeros(N)
for i_previous in range(N):
possible_path_probs[i_previous] = viterbi_probs[
i_previous, j - 1] * A[
i_previous, i] * B[
i, o_index]
viterbi_probs[i, j] = np.max(possible_path_probs)
# the corresponding backpointer index is simply the argmax of the
            # list of computed probabilities
backpointer[i, j] = np.argmax(possible_path_probs)
print_matrix(viterbi_probs, name='viterbi_probs')
print_matrix(extended_ln(viterbi_probs), name='extended_ln(viterbi_probs)')
# compute the probability of the most-likely path
P_best_path = np.max(viterbi_probs[:, -1])
# compute the last backpointer
final_backpointer = np.argmax(viterbi_probs[:, -1])
# find the most likely hidden-state sequence by starting with the final
# backpointer and working backwards in time (column by column) through the
# backpointer matrix to recover the index of the most-likely previous state
reverse_best_path_indices = [final_backpointer]
for j in range(T - 1, 0, -1):
current_best_index = int(reverse_best_path_indices[-1])
previous_state_index = int(backpointer[current_best_index, j])
reverse_best_path_indices.append(previous_state_index)
# running backwards through the reversed list of best-path indices, make a
# list of the best path through the hidden states running forward in time
best_path = []
for j in range(T - 1, -1, -1):
best_path.append(Q[reverse_best_path_indices[j]])
    # return the best path and its likelihood
return best_path, P_best_path
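# a small worked example of viterbi_basic on the same kind of toy two-state
# model used in _demo_forward_basic (numbers assumed purely for illustration)
def _demo_viterbi_basic():
    Q = ['hot', 'cold']
    V = [1, 2, 3]
    pi = np.array([0.8, 0.2])
    A = np.array([[0.7, 0.3],
                  [0.4, 0.6]])
    B = np.array([[0.2, 0.4, 0.4],
                  [0.5, 0.4, 0.1]])
    O = [3, 1, 3]
    # tracing the recursion by hand, the best path should come out as
    # ['hot', 'hot', 'hot'] with probability 0.012544
    return viterbi_basic(O, A, B, pi, V, Q)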
# -----------------------------------------------------------------------------#
def viterbi_log(O, A, B, pi, V, Q):
'''
given a sequence of observations, O, this subroutine finds the most
probable sequence of hidden states that could have generated it, based on
    the hidden markov model defined by the transition probabilities, A, the
    emission probabilities, B, and the initial-state probabilities, pi. the
best path is found using the viterbi algorithm. in contrast to
viterbi_basic, this subroutine works with the natural logarithms of the
viterbi probabilities, thereby avoiding numerical underflow
- input:
- O: the observation sequence under consideration
- A: the matrix of transition probabilities
- B: the matrix of emission probabilities
- pi: the vector of the initial-state probabilities
- V: the vocabulary of ordered possible observations
- Q: set of possible states
- output:
- best_path: the most likely sequence of hidden states
- ln_P_best_path: the natural log of the probability of the best path
'''
# extract the number of states and the number of time steps
N = A.shape[0]
T = len(O)
# initialize matrices to hold the natural logarithms of the viterbi
# probabilities, phi, and the backpointers
phi = np.zeros((N, T))
backpointer = np.zeros((N, T))
# fill in the first column of the phi matrix
first_o_index = V.index(O[0])
phi[:, 0] = np.log(pi) + np.log(B[:, first_o_index])
# the first column of the backpointer matrix is not defined, since there is
# no previous hidden state. fill with nans
backpointer[:, 0] = np.nan * np.ones(N)
# using recursion, fill in the rest of the natural logs of the viterbi
# probabilities, along the way, record the corresponding backpointers
for j in range(1, T):
# find the index of the observation at time step j
o_index = V.index(O[j])
# walk down the j-th column
for i in range(N):
            # compute the natural log of the viterbi probability by first
            # computing the natural log of the probability of having come from
            # each of the previous states, then taking the maximum. the
            # corresponding backpointer index is simply the argmax of the list
            # of computed probabilities
ln_possible_path_probs = np.zeros(N)
for i_previous in range(N):
ln_possible_path_probs[i_previous] = phi[i_previous, j - 1] + \
np.log(A[i_previous, i]) + \
np.log(B[i, o_index])
phi[i, j] = np.max(ln_possible_path_probs)
backpointer[i, j] = np.argmax(ln_possible_path_probs)
# compute the natural log of the probability of the most-likely path and
# the last backpointer
ln_P_best_path = np.max(phi[:, -1])
final_backpointer = np.argmax(phi[:, -1])
# find the most likely hidden-state sequence by starting with the final
# backpointer and working backwards in time (column by column) through the
# backpointer matrix to recover the index of the most-likely previous state
reverse_best_path_indices = [final_backpointer]
for j in range(T - 1, 0, -1):
current_best_index = int(reverse_best_path_indices[-1])
previous_state_index = int(backpointer[current_best_index, j])
reverse_best_path_indices.append(previous_state_index)
# running backwards through the reversed list of best-path indices, make a
# list of the best path through the hidden states running forward in time
best_path = []
for j in range(T - 1, -1, -1):
best_path.append(Q[reverse_best_path_indices[j]])
    # return the best path and its likelihood
return best_path, ln_P_best_path
# -----------------------------------------------------------------------------#
def viterbi(O, A, B, pi, V, Q):
'''
given a sequence of observations, O, this subroutine finds the most
probable sequence of hidden states that could have generated it, based on
    the hidden markov model defined by the transition probabilities, A, the
    emission probabilities, B, and the initial-state probabilities, pi. the
best path is found using the viterbi algorithm. in contrast to
viterbi_basic, this subroutine works with the natural logarithms of the
viterbi probabilities, using the extended logarithmic subroutines of mann,
thereby avoiding numerical underflow
- input:
- O: the observation sequence under consideration
- A: the matrix of transition probabilities
- B: the matrix of emission probabilities
- pi: the vector of the initial-state probabilities
- V: the vocabulary of ordered possible observations
- Q: set of possible states
- output:
- best_path: the most likely sequence of hidden states
- ln_P_best_path: the natural log of the probability of the best path
'''
# extract the number of states and the number of time steps
N = A.shape[0]
T = len(O)
# initialize matrices to hold the natural logarithms of the viterbi
# probabilities, phi, and the backpointers
phi = np.zeros((N, T))
backpointer = np.zeros((N, T))
# fill in the first column of the phi matrix
first_o_index = V.index(O[0])
phi[:, 0] = ln_product(extended_ln(pi), extended_ln(B[:, first_o_index]))
# the first column of the backpointer matrix is not defined, since there is
# no previous hidden state. fill with nans
backpointer[:, 0] = np.nan * np.ones(N)
# using recursion, fill in the rest of the natural logs of the viterbi
# probabilities, along the way, record the corresponding backpointers
for j in range(1, T):
# find the index of the observation at time step j
o_index = V.index(O[j])
# walk down the j-th column
for i in range(N):
            # compute the natural log of the viterbi probability by first
# computing the natural log of the probability of having come from
# each of the previous states, then taking the maximum
ln_possible_path_probs = np.zeros(N)
for i_previous in range(N):
ln_possible_path_probs[i_previous] = reduce(ln_product,
[phi[
i_previous, j - 1],
extended_ln(A[
i_previous, i]),
extended_ln(B[
i, o_index])])
# to treat the case where there might be nans in the array of
# possible path probabilities, replace them with -inf values. why?
# because max and np.max both return nan if there's even a single
# nan in the array. -inf behaves differently
nan_locs = np.argwhere(np.isnan(ln_possible_path_probs))
ln_possible_path_probs[nan_locs] = -np.inf
# pull out the highest possible-path probability
phi[i, j] = np.max(ln_possible_path_probs)
# the corresponding backpointer index is simply the argmax of the
            # list of computed probabilities
backpointer[i, j] = np.argmax(ln_possible_path_probs)
# compute the natural log of the probability of the most-likely path and
# the last backpointer
ln_P_best_path = np.max(phi[:, -1])
final_backpointer = np.argmax(phi[:, -1])
# find the most likely hidden-state sequence by starting with the final
# backpointer and working backwards in time (column by column) through the
# backpointer matrix to recover the index of the most-likely previous state
reverse_best_path_indices = [final_backpointer]
for j in range(T - 1, 0, -1):
current_best_index = int(reverse_best_path_indices[-1])
previous_state_index = int(backpointer[current_best_index, j])
reverse_best_path_indices.append(previous_state_index)
# running backwards through the reversed list of best-path indices, make a
# list of the best path through the hidden states running forward in time
best_path = []
for j in range(T - 1, -1, -1):
best_path.append(Q[reverse_best_path_indices[j]])
    # return the best path and its likelihood
return best_path, ln_P_best_path
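# a quick consistency sketch (toy inputs as in _demo_viterbi_basic assumed):
# the log-space viterbi routines should recover the same state path as
# viterbi_basic, and exponentiating their log probabilities should match its
# path probability
def _demo_viterbi_variants_agree(O, A, B, pi, V, Q):
    path_basic, P_basic = viterbi_basic(O, A, B, pi, V, Q)
    path_log, ln_P_log = viterbi_log(O, A, B, pi, V, Q)
    path_ext, ln_P_ext = viterbi(O, A, B, pi, V, Q)
    paths_match = path_basic == path_log == path_ext
    probs_match = np.allclose([np.exp(ln_P_log), np.exp(ln_P_ext)], P_basic)
    return paths_match and probs_match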
# -----------------------------------------------------------------------------#
def backward_basic(O, A, B, pi, V, return_matrix=False):
'''
compute the likelihood (i.e. the probability) of the given sequence, O,
with respect to the hidden markov model defined by the transition
    probabilities in A and the emission probabilities in B. do so using the
backward algorithm. return the backward probabilities, if desired
- input:
- O: the observation sequence under consideration
- A: the matrix of transition probabilities
- B: the matrix of emission probabilities
- pi: the vector of the initial-state probabilities
- V: the vocabulary of ordered possible observations
- return_matrix: return the matrix of backward probabilities (beta)
- output:
- P_O: the likelihood of the given sequence of observations
- beta: N-by-T matrix of backward probabilities
'''
# extract the number of states and the number of time steps
N = A.shape[0]
T = len(O)
# create a matrix to hold the backward probabilities
beta = np.zeros((N, T))
    # initialize the last column of the matrix of backward probabilities. N.B.
# the backward probability beta[i,j] is the probability of seeing the
# observation sequence o[j+1], o[j+2],..., o[T-1] given that you are in
# state i at time j
beta[:, -1] = np.ones(N)
# working backwards using recursion, fill in the rest of the backward probs
for j in range(T - 2, -1, -1):
# find the index of the observation at time step j+1 (next time step)
o_next_index = V.index(O[j + 1])
# walk down the j-th column
for i in range(N):
# compute the beta value by summing over all possible next states
# N.B. beta[i,j] is the probability of seeing the observation
# sequence o[j+1], o[j+2],..., o[T-1] given that you are in state i
# at time j
for i_next in range(N):
beta[i, j] += A[i, i_next] * B[i_next, o_next_index] * beta[
i_next, j + 1]
# the probability of the entire sequence is found by propagating the values
# in the first column of beta back to the beginning of the sequence using
# the initial probability distribution over the states and the emission
# probabilities
P_O = 0.0
first_o_index = V.index(O[0])
for i in range(N):
P_O += pi[i] * B[i, first_o_index] * beta[i, 0]
# return the probability of the observation sequence and, if desired, the
# matrix of backward probabilities
if return_matrix:
return P_O, beta
else:
return P_O
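# a quick consistency sketch (toy inputs assumed): the forward and backward
# algorithms compute the same sequence likelihood, just from opposite ends
def _demo_forward_backward_agree(O, A, B, pi, V):
    P_forward = forward_basic(O, A, B, pi, V)
    P_backward = backward_basic(O, A, B, pi, V)
    return np.isclose(P_forward, P_backward)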
# -----------------------------------------------------------------------------#
def backward_log(O, A, B, pi, V, return_matrix=False):
'''
compute the log likelihood (i.e. the natural log of the probability) of the
given sequence, O, with respect to the hidden markov model defined by the
    transition probabilities in A, the emission probabilities in B, and the
initial-state probabilities in pi. do so using the backward algorithm.
return the backward log probabilities too, if desired
- input:
- O: the observation sequence under consideration
- A: the matrix of transition probabilities
- B: the matrix of emission probabilities
- pi: the vector of the initial-state probabilities
- V: the vocabulary of ordered possible observations
- return_matrix: return the matrix of backward probabilities (beta)
- output:
- ln_P_O: the log likelihood of the sequence of observations
- ln_beta: N-by-T matrix of backward log probabilities
'''
# extract the number of states and the number of time steps
N = A.shape[0]
T = len(O)
# create a matrix to hold the natural logs of the backward probabilities.
# initializing with zeros here automatically sets the last column of the
    # matrix of backward log probabilities. N.B. the backward log probability
# ln_beta[i,j] is the natural logarithm of the probability of seeing the
# observation sequence o[j+1], o[j+2],..., o[T-1] given that you are in
# state i at time j. the backward probabilities at the last time step are
# all one, since the probability of seeing nothing afterwards is certain,
    # irrespective of final state. the natural log of one is zero, so the
    # final column has already been filled
ln_beta = np.zeros((N, T))
# using recursion, backfill the rest of the backward log probabilities
for j in range(T - 2, -1, -1):
# find the index of the observation at time step j+1 (next time step)
o_next_index = V.index(O[j + 1])
# walk down the j-th column
for i in range(N):
# compute and store the log probabilities of going from state i at
# time j to each of the N possible states at time j+1, emitting the
# (j+1)th observation, as well as all the subsequent observations
# after that
ln_probs_to_next = np.log(A[i, :].T) + np.log(B[:, o_next_index]) + \
ln_beta[:, j + 1]
            # pull out the maximum log-prob value for the log-sum-exp trick
max_ln_prob = np.max(ln_probs_to_next)
# apply the log-sum-exp trick. ln_beta[i,j] is the natural log of
# the joint probability of seeing the observation sequence o[j+1],
# o[j+2],..., o[T-1] given that you are in state i at time j
ln_beta[i, j] = max_ln_prob + \
np.log(
np.sum(np.exp(ln_probs_to_next - max_ln_prob)))
# the log probability of the entire sequence is found by propagating the
# values in the first column of ln_beta back to the beginning of the
# sequence using the initial probability distribution over the states and
# the emission probabilities
first_o_index = V.index(O[0])
ln_P_O = np.log(np.sum(
np.exp(np.log(pi) + np.log(B[:, first_o_index]) + ln_beta[:, 0])))
# return the log probability of the observation sequence and, if desired,
# the matrix of backward log probabilities
if return_matrix:
return ln_P_O, ln_beta
else:
return ln_P_O
# -----------------------------------------------------------------------------#
def backward(O, A, B, pi, V, return_matrix=False):
'''
using algorithm 6 in mann, compute the log likelihood (i.e. the natural log
of the probability) of the given sequence, O, with respect to the hidden
markov model defined by the transition probabilities in A, the emission
    probabilities in B, and the initial-state probabilities in pi. do so using
the backward algorithm. return the backward log probabilities, if desired
- input:
- O: the observation sequence under consideration
- A: the matrix of transition probabilities
- B: the matrix of emission probabilities
- pi: the vector of the initial-state probabilities
- V: the vocabulary of ordered possible observations
- return_matrix: return the matrix of backward probabilities (beta)
- output:
- ln_P_O: the log likelihood of the sequence of observations
- ln_beta: N-by-T matrix of backward log probabilities
'''
# extract the number of states and the number of time steps
N = A.shape[0]
T = len(O)
# create a matrix to hold the natural logs of the backward probabilities.
# initializing with zeros here automatically sets the last column of the
    # matrix of backward log probabilities. N.B. the backward log probability
# ln_beta[i,j] is the natural logarithm of the probability of seeing the
# observation sequence o[j+1], o[j+2],..., o[T-1] given that you are in
# state i at time j. the backward probabilities at the last time step are
# all one, since the probability of seeing nothing afterwards is certain,
    # irrespective of final state. the natural log of one is zero, so the
    # final column has already been filled
ln_beta = np.zeros((N, T))
# using recursion, backfill the rest of the backward log probabilities
for j in range(T - 2, -1, -1):
# find the index of the observation at time step j+1 (next time step)
o_next_index = V.index(O[j + 1])
# walk down the j-th column
for i in range(N):
# initialize a list to hold the probabilities of moving to each of
# the hidden states at the next time instance
ln_probs_to_next = []
            # run through the hidden states that can be transitioned to
for i_next in range(N):
# compute the probability of moving to the particular next
# hidden state and observing all the subsequent observations
ln_prob_to_next = reduce(ln_product,
[extended_ln(A[i, i_next]),
ln_beta[i_next, j + 1],
extended_ln(
B[i_next, o_next_index])])
# store that probability
ln_probs_to_next.append(ln_prob_to_next)
# now, compute the log of the beta value by summing over the log
# probabilities of transitioning to each of the next possible
# hidden states and seeing all the subsequent observations. N.B.
# beta[i,j] is the probability of seeing the observation sequence
# o[j+1], o[j+2],..., o[T-1] given that you're in state i at time j
ln_beta[i, j] = reduce(ln_sum, ln_probs_to_next)
# the log probability of the entire sequence is found by propagating the
# values in the first column of ln_beta back to the beginning of the
# sequence using the initial probability distribution over the states and
# the emission probabilities
first_o_index = V.index(O[0])
ln_P_O = extended_ln(
np.sum(pi * B[:, first_o_index] * extended_exp(ln_beta[:, 0])))
# return the log probability of the observation sequence and, if desired,
# the matrix of backward log probabilities
if return_matrix:
return ln_P_O, ln_beta
else:
return ln_P_O
# -----------------------------------------------------------------------------#
def check_alpha_beta(alpha, beta):
'''
this function checks to make sure the matrix alpha (holding the forward
    probabilities) and the matrix beta (holding the backward probabilities) have
been computed correctly. this is checked T separate times. how? because
when both the forward and backward probabilities are known for each state
at a given time instance, the probability of the entire observation
sequence (assuming it passes through state i) can be computed. adding up
the product of these probabilities across all states at a given time step
yields the likelihood of the entire observation sequence, P(O). since there
are T time steps in the sequence, P(O) can be computed T times. return a
boolean stating whether or not the matrices have been computed correctly
'''
# compute the likelihood of the entire observation sequence at each time
P_O_values = np.sum(np.multiply(alpha, beta), axis=0)
# compute the discrepancies with respect to the first P(O) value
discrepancies = P_O_values - P_O_values[0]
    # check to make sure all the discrepancies are (numerically) zero; sum the
    # absolute values so positive and negative discrepancies cannot cancel out
    if np.sum(np.abs(discrepancies)) < len(discrepancies) * np.finfo('float').eps:
matrices_correct = True
else:
matrices_correct = False
# return the boolean value
return matrices_correct
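# a typical usage sketch for check_alpha_beta (toy inputs assumed): build alpha
# and beta for the same model and observation sequence, then verify them
def _demo_check_alpha_beta(O, A, B, pi, V):
    _, alpha = forward_basic(O, A, B, pi, V, return_matrix=True)
    _, beta = backward_basic(O, A, B, pi, V, return_matrix=True)
    return check_alpha_beta(alpha, beta)    # expected: True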
# -----------------------------------------------------------------------------#
def check_ln_alpha_ln_beta_log(ln_alpha, ln_beta):
'''
this function checks to make sure the matrix ln_alpha (holding the forward
log probabilities) and the matrix ln_beta (holding the backward log
    probabilities) have been computed correctly. this is checked T separate
times. how? because when both the forward and backward log probabilities
are known for each state at a given time instance, the log probability of
the entire observation sequence (assuming it passes through state i) can be
computed. the log likelihood of the entire observation sequence, ln(P(O)),
can be found by adding the two matrices together and then using the log-
sum-exp trick. since there are T time steps in the sequence, ln(P(O)) can
be computed T times. all T values should be the same. return a boolean
stating whether or not the matrices have been computed correctly
'''
# add together the corresponding forward and backward log probabilities
ln_P_O_and_i = ln_alpha + ln_beta
# compute the max log probability per time step
max_ln_P_O_per_t = np.max(ln_P_O_and_i, axis=0)
# using the log-sum-exp trick, compute the log likelihood of the sequence
# of observations at each time step
ln_P_O_values = max_ln_P_O_per_t + \
np.log(np.sum(np.exp(ln_P_O_and_i - max_ln_P_O_per_t),
axis=0))
# compute the discrepancies with respect to the first ln(P(O)) value
discrepancies = ln_P_O_values - ln_P_O_values[0]
    # check to make sure all the discrepancies are (numerically) zero; sum the
    # absolute values so positive and negative discrepancies cannot cancel out
    if np.sum(np.abs(discrepancies)) < len(discrepancies) ** 2 * np.finfo(
            'float').eps:
matrices_correct = True
else:
matrices_correct = False
# return the boolean value
return matrices_correct
# -----------------------------------------------------------------------------#
def check_ln_alpha_ln_beta(ln_alpha, ln_beta):
'''
this function uses the extended logarithm subroutines to check whether the
matrix ln_alpha (holding the forward log probabilities) and the matrix
    ln_beta (holding the backward log probabilities) have been computed
correctly. this is checked T separate times. how? because when both the
forward and backward log probabilities are known for each state at a given
time instance, the log probability of the entire observation sequence
(assuming it passes through state i) can be computed. since there are T
time steps in the sequence, ln(P(O)) can be computed T times. all T values
should be the same. return a boolean stating whether or not the matrices
have been computed correctly
'''
# add together the corresponding forward and backward log probabilities
ln_P_O_and_i = ln_product(ln_alpha, ln_beta)
# sum down the rows of the matrix, using the reduce function
ln_P_O_values = reduce(ln_sum, ln_P_O_and_i)
# compute the discrepancies with respect to the first ln(P(O)) value
discrepancies = ln_P_O_values - ln_P_O_values[0]
    # check to make sure all the discrepancies are (numerically) zero; sum the
    # absolute values so positive and negative discrepancies cannot cancel out
    if np.sum(np.abs(discrepancies)) < len(discrepancies) ** 2 * np.finfo(
            'float').eps:
matrices_correct = True
else:
matrices_correct = False
# return the boolean value
return matrices_correct
# -----------------------------------------------------------------------------#
def compute_P_O(alpha, beta):
'''
    given the matrices of forward and backward probabilities, this subroutine
computes and returns the probability of the entire observation sequence.
ideally, this function will be used after running check_alpha_beta(). n.b.
this is not the only way to compute this quantity! it can also be found
while computing the matrices of forward and backward probabilities. this is
really just a helper function for the baum-welch subroutine
- input:
- alpha: matrix of forward probabilities
- beta: matrix of backward probabilities
- output:
- P_O: probability of the observation sequence
'''
# compute the T values of P(O) that can be computed (one at each time
# step). all T values should be the same. you could just pick one or, in
# order to get rid of numerical error, average all T of them
P_O = np.mean(np.sum(np.multiply(alpha, beta), axis=0))
# return the mean value
return P_O
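# a usage sketch for compute_P_O (toy inputs assumed): the likelihood recovered
# from alpha and beta should match the one returned directly by the forward pass
def _demo_compute_P_O(O, A, B, pi, V):
    P_forward, alpha = forward_basic(O, A, B, pi, V, return_matrix=True)
    _, beta = backward_basic(O, A, B, pi, V, return_matrix=True)
    return np.isclose(compute_P_O(alpha, beta), P_forward)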
# -----------------------------------------------------------------------------#
def compute_ln_P_O_log(ln_alpha, ln_beta):
'''
    given the matrices of forward and backward log probabilities, this
subroutine computes and returns the log probability of the entire
observation sequence. ideally, this function will be used after running
check_ln_alpha_ln_beta(). n.b. this is not the only way to compute this
quantity! it can also be found while computing the matrices of forward and
backward log probabilities. this is really just a helper function for the
log-space baum-welch subroutine
- input:
- ln_alpha: matrix of forward log probabilities
- ln_beta: matrix of backward log probabilities
- output:
- ln_P_O: log probability of the observation sequence
'''
# compute the T values of ln(P(O)) that can be computed (one at each time
# step). start by adding together the corresponding forward and backward
# log probabilities
ln_P_O_and_i = ln_alpha + ln_beta
# compute the max log probability per time step
max_ln_P_O_per_t = np.max(ln_P_O_and_i, axis=0)
# using the log-sum-exp trick, compute the log likelihood of the sequence
# of observations at each time step
ln_P_O_values = max_ln_P_O_per_t + \
np.log(np.sum(np.exp(ln_P_O_and_i - max_ln_P_O_per_t),
axis=0))
# all T values should be the same. you could just pick one or, in order to
# get rid of numerical error, average all T of them
ln_P_O = np.mean(ln_P_O_values)
# return the mean value
return ln_P_O
# -----------------------------------------------------------------------------#
def compute_ln_P_O(ln_alpha, ln_beta):
'''
    given the matrices of forward and backward log probabilities, this
    subroutine computes and returns the log probability of the entire
    observation sequence using the extended logarithmic subroutines of mann.
    ideally, this function will be
used after running check_ln_alpha_ln_beta(). n.b. this is not the only way
to compute this quantity! it can also be found while computing the matrices
of forward and backward log probabilities. this is really just a helper
function for the log-space baum-welch subroutine
- input:
- ln_alpha: matrix of forward log probabilities
- ln_beta: matrix of backward log probabilities
- output:
- ln_P_O: log probability of the observation sequence
'''
# add together the corresponding forward and backward log probabilities
ln_P_O_and_i = ln_product(ln_alpha, ln_beta)
# sum down the rows of the matrix, using the reduce function
ln_P_O_values = reduce(ln_sum, ln_P_O_and_i)
# compute the mean value of the T identical values