# gibbs.py (182 lines, 154 loc)
"""
Some experiment with Gibbs sampling as opposed to optimization.
Try to use pure numpy where possible without re-writing the function from the other sections.
WARNING: partially finished/ongoing. I *think* I added an extra sample dimension for Gibbs population weighting.
"""
import numpy as np
import pandas as pd
import functools
from scipy.special import logit
from .quantile_regression import get_data
class Dense():
"""
    The point is to make a dense layer whose forward pass broadcasts across a whole
    population of weight samples at once (the extra weight_dim axis).
"""
def __init__(self,
activation=None,
kernel_initializer=None,
kernel_constraint=None,
dtype=np.float64):
self._kernel = None
self._bias = None
self.kernel_mapping = None
self.kernel_initializer = kernel_initializer
self.dtype = dtype
if kernel_constraint == 'nonneg':
self.kernel_mapping = np.exp
        self.activation = activation
if self.activation == 'tanh':
self.activation = np.tanh
def initialize_weights(self, *, input_dim, weight_dim, output_dim):
ki = np.random.randn
if self.kernel_initializer == 'uniform':
ki = np.random.rand
self._kernel = ki(input_dim, weight_dim, output_dim).astype(self.dtype)
self._bias = np.random.randn(weight_dim, output_dim).astype(self.dtype)
@property
def kernel(self):
return self._kernel if self.kernel_mapping is None else self.kernel_mapping(self._kernel)
@property
def bias(self):
return self._bias
@property
def weights(self):
return [self._kernel, self._bias]
def set_weights(self, weights):
self._kernel = weights[0]
self._bias = weights[1]
def __call__(self, inputs):
# inputs is tau_dim, weight_dim, input_dim
        # kernel is input_dim x weight_dim x output_dim
# output will be tau_dim, weight_dim, output_dim
# print(f'einsum {inputs.shape} x {self.kernel.shape}')
x = np.einsum('ijk,kjl->ijl', inputs, self.kernel) # + self.bias[None, :]
# print(f'result {x.shape}')
return x if self.activation is None else self.activation(x)
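# A minimal shape sketch (`_demo_dense_broadcast` is a hypothetical helper, not used
# elsewhere): one Dense layer maps a batch of shape (tau_dim, weight_dim, input_dim)
# to (tau_dim, weight_dim, output_dim), i.e. every weight sample is evaluated in a
# single einsum call.
def _demo_dense_broadcast():
    layer = Dense(activation='tanh',
                  kernel_initializer='uniform',
                  kernel_constraint='nonneg')
    layer.initialize_weights(input_dim=1, weight_dim=4, output_dim=3)
    u = np.random.randn(5, 4, 1)  # 5 tau values, 4 weight samples, 1 feature
    out = layer(u)
    assert out.shape == (5, 4, 3)
    return out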
def make_layers(*, input_dim, weight_dim, dims, activation="tanh", final_activation=None, kernel_constraint="nonneg", kernel_initializer="uniform"):
"""
A utility for making layers.
    If all kernels are non-negative you should get a monotone non-decreasing map
    (non-negative linear maps and tanh are both monotone); a small check is sketched
    after reduce_layers below.
"""
layers = list()
for i, dim in enumerate(dims):
if i == len(dims) - 1:
activation = final_activation
layer = Dense(kernel_initializer=kernel_initializer,
kernel_constraint=kernel_constraint,
activation=activation,
dtype=np.float64)
layer.initialize_weights(input_dim=input_dim, weight_dim=weight_dim, output_dim=dim)
layers.append(layer)
input_dim = dim
return layers
def reduce_layers(inputs, layers):
# for debug:
# out = inputs
# for i, layer in enumerate(layers):
# out = layer(out)
# return out
return functools.reduce(lambda x, y: y(x), [inputs] + layers)
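# A small monotonicity check (`_demo_monotone_layers` is a hypothetical helper):
# illustrates the claim in make_layers' docstring that non-negative kernels plus
# tanh give a map that is non-decreasing in the input.
def _demo_monotone_layers():
    layers = make_layers(input_dim=1, weight_dim=3, dims=[8, 1],
                         kernel_constraint='nonneg')
    u = np.linspace(-3.0, 3.0, 50)[:, None, None]  # (50, 1, 1), increasing inputs
    u = np.repeat(u, 3, axis=1)                    # (50, weight_dim=3, 1)
    out = reduce_layers(u, layers)                 # (50, 3, 1)
    assert np.all(np.diff(out, axis=0) >= 0)
    return out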
class QuantileNetworkNoX():
def __init__(self, *, input_dim, weight_dim, dims):
"""
input_dim: feature dim
weight_dim: number of weight samples to run in parallel
dims: sequence of output/input dims
        Remember: the confusing thing in the 1d case is that the n_samples batch dim is
        left out entirely until the loss calculation. That should probably be changed so
        it is easier to map onto the Y|X code.
"""
self._my_layers = make_layers(
input_dim=input_dim,
weight_dim=weight_dim,
dims=dims,
activation="tanh",
kernel_constraint="nonneg")
def quantile(self, tau):
check_tau(tau)
# make tau be 3d because inputs to future layers will be 3d.
# batch dims are (tau, theta) space
# inputs are (tau, theta, blah)
# see the einsum calc for details
tau = tau[:, None, :]
u = logit(tau)
return reduce_layers(u, self._my_layers)
def __call__(self, inputs):
tau, y = inputs
return self.quantile(tau)
@property
def weights(self):
return [x.weights for x in self._my_layers]
def set_weights(self, weights):
for w, layer in zip(weights, self._my_layers):
layer.set_weights(w)
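# A minimal usage sketch (`_demo_quantile_network_nox` is a hypothetical helper):
# tau must be (n_tau, 1); the model returns one quantile curve per weight sample,
# with shape (n_tau, weight_dim, output_dim).
def _demo_quantile_network_nox():
    model = QuantileNetworkNoX(input_dim=1, weight_dim=8, dims=[16, 16, 1])
    tau = np.linspace(0.05, 0.95, 19)[:, None]  # (19, 1)
    q = model.quantile(tau)
    assert q.shape == (19, 8, 1)
    return q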
def check_tau(tau):
assert len(tau.shape) == 2
assert tau.shape[1] == 1, f'I do not think it ever makes sense to have 2nd dimension of tau be different from one. Got {tau.shape}'
def rho_quantile_loss(tau_y, uu):
tau, y = tau_y
check_tau(tau)
# we now have 3 batch dims! data_samples x tau x weight_samples
uu = y[:, None, None] - uu[None]
tau = tau[None, :, None]
J = uu * (tau - np.where(uu <= 0, 1, 0))
    # NOTE the reduce is different than in the tf example
# we are not summing across the weight samples
# we want to be extrinsic in n_samples (sum), intrinsic in n_tau (mean)
return np.sum(np.mean(J, axis=1).squeeze(), axis=0)
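# A shape sketch (`_demo_rho_quantile_loss` is a hypothetical helper): y is expected
# to be 2d, shape (n_samples, 1), so the broadcast above yields
# (n_samples, n_tau, weight_dim, 1); the loss then reduces to one scalar per weight
# sample.
def _demo_rho_quantile_loss():
    n_samples, n_tau, weight_dim = 100, 9, 5
    tau = np.linspace(0.1, 0.9, n_tau)[:, None]   # (n_tau, 1)
    y = np.random.randn(n_samples, 1)             # (n_samples, 1)
    uu = np.random.randn(n_tau, weight_dim, 1)    # stand-in for model output
    loss = rho_quantile_loss((tau, y), uu)
    assert loss.shape == (weight_dim,)
    return loss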
def sanity_plot_nox(steps=1000):
l = get_data()
# I can't remember why these were 2d, probably something to do with tf
tau = l["tau"]
y = l["y"]
weight_dim = 20
model = QuantileNetworkNoX(
input_dim=1, # tau.shape[1] if tau is still 2d
weight_dim=weight_dim,
dims=[16, 16, 1]
)
loss = rho_quantile_loss((tau, y), model((tau, y)))
weights = model.weights
model.set_weights(weights)
return locals()
def step(*, J_old, x_old, J_new, x_new, n_store):
"""
see bottleneck.argpartsort vs np.argpartition
You have n_old, n_new elements.
You want to take the best n = min(n_store, n_old + n_new)
elements and store them discarding the rest.
I suppose you could also store the worst if you thought that was helpful.
"""
n_old = len(J_old)
n_new = len(J_new)
n_best = min(n_store, n_old + n_new)
    # keep the n_best lowest-loss elements (a sketch of the selection described in the
    # docstring; assumes lower J is better and x is indexable along its first axis)
    J_all = np.concatenate([J_old, J_new])
    x_all = np.concatenate([x_old, x_new])
    if n_best == n_old + n_new:
        # everything already fits in the store, no selection needed
        return J_all, x_all
    # np.argpartition is O(n); a full sort is not needed to find the best n_best
    idx = np.argpartition(J_all, n_best - 1)[:n_best]
    return J_all[idx], x_all[idx]
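# A quick usage sketch (`_demo_step` is a hypothetical helper): step should keep the
# n_store lowest-loss entries from the combined old and new populations.
def _demo_step():
    J, x = step(J_old=np.array([0.5, 0.1, 0.9]), x_old=np.zeros((3, 2)),
                J_new=np.array([0.3, 0.7]), x_new=np.ones((2, 2)),
                n_store=3)
    assert sorted(J.tolist()) == [0.1, 0.3, 0.5]
    assert x.shape == (3, 2)
    return J, x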