Skip to content

Commit

Permalink
Merge pull request #47 from eric-brandao/development
Browse files Browse the repository at this point in the history
python 3.11 compatibility, some plot updates, and deconv with zero pa…
  • Loading branch information
Chum4k3r authored Oct 10, 2023
2 parents 1c74ae4 + 6233f1c commit 84321c2
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 42 deletions.
4 changes: 2 additions & 2 deletions pytta/_h5utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ def int_parser(attr):
if isinstance(attr, (np.int16,
np.int32,
np.int64,
np.int,
int,
np.int0)):
return int(attr)
else:
return attr

def float_parser(attr):
if isinstance(attr, (np.float,
if isinstance(attr, (float,
np.float16,
np.float32,
np.float64)):
Expand Down
13 changes: 8 additions & 5 deletions pytta/_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ def freq(sigObjs, smooth, xLabel, yLabel, yLim, xLim, title, decimalSep):
if xLabel is None:
xLabel = 'Frequency [Hz]'
if yLabel is None:
yLabel = 'Magnitude'
yLabel = 'Magnitude [dB]'
if title is None:
title = ''

Expand Down Expand Up @@ -426,10 +426,6 @@ def freq(sigObjs, smooth, xLabel, yLabel, yLim, xLim, title, decimalSep):
ax.set_ylabel(yLabel, fontsize=16)
ax.legend(loc='best', fontsize=12)

if xLim is None:
xLim = [data['minFreq'], data['maxFreq']]
ax.set_xlim(xLim)

if yLim is None:
yLim = [np.nanmin(yLims[:,0]), np.nanmax(yLims[:,1])]
margin = (yLim[1] - yLim[0]) / 20
Expand All @@ -443,6 +439,13 @@ def freq(sigObjs, smooth, xLabel, yLabel, yLim, xLim, title, decimalSep):
ax.xaxis.set_major_formatter(ticker.ScalarFormatter(useOffset=True))
for item in (ax.get_xticklabels() + ax.get_yticklabels()):
item.set_fontsize(14)
freq_oct_list = [16, 31.5, 63, 125, 250, 500, 1000, 2000, 4000, 8000, 16000]
freq_oct_list_str = [str(item) for item in freq_oct_list]
ax.set_xticks(freq_oct_list)
ax.set_xticklabels(freq_oct_list_str)
if xLim is None:
xLim = [data['minFreq'], data['maxFreq']]
ax.set_xlim(xLim)

return fig

Expand Down
100 changes: 68 additions & 32 deletions pytta/classes/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,18 @@ def crop(self, startTime, endTime):
startIdx = np.where(self.timeVector >= startTime)[0][0]
self.timeSignal = self.timeSignal[startIdx:endIdx,:]

def zero_padding(self, num_zeros = 1000):
""" zero-padding the signal object
"""
newTimeSignal = np.zeros((self.timeSignal.shape[0] + num_zeros,
self.timeSignal.shape[1]))
# fill initial part with original
newTimeSignal[:self.timeSignal.shape[0],
:self.timeSignal.shape[1]] = self.timeSignal
# Make the SignalObj equal to original + zeros
zp_signal = SignalObj(newTimeSignal, 'time', self.samplingRate)
return zp_signal

def mean(self):
print('DEPRECATED! This method will be renamed to',
':method:``.channelMean()``',
Expand Down Expand Up @@ -1182,6 +1194,10 @@ class ImpulsiveResponse(_base.PyTTaObj):
* "linear":
Computes using the spectral division of the signals;
* "linear_zp":
zero pads input and output (double size) and
computes using the spectral division of the signals;
* "H1":
Uses power spectral density Ser divided by See, with
Expand Down Expand Up @@ -1284,7 +1300,7 @@ def __init__(self, excitation=None, recording=None,
"passing as parameter the 'excitation' " +
"and 'recording' signals, or a calculated " +
"'ir'.")
# Zero padding
# Initial zero padding
elif excitation.numSamples > recording.numSamples:
print("Zero padding on IR calculation!")
excitSig = excitation.timeSignal
Expand All @@ -1302,7 +1318,8 @@ def __init__(self, excitation=None, recording=None,
np.zeros((recSig.shape[0], excitSig.shape[1]))
newTimeSignal[:excitSig.shape[0], :excitSig.shape[1]] = \
excitSig
excitation.timeSignal = newTimeSignal
excitation.timeSignal = newTimeSignal

self._methodInfo = {'method': method, 'winType': winType,
'winSize': winSize, 'overlap': overlap}
self._systemSignal = self._calculate_tf_ir(excitation,
Expand Down Expand Up @@ -1396,6 +1413,40 @@ def _to_dict(self):
out = {'methodInfo': self.methodInfo}
return out

def _calculate_regu_spk(self, inputSignal, outputSignal):
""" Computes regularized spectrum
"""
data = _make_pk_spectra(inputSignal.freqSignal)
outputFreqSignal = _make_pk_spectra(outputSignal.freqSignal)
freqVector = inputSignal.freqVector
b = data * 0 + 10**(-200/20) # inside signal freq range
a = data * 0 + 1 # outside signal freq range
minFreq = np.max([inputSignal.freqMin, outputSignal.freqMin])
maxFreq = np.min([inputSignal.freqMax, outputSignal.freqMax])
# Calculate epsilon
eps = self._crossfade_spectruns(a, b,
[minFreq/np.sqrt(2),
minFreq],
freqVector)
if maxFreq < np.min([maxFreq*np.sqrt(2),
inputSignal.samplingRate/2]):
eps = self._crossfade_spectruns(eps, a,
[maxFreq,
maxFreq*np.sqrt(2)],
freqVector)
eps = \
eps \
* float(np.max(np.abs(outputFreqSignal)))**2 \
* 1/2
C = np.conj(data) / \
(np.conj(data)*data + eps)
C = _make_rms_spectra(C)
C = SignalObj(C,
'freq',
inputSignal.samplingRate,
signalType='energy')
return C

def _calculate_tf_ir(self, inputSignal, outputSignal, method='linear',
winType=None, winSize=None, overlap=None,
regularization=False):
Expand All @@ -1405,41 +1456,26 @@ def _calculate_tf_ir(self, inputSignal, outputSignal, method='linear',
elif inputSignal.samplingRate != outputSignal.samplingRate:
raise ValueError("Both signal-like objects must have the same\
sampling rate.")

if method == 'linear':
if regularization:
data = _make_pk_spectra(inputSignal.freqSignal)
outputFreqSignal = _make_pk_spectra(outputSignal.freqSignal)
freqVector = inputSignal.freqVector
b = data * 0 + 10**(-200/20) # inside signal freq range
a = data * 0 + 1 # outside signal freq range
minFreq = np.max([inputSignal.freqMin, outputSignal.freqMin])
maxFreq = np.min([inputSignal.freqMax, outputSignal.freqMax])
# Calculate epsilon
eps = self._crossfade_spectruns(a, b,
[minFreq/np.sqrt(2),
minFreq],
freqVector)
if maxFreq < np.min([maxFreq*np.sqrt(2),
inputSignal.samplingRate/2]):
eps = self._crossfade_spectruns(eps, a,
[maxFreq,
maxFreq*np.sqrt(2)],
freqVector)
eps = \
eps \
* float(np.max(np.abs(outputFreqSignal)))**2 \
* 1/2
C = np.conj(data) / \
(np.conj(data)*data + eps)
C = _make_rms_spectra(C)
C = SignalObj(C,
'freq',
inputSignal.samplingRate,
signalType='energy')
C = self._calculate_regu_spk(inputSignal, outputSignal)
result = outputSignal * C
else:
result = outputSignal / inputSignal

elif method == 'linear_zp': # linear with zero-padding safe-guard
inputSignal_zp = inputSignal.zero_padding(num_zeros =\
inputSignal.timeSignal.shape[0])
outputSignal_zp = outputSignal.zero_padding(num_zeros =\
outputSignal.timeSignal.shape[0])

if regularization:
C = self._calculate_regu_spk(inputSignal_zp, outputSignal_zp)
result = outputSignal_zp * C
else:
result = outputSignal_zp / inputSignal_zp

elif method == 'H1':
if winType is None:
winType = 'hann'
Expand Down Expand Up @@ -1594,7 +1630,7 @@ def _crossfade_spectruns(self, a, b, freqLims, freqVector):
f1idx = np.where(freqVector <= f1)[0][-1]
totalSamples = a.shape[0]
xsamples = f1idx - f0idx
win = ss.hanning(2*xsamples)
win = ss.hann(2*xsamples)

rightWin = win[xsamples-1:-1]
fullRightWin = np.concatenate((np.ones(f0idx),
Expand Down
6 changes: 3 additions & 3 deletions pytta/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ def __do_sweep_windowing(inputSweep,
# exact sample where the chirp reaches freqMax [Hz]
freqMaxSample = np.where(freqSweep <= freqMax)
freqMaxSample = len(freqSweep) - freqMaxSample[-1][-1]
windowStart = ss.hanning(2*freqMinSample)
windowEnd = ss.hanning(2*freqMaxSample)
windowStart = ss.hann(2*freqMinSample)
windowEnd = ss.hann(2*freqMaxSample)

# Uses first half of windowStart, last half of windowEnd, and a vector of
# ones with the remaining length, in between the half windows
Expand Down Expand Up @@ -459,7 +459,7 @@ def __do_noise_windowing(inputNoise,
window):
# sample equivalent to the first five percent of noise duration
fivePercentSample = int((5/100) * (noiseSamples))
windowStart = ss.hanning(2*fivePercentSample)
windowStart = ss.hann(2*fivePercentSample)
fullWindow = np.concatenate((windowStart[0:fivePercentSample],
np.ones(int(noiseSamples-fivePercentSample))))
newNoise = (fullWindow * inputNoise.T).T
Expand Down

0 comments on commit 84321c2

Please sign in to comment.