From 92d3d4885c5550442e22b49c03de3a97c73f9bdc Mon Sep 17 00:00:00 2001 From: linarphy Date: Tue, 21 May 2024 18:42:10 +0200 Subject: [PATCH] Refactor, version up --- src/backscattering_analyzer/__init__.py | 31 +++ src/backscattering_analyzer/analyzer.py | 284 +++++------------------- src/backscattering_analyzer/settings.py | 25 ++- src/backscattering_analyzer/signal.py | 170 ++++++++++++++ 4 files changed, 272 insertions(+), 238 deletions(-) create mode 100644 src/backscattering_analyzer/signal.py diff --git a/src/backscattering_analyzer/__init__.py b/src/backscattering_analyzer/__init__.py index e69de29..0f07c25 100644 --- a/src/backscattering_analyzer/__init__.py +++ b/src/backscattering_analyzer/__init__.py @@ -0,0 +1,31 @@ +from numpy.typing import NDArray +from numpy import arange +from scipy.interpolate import CubicSpline + + +def interpolate_abciss(signals: tuple["Signal", ...]) -> NDArray: + """ + return the axis that would be used by the interpolate function + """ + rates = [signal.rate for signal in signals] + + start = max([signal.x[0] for signal in signals]) + end = min([signal.x[-1] for signal in signals]) + + return arange(start, end, 1 / max(rates)) + + +def interpolate(signals: tuple["Signal", ...]) -> tuple["Signal", ...]: + """ + Interpolate multiple signals with a single abciss list, which has + the smallest interval and the bigget rate + """ + splines = [CubicSpline(signal.x, signal.y) for signal in signals] + + x = interpolate_abciss(signals) + + new_signals = [ + Signal(x, spline(x), signals[0].settings) for spline in splines + ] + + return tuple(new_signals) diff --git a/src/backscattering_analyzer/analyzer.py b/src/backscattering_analyzer/analyzer.py index 26a26ff..e599974 100644 --- a/src/backscattering_analyzer/analyzer.py +++ b/src/backscattering_analyzer/analyzer.py @@ -1,33 +1,13 @@ -# display -from rich.console import Console -from rich.theme import Theme -from rich.traceback import install as traceback_install - # utils from sys import argv as options from backscattering_analyzer.settings import Settings +from backscattering_analyzer.signal import Signal +from backscattering_analyzer import interpolate, interpolate_abciss from numpy import loadtxt, array from scipy.io.matlab import loadmat -from numpy.typing import NDArray # maths -from numpy import ( - mean, - zeros, - pi, - sin, - cos, - arange, - sqrt, - where, - real, - conj, - append, - flip, -) -from scipy.signal import welch as psd -from scipy.interpolate import CubicSpline -from scipy.fft import fft, ifft, fftfreq +from numpy import zeros, pi, sqrt class Analyzer: @@ -36,17 +16,7 @@ class Analyzer: """ def __init__(self, arguments: list = []) -> None: - self.theme = Theme( - { - "main": "white bold", - "option": "grey50 italic", - "argument": "red", - "description": "green italic", - } - ) - self.console = Console(theme=self.theme) - traceback_install(console=self.console, show_locals=True) - self.settings = Settings(options[1:] + arguments, self.console) + self.settings = Settings(options[1:] + arguments) self.load() self.process_movement() @@ -54,8 +24,7 @@ class Analyzer: """ Load all data """ - if self.settings.verbose: - self.console.log("Load all data") + self.settings.log("Load all data") try: self.load_bench() self.load_mirror() @@ -70,10 +39,11 @@ class Analyzer: Load bench movement """ file = self.settings.bench_file() - if self.settings.verbose: - self.console.log("loading bench movement") + self.settings.log("loading bench movement") try: - self.bench_movement = loadtxt(file).T + self.bench_movement = Signal( + *loadtxt(file).T, self.settings + ) except OSError: raise Exception("{file} does not exist".format(file=file)) @@ -82,10 +52,11 @@ class Analyzer: Load mirror movement """ file = self.settings.mirror_file() - if self.settings.verbose: - self.console.log("loading mirror movement") + self.settings.log("loading mirror movement") try: - self.mirror_movement = loadtxt(file).T + self.mirror_movement = Signal( + *loadtxt(file).T, self.settings + ) except OSError: raise Exception("{file} does not exist".format(file=file)) @@ -94,10 +65,9 @@ class Analyzer: Load excited h(t) """ file = self.settings.data_file() - if self.settings.verbose: - self.console.log("loading excited h(t)") + self.setings.log("loading excited h(t)") try: - self.data_signal = loadtxt(file).T + self.data_signal = Signal(*loadtxt(file).T, self.settings) except OSError: raise Exception("{file} does not exist".format(file=file)) @@ -106,10 +76,11 @@ class Analyzer: Load reference h(t) """ file = self.settings.reference_file() - if self.settings.verbose: - self.console.log("loading reference h(t)") + self.settings.log("loading reference h(t)") try: - self.reference_signal = loadtxt(file).T + self.reference_signal = Signal( + *loadtxt(file).T, self.settings + ) except OSError: raise Exception("{file} does not exist".format(file=file)) @@ -118,10 +89,22 @@ class Analyzer: Load modelisation coupling data """ file = self.settings.modelisation_file() - if self.settings.verbose: - self.console.log("loading matlab modelisation data") + self.settings.log("loading matlab modelisation data") try: self.modelisation = loadmat(file) + coupling_values = self.modelisation[ + self.settings.coupling_name() + ] + self.coupling = array( + [ + Signal( + self.modelisation["freq"][0], + coupling, + self.settings, + ) + for coupling in coupling_values + ] + ) except OSError: raise Exception("{file} does not exist".format(file=file)) @@ -130,171 +113,43 @@ class Analyzer: Clean and compute relative movement between the bench and the miror """ - if self.settings.verbose: - self.console.log("computing relative movement") + self.settings.log("computing relative movement") - self.movement = array( - [ - self.bench_movement[0], - (self.settings.calib_bench * self.bench_movement[1]) - - ( - self.settings.calib_mirror * self.mirror_movement[1] - ), - ] - ) - self.movement[1] -= mean(self.movement[1]) - self.movement = self.low_pass_filter( - self.movement, - 5 * self.settings.vibration_frequency, - ) - self.movement = self.movement[:, :-500000] - - def psd_signal( - self, signal: NDArray, fft_length: int = 10 - ) -> NDArray: - """ - Compute psd of a given signal - """ - if self.settings.verbose: - self.console.log("computing psd of the given signal") - rate = 1 / (signal[0, 1] - signal[0, 0]) - - return array( - psd( - signal[1], - rate, - nperseg=int(fft_length * rate), - detrend="linear", + self.movement = ( + ( + self.bench_movement * self.settings.calib_bench + - self.mirror_movement * self.settings.calib_mirror ) + .detrend("linear") + .low_pass_filter(5 * self.settings.vibration_frequency) ) - def diff_psd_ref_data(self): - """ - Give difference between reference and excited signal psd - """ - if self.settings.verbose: - self.console.log( - "compute difference between reference and excited " - "signal (on the psd)" - ) - freq, data_psd = self.psd_signal(self.data_signal) - _, reference_psd = self.psd_signal(self.reference_signal) - return array( - [ - freq, - data_psd - reference_psd, - ] - ) - - def interpolate_abciss( - self, abcisses: tuple[NDArray, ...] | list[NDArray] - ) -> NDArray: - """ - Return the axis that would be used by the interpolate method - """ - rates = 1 / array( - [abciss[1] - abciss[0] for abciss in abcisses] - ) - start = max([abciss[0] for abciss in abcisses]) - end = min([abciss[-1] for abciss in abcisses]) - - return arange(start, end, 1 / max(rates)) - - def interpolate( - self, signals: tuple[NDArray, ...] | list[NDArray] - ) -> list[NDArray]: - """ - Interpolate multiple signals with a single abciss list, which - has the smallest interval and the biggest rate - """ - - splines = [ - CubicSpline(signal[0], signal[1]) for signal in signals - ] - - x = self.interpolate_abciss([signal[0] for signal in signals]) - - signals = [array([x, spline(x)]) for spline in splines] - - return signals - def compute_light(self): """ Compute psd of the computed projection with current bench excitation """ - coupling = self.modelisation[self.settings.coupling_name()] - - freq_sample = 1 / (self.movement[0, 1] - self.movement[0, 0]) - nperseg = ( - int(2 * len(coupling[0])) - 1 + len(coupling[0]) - ) # minimum to keep same length than coupling, but not for - # the same frequency range, so more - - psd_args = { - "fs": freq_sample, - "nperseg": nperseg, - "noverlap": nperseg / 2, - "nfft": nperseg, - } - result = zeros( - len( - self.interpolate_abciss( - [ - psd( - self.movement[1], - **psd_args, - )[0], - self.modelisation["freq"][0], - ] - ) - ) + len(interpolate_abciss(self.movement, self.coupling[0])) ) # frequencies depends of psd result, which we do not have yet frequencies = 0 - for index in range(len(coupling)): + for index in range(len(self.coupling)): phase = (index + 1) * 4 * pi / self.settings.wavelength - factor_n = array( - [ - *psd( - sin(phase * self.movement[1]), - **psd_args, - ) - ] - ) - coupling_n = array( - [ - self.modelisation["freq"][0], - abs(coupling[0]), - ] - ) - factor_d = array( - [ - *psd( - cos(phase * self.movement[1]), - **psd_args, - ) - ] - ) - coupling_d = array( - [ - self.modelisation["freq"][0], - abs(coupling[1]), - ] - ) + factor_n = (self.movement * phase).sin().psd() + coupling_n = self.coupling[0].abs() + factor_d = (self.movement * phase).cos().psd() + coupling_d = self.coupling[1].abs() - factor_n, coupling_n, factor_d, coupling_d = ( - self.interpolate( - (factor_n, coupling_n, factor_d, coupling_d) - ) + factor_n, coupling_n, factor_d, coupling_d = interpolate( + (factor_n, coupling_n, factor_d, coupling_d) ) # no need to redefine it each time but simpler here - frequencies = factor_n[0] + frequencies = factor_n.x """ result += ( sqrt(self.settings.scattering_factor[index]) @@ -313,7 +168,7 @@ class Analyzer: * ( coupling_n[1] * factor_n[1] + coupling_d[1] * factor_d[1] - ) + ).y ) return array( [ @@ -321,42 +176,3 @@ class Analyzer: result, ] ) - - def low_pass_filter( - self, signal: NDArray, cutoff: float - ) -> NDArray: - """ - Cut higher frequencies than cutoff for a given temporal signal - """ - if self.settings.verbose: - self.console.log("filtering with a low pass filter") - sample_spacing = signal[0, 1] - signal[0, 0] - freq_signal = array(fft(signal[1])) - frequencies = fftfreq(signal[1].shape[0], sample_spacing) - index_to_remove = where(abs(frequencies) > cutoff) - freq_signal[index_to_remove] = 0 - return array( - [ - real( - array( - ifft( - array( - [ - frequencies, - conj(flip(frequencies)), - ] - ), - ), - ), - ), - real( - array( - ifft( - array( - [freq_signal, conj(flip(freq_signal))] - ), - ), - ), - ), - ] - ) diff --git a/src/backscattering_analyzer/settings.py b/src/backscattering_analyzer/settings.py index d7b166d..7210211 100644 --- a/src/backscattering_analyzer/settings.py +++ b/src/backscattering_analyzer/settings.py @@ -1,10 +1,23 @@ from pathlib import Path from rich.console import Console +from rich.theme import Theme + +from rich.traceback import install as traceback_install class Settings: - def __init__(self, options: list, console: Console): - self.version = "0.0.1" + def __init__(self, options: list): + self.theme = Theme( + { + "main": "white bold", + "option": "grey50 italic", + "argument": "red", + "description": "green italic", + } + ) + self.console = Console(theme=self.theme) + traceback_install(console=self.console, show_locals=True) + self.version = "0.1.0" self.help = ( "[main]display[/main] [option]\\[options][/option]" "\n [argument]-b --bench[/argument] [description]bench of the experiment[/description]" @@ -89,10 +102,10 @@ class Settings: ) if option[:2] == "--": if option == "--help": - console.print(self.help) + self.console.print(self.help) exit(0) if option == "--version": - console.print(self.version) + self.console.print(self.version) exit(0) if option == "--verbose": self.verbose = True @@ -166,3 +179,7 @@ class Settings: def coupling_name(self) -> str: return "{bench}coupling".format(bench=self.bench) + + def log(self, message) -> None: + if self.verbose: + self.console.log(message) diff --git a/src/backscattering_analyzer/signal.py b/src/backscattering_analyzer/signal.py new file mode 100644 index 0000000..9f69679 --- /dev/null +++ b/src/backscattering_analyzer/signal.py @@ -0,0 +1,170 @@ +from __future__ import annotations +from numpy.typing import NDArray +from backscattering_analyzer.settings import Settings +from backscattering_analyzer import interpolate + +from scipy.signal import welch, detrend +from scipy.fft import irfft, rfft, rfftfreq +from numpy import where, sin, cos, array + + +class Signal: + """ + A given signal, with its time/frequency coordinates + """ + + def __init__( + self, x: NDArray, value: NDArray, settings: Settings + ) -> None: + if x.shape != value.shape: + raise Exception("x and y does not have the same dimension") + self.sampling = x[1] - x[0] + self.rate = 1 / self.sampling + self.x = x + self.frequencies = self.x # alias + self.time = self.x # alias + self.y = value + self.value = self.y # alias + self.settings = settings + self.length = value.shape[0] + + def psd(self, fft_length: int = 10) -> Signal: + """ + Compute psd of a given signal + """ + self.settings.log("computing psd of the signal") + + return Signal( + *welch( + self.value, + self.rate, + nperseg=int(fft_length * self.rate), + detrend="linear", + ), + self.settings, + ) + + def detrend(self, type: str = "linear") -> Signal: + """ + Short alias for scipy.detrend with default value + """ + return Signal( + self.x, + detrend(self.y, type=type), + self.settings, + ) + + def low_pass_filter(self, cutoff): + """ + Cut higher frequencies than cutoff for this signal + """ + self.settings.log("filtering with a low pass filter") + + freq_x = rfftfreq(self.length, self.sampling) + freq_y = rfft(self.y) + index_to_remove = where(abs(freq_x) > cutoff) + freq_y[index_to_remove] = 0 + + return Signal( + self.x, + irfft(freq_y), + self.settings, + ) + + def abs(self) -> Signal: + """ + Abs of the signal (hacky way) + """ + return Signal( + self.x, + abs(self.y), + self.settings, + ) + + def cos(self) -> Signal: + """ + Cosinus of the signal (hacky way) + """ + return Signal( + self.x, + cos(self.y), + self.settings, + ) + + def sin(self) -> Signal: + """ + Sinus of the signal (hacky way) + """ + return Signal( + self.x, + sin(self.y), + self.settings, + ) + + def __sub__(self, other: float | Signal) -> Signal: + """ + Substract float or a signal to another + """ + if isinstance(other, Signal): + signal_1, signal_2 = interpolate((self, other)) + + return Signal( + signal_1.frequencies, + signal_2.value - signal_1.value, + self.settings, + ) + else: + return Signal( + self.x, + self.y - other, + self.settings, + ) + + def __add__(self, other: float | Signal) -> Signal: + """ + Add a float or a signal to another + """ + if isinstance(other, Signal): + signal_1, signal_2 = interpolate((self, other)) + + return Signal( + signal_1.x, + signal_1 + signal_2, + self.settings, + ) + else: + return Signal( + self.x, + self.y + other, + self.settings, + ) + + def __mul__(self, other: float | Signal) -> Signal: + """ + Multiply a signal by a value or another signal + """ + if isinstance(other, Signal): + signal_1, signal_2 = interpolate((self, other)) + + return Signal( + signal_1.frequencies, + signal_1.value * signal_2.value, + self.settings, + ) + else: + return Signal( + self.x, + other * self.value, + self.settings, + ) + + def __array__(self) -> NDArray: + """ + Convert to a numpy array + """ + return array( + [ + self.x, + self.y, + ] + )