-
Notifications
You must be signed in to change notification settings - Fork 4
/
pfb_fixed.py
318 lines (277 loc) · 18.8 KB
/
pfb_fixed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# -*- coding: utf-8 -*-
"""
Created on Thu Aug 16 16:13:40 2018
@author: talonmyburgh
"""
import numpy as np
from fixpoint import fixpoint, cfixpoint
from pfb_coeff_gen import coeff_gen
# =============================================================================
# Bit reversal algorithms used for the iterative fft's
# =============================================================================
"""Arranges chronological values in an array in a bit reversed fashion"""
def bit_rev(a, bits):
a_copy = a.copy()
N = 1<<bits
for i in range(1,bits):
a >>=1
a_copy <<=1
a_copy |= (a[:]&1)
a_copy[:] &= N-1
return a_copy
"""Takes an array of length N which must be a power of two"""
def bitrevfixarray(array,N): #takes an array of length N which must be a power of two
bits = int(np.log2(N)) #how many bits it takes to represent all numbers in array
A = array.copy()
a = np.arange(N)
A[bit_rev(a,bits)] = array[:]
return A
# =============================================================================
# FFT: natural data order in, bit reversed twiddle factors, bit reversed
# order out.
# =============================================================================
def make_fix_twiddle(N,bits,fraction,underflow_method="ROUND",overflow_method="WRAP"):
twids = cfixpoint(bits,fraction, underflow_method = underflow_method,overflow_method=overflow_method)
twids.from_complex(np.exp(-2*np.arange(N//2)*np.pi*1j/N))
return twids
"""Natural order in DIT FFT that accepts the data, the twiddle factors
(must be bit reversed), a shift register, the bitwidth and fraction
bit width to process at, the twiddle factor bits and allows for staging"""
def iterffft_natural_DIT(DATA,twid,shiftreg,bits,fraction,twidfrac,staged=False):
data=DATA.copy()
N = data.data.shape[0] #how long is data stream
if(type(shiftreg)==int): #if integer is parsed rather than list
shiftreg = [int(x) for x in bin(shiftreg)[2:]]
if (len(shiftreg)<int(np.log2(N))):
for i in range(int(np.log2(N))-len(shiftreg)):
shiftreg.insert(0,0)
elif(type(shiftreg)==list and type(shiftreg[0])==int): #if list of integers is parsed
shiftreg = shiftreg
else:
raise ValueError('Shiftregister must be type int or binary list of ints')
if(staged):
stgd_data = DATA.copy()
stgd_data.from_complex(np.zeros((N,int(np.log2(N))+2),
dtype = np.complex64))
stgd_data[:,0] = data[:]
stages = int(np.log2(N))
if(len(shiftreg)!=stages and type(shiftreg) is not list):
raise ValueError("Shift register must be of type list, and its length "
+"must be log2(data length)")
num_of_groups = 1 #number of groups - how many subarrays are there?
distance = N//2 #how far between each fft arm?
stg=1 #stage counter
while num_of_groups < N: #basically iterates through stages
for k in range(num_of_groups): #iterate through each subarray
jfirst = 2*k*distance #index to beginning of a group
jlast = jfirst + distance - 1 #first index plus used to index whole group
W=twid[k]
slc1 = slice(jfirst,jlast+1,1)
slc2 = slice(jfirst+distance, jlast+1+distance,1)
tmp = W * data[slc2]
tmp >> twidfrac #slice off lower bit growth from multiply (caused by fraction only)
tmp.bits =bits
tmp.fraction=fraction #fraction will = (frac1+frac2) - hence right shift by frac2
tmp.normalise()
data[slc2] = data[slc1]-tmp
data[slc1] = data[slc1]+tmp
if shiftreg.pop(): #implement FFT shift and then normalise to correct at end of stage
data>>1
data.normalise()
num_of_groups *=2
distance //=2
if(staged): #if we are recording stages
stgd_data[:,stg]=data[:] #log each stage data to array
stg+=1
if(staged):
stgd_data[:,-1] = bitrevfixarray(stgd_data[:,-2],N) #post bit-reordering for last stage - added as extra stage
return stgd_data
else:
return bitrevfixarray(data,N) #post bit-reordering
# =============================================================================
# Floating point PFB implementation making use of the natural order in fft
# like CASPER does.
# =============================================================================
class FixPFB(object):
"""This function takes point size, how many taps, whether to integrate
the output or not, what windowing function to use, whether you're
running dual polarisations, what rounding and overflow scheme to use,
fwidth and whether to stage."""
def __init__(self, N, taps, bits_in, frac_in, bits_fft, frac_fft,
bits_out, frac_out, twidbits, twidfrac, shiftreg,
bitsofacc=32, fracofacc=31, unsigned = False,
chan_acc =False, datasrc = None, w = 'hann',
fir_underflow_method="ROUND", fft_underflow_method="ROUND",
fir_overflow_method="SATURATE", fft_overflow_method="WRAP",
dual = False, fwidth=1, staged = False):
"""Populate PFB object properties"""
self.N = N #how many points
self.chan_acc = chan_acc #if summing outputs
self.dual = dual #whether you're processing dual polarisations
self.taps = taps #how many taps
self.bitsofacc = bitsofacc #how many bits to grow to in integration
self.fracofacc = fracofacc
self.bits_in = bits_in #input data bitlength
self.frac_in = frac_in
self.bits_fft = bits_fft #fft data bitlength
self.frac_fft = frac_fft
self.bits_out = bits_out #what bitlength out you want
self.frac_out = frac_out
self.fwidth = fwidth #normalising factor for fir window
if(type(shiftreg)==int): #if integer is parsed rather than list
self.shiftreg = [int(x) for x in bin(shiftreg)[2:]]
if (len(self.shiftreg)<int(np.log2(N))):
for i in range(int(np.log2(N))-len(self.shiftreg)):
self.shiftreg.insert(0,0)
elif(type(shiftreg)==list and type(shiftreg[0])==int): #if list of integers is parsed
self.shiftreg = shiftreg
else:
raise ValueError('Shiftregister must be type int or binary list of ints')
self.unsigned = unsigned #only used if data parsed in is in a file
self.staged = staged #whether to record fft stages
self.twidbits = twidbits #how many bits to give twiddle factors
self.twidfrac = twidfrac
self.fir_underflow_method=fir_underflow_method #rounding scheme in firs
self.fir_overflow_method=fir_overflow_method #rounding scheme in firs
self.fft_underflow_method=fft_underflow_method #rounding scheme in fft
self.fft_overflow_method=fft_overflow_method #rounding scheme in fft
#Define variables to be used:
self.reg_real = fixpoint(self.bits_in, self.frac_in,unsigned = self.unsigned,
underflow_method= self.fir_underflow_method,overflow_method= self.fir_overflow_method)
self.reg_real.from_float(np.zeros([N,taps],dtype = np.int64)) #our fir register size filled with zeros orignally
self.reg_imag = self.reg_real.copy()
if(datasrc is not None and type(datasrc)==str): #if input data file is specified
self.inputdata = cfixpoint(self.bits_in, self.frac_in,unsigned = self.unsigned,
underflow_method= self.fir_underflow_method, overflow_method= self.fir_overflow_method)
self.inputdatadir = datasrc
self.outputdatadir = datasrc[:-4]+"out.npy"
self.inputdata.from_complex(np.load(datasrc, mmap_mode = 'r'))
else:
self.inputdatadir = None
#the window coefficients for the fir filter
self.window = fixpoint(self.bits_fft, self.frac_fft,unsigned = self.unsigned,
underflow_method= self.fir_underflow_method,overflow_method= self.fir_overflow_method)
tmpcoeff,self.firsc = coeff_gen(self.N,self.taps,w,self.fwidth)
self.window.from_float(tmpcoeff)
#the twiddle factors for the natural input fft
self.twids = make_fix_twiddle(self.N,self.twidbits,twidfrac,
underflow_method= self.fft_underflow_method,overflow_method= self.fft_overflow_method)
self.twids = bitrevfixarray(self.twids,self.twids.data.size)
"""Takes data segment (N long) and appends each value to each fir.
Returns data segment (N long) that is the sum of fircontents*window"""
def _FIR(self,x):
#push and pop from FIR register array
self.reg_real.data = np.column_stack(
(x.real.data,self.reg_real.data))[:,:-1]
self.reg_imag.data = np.column_stack(
(x.imag.data,self.reg_imag.data))[:,:-1]
X_real = self.reg_real*self.window #compute real and imag products
X_imag = self.reg_imag*self.window
prodgrth = X_real.fraction - self.frac_fft #-1 since the window coeffs have -1 less fraction
X = cfixpoint(real = X_real.sum(axis=1),imag = X_imag.sum(axis =1))
X >> prodgrth +self.firsc #remove growth
X.bits = self.bits_fft #normalise to correct bit and frac length
X.fraction = self.frac_fft
X.normalise()
X.underflow_method = self.fft_underflow_method #adjust so that it now uses FFT rounding scheme
X.overflow_method = self.fft_overflow_method #adjust so that it now uses FFT rounding scheme
return X #FIR output
"""In the event that that dual polarisations have been selected, we need to
split out the data after and return the individual X_k values"""
def _split(self,Yk):
#reverse the arrays for the splitting function correctly
R_k = Yk.real.copy()
I_k = Yk.imag.copy()
R_kflip = R_k.copy()
R_kflip[1:] = R_kflip[:0:-1]
I_kflip = I_k.copy()
I_kflip[1:] = I_kflip[:0:-1]
self.G_k = cfixpoint(real = R_k + R_kflip, imag = I_k - I_kflip) #declares two variables for 2 pols
self.G_k >> 1 #for bit growth from addition
self.G_k.bits = self.bits_fft
self.G_k.normalise()
self.H_k =cfixpoint(real = I_k + I_kflip, imag = R_kflip - R_k)
self.H_k >> 1
self.H_k.bits = self.bits_fft
self.H_k.normalise()
"""Here we take the power spectrum of the outputs. Chan_acc dictates
if one must sum over all outputs produced."""
def _pow(self,X):
if (self.chan_acc): #if accumulation specified
tmp = X.power() # X times X*
pwr = X.copy()
pwr.bits = self.bitsofacc
pwr.frac=self.fracofacc
pwr.normalise() #normalise multiplication
pwr.data = np.sum(tmp.data,axis=1) #accumulate
return pwr
else: #if no accumulation specified
pwr = X.power()
pwr.bits = self.bitsofacc
pwr.frac=self.fracofacc
pwr.normalise() #normalise multiplication
return pwr
"""Here one parses a data vector to the PFB to run. Note it must be
cfixpoint type if a data file was not specified before"""
def run(self,DATA, cont = False):
if (DATA is not None): #if a data vector has been parsed
if(self.bits_in != DATA.bits):
raise ValueError("Input data must match precision specified"
+"for input data with bits_in")
self.inputdata = DATA
elif(self.inputdata is None): #if no data was specified at all
raise ValueError ("No input data for PFB specified.")
size = self.inputdata.data.shape[0] #get length of data stream which should be multiple of N
data_iter = size//self.N #how many cycles of commutator
X = cfixpoint(self.bits_fft, self.frac_fft,unsigned = self.unsigned,
overflow_method= self.fft_overflow_method,underflow_method=self.fft_underflow_method)
if(self.staged): #if all stages need be stored
X.from_complex(np.empty((self.N,data_iter,int(np.log2(self.N))+2),
dtype = np.complex64)) #will be tapsize x datalen/point x fft stages +2
#(input and re-ordererd output)
for i in range(0,data_iter): #for each data_iter, populate all firs, and run FFT once
if(i == 0):
X[:,i,:] = iterffft_natural_DIT(self._FIR(self.inputdata[0:self.N]),
self.twids,self.shiftreg.copy(),self.bits_fft,self.frac_fft,
self.twidfrac,self.staged)
else:
X[:,i,:] = iterffft_natural_DIT(
self._FIR(self.inputdata[i*self.N:i*self.N+self.N]),
self.twids,self.shiftreg.copy(),self.bits_fft,
self.frac_fft, self.twidfrac, self.staged)
else: #if stages don't need to be stored
X.from_complex(np.empty((self.N,data_iter),
dtype = np.complex64)) #will be tapsize x datalen/point
for i in range(0,data_iter): #for each stage, populate all firs, and run FFT once
if(i == 0):
X[:,i] = iterffft_natural_DIT(self._FIR(self.inputdata[0:self.N]),
self.twids,self.shiftreg.copy(),self.bits_fft,self.frac_fft,
self.twidfrac, self.staged)
else:
X[:,i] = iterffft_natural_DIT(
self._FIR(self.inputdata[i*self.N:i*self.N+self.N]),
self.twids,self.shiftreg.copy(),self.bits_fft,
self.frac_fft, self.twidfrac, self.staged)
"""Requantise if bitsout<bitsfft"""
if(self.bits_out<self.bits_fft):
X>>(self.bits_fft-self.bits_out)
X.bits=self.bits_out
X.fraction = self.frac_out
X.normalise()
"""Decide on how to manipulate and display output data"""
if(self.dual and not self.staged): #If dual processing but not staged
self._split(X)
self.G_k_pow = self._pow(self.G_k)
self.H_k_pow = self._pow(self.H_k)
elif(not self.dual and self.staged): #If single pol processing and staged
self.X_k_stgd = X
self.X_k_pow = self._pow(X[:,:,-1])
self.X_k = X[:,:,-1]
elif(self.dual and self.staged): #If dual pol and staged
self.X_k_stgd = X
self.split(X[:,:,-1])
self.G_k_pow = self._pow(self.G_k)
self.H_k_pow = self._pow(self.H_k)
else: #If single pol and no staging
self.X_k = X
self.X_k_pow = self._pow(X)