diff --git a/src/backscattering_analyzer/analyzer.py b/src/backscattering_analyzer/analyzer.py index 5bdb7e6..26a26ff 100644 --- a/src/backscattering_analyzer/analyzer.py +++ b/src/backscattering_analyzer/analyzer.py @@ -8,9 +8,23 @@ from sys import argv as options from backscattering_analyzer.settings import Settings from numpy import loadtxt, array from scipy.io.matlab import loadmat +from numpy.typing import NDArray # maths -from numpy import mean, zeros, pi, sin, cos, arange, sqrt, where, real +from numpy import ( + mean, + zeros, + pi, + sin, + cos, + arange, + sqrt, + where, + real, + conj, + append, + flip, +) from scipy.signal import welch as psd from scipy.interpolate import CubicSpline from scipy.fft import fft, ifft, fftfreq @@ -21,7 +35,7 @@ class Analyzer: Utility class to study backscattering light in VIRGO """ - def __init__(self, arguments=[]): + def __init__(self, arguments: list = []) -> None: self.theme = Theme( { "main": "white bold", @@ -129,9 +143,15 @@ class Analyzer: ] ) self.movement[1] -= mean(self.movement[1]) + self.movement = self.low_pass_filter( + self.movement, + 5 * self.settings.vibration_frequency, + ) self.movement = self.movement[:, :-500000] - def psd_signal(self, signal, fft_length=10): + def psd_signal( + self, signal: NDArray, fft_length: int = 10 + ) -> NDArray: """ Compute psd of a given signal """ @@ -166,7 +186,9 @@ class Analyzer: ] ) - def interpolate_abciss(self, abcisses): + def interpolate_abciss( + self, abcisses: tuple[NDArray, ...] | list[NDArray] + ) -> NDArray: """ Return the axis that would be used by the interpolate method """ @@ -178,7 +200,9 @@ class Analyzer: return arange(start, end, 1 / max(rates)) - def interpolate(self, signals): + def interpolate( + self, signals: tuple[NDArray, ...] | list[NDArray] + ) -> list[NDArray]: """ Interpolate multiple signals with a single abciss list, which has the smallest interval and the biggest rate @@ -207,14 +231,20 @@ class Analyzer: ) # minimum to keep same length than coupling, but not for # the same frequency range, so more + psd_args = { + "fs": freq_sample, + "nperseg": nperseg, + "noverlap": nperseg / 2, + "nfft": nperseg, + } + result = zeros( len( self.interpolate_abciss( [ psd( self.movement[1], - fs=freq_sample, - nperseg=nperseg, + **psd_args, )[0], self.modelisation["freq"][0], ] @@ -222,6 +252,7 @@ class Analyzer: ) ) + # frequencies depends of psd result, which we do not have yet frequencies = 0 for index in range(len(coupling)): @@ -231,8 +262,7 @@ class Analyzer: [ *psd( sin(phase * self.movement[1]), - fs=freq_sample, - nperseg=nperseg, + **psd_args, ) ] ) @@ -246,8 +276,7 @@ class Analyzer: [ *psd( cos(phase * self.movement[1]), - fs=freq_sample, - nperseg=nperseg, + **psd_args, ) ] ) @@ -264,15 +293,26 @@ class Analyzer: ) ) + # no need to redefine it each time but simpler here frequencies = factor_n[0] + """ + result += ( + sqrt(self.settings.scattering_factor[index]) + * self.settings.wavelength / ( 4 * pi ) + * sqrt( + coupling_n[1]**2 * factor_n[1] + + coupling_d[1]**2 * factor_d[1] + )**2 + ) + """ result += ( sqrt(self.settings.scattering_factor[index]) * self.settings.power_in / self.settings.power_out * ( - sqrt(coupling_n[1]) * sqrt(factor_n[1]) - + sqrt(coupling_d[1]) * sqrt(factor_d[1]) + coupling_n[1] * factor_n[1] + + coupling_d[1] * factor_d[1] ) ) return array( @@ -282,13 +322,41 @@ class Analyzer: ] ) - def low_pass_filter(self, signal, cutoff): + def low_pass_filter( + self, signal: NDArray, cutoff: float + ) -> NDArray: """ Cut higher frequencies than cutoff for a given temporal signal """ + if self.settings.verbose: + self.console.log("filtering with a low pass filter") sample_spacing = signal[0, 1] - signal[0, 0] freq_signal = array(fft(signal[1])) frequencies = fftfreq(signal[1].shape[0], sample_spacing) index_to_remove = where(abs(frequencies) > cutoff) freq_signal[index_to_remove] = 0 - return array([signal[0], real(array(ifft(freq_signal)))]) + return array( + [ + real( + array( + ifft( + array( + [ + frequencies, + conj(flip(frequencies)), + ] + ), + ), + ), + ), + real( + array( + ifft( + array( + [freq_signal, conj(flip(freq_signal))] + ), + ), + ), + ), + ] + ) diff --git a/src/backscattering_analyzer/settings.py b/src/backscattering_analyzer/settings.py index ac6f00c..d7b166d 100644 --- a/src/backscattering_analyzer/settings.py +++ b/src/backscattering_analyzer/settings.py @@ -1,8 +1,9 @@ from pathlib import Path +from rich.console import Console class Settings: - def __init__(self, options, console): + def __init__(self, options: list, console: Console): self.version = "0.0.1" self.help = ( "[main]display[/main] [option]\\[options][/option]" @@ -13,14 +14,15 @@ class Settings: "\n [argument]-V --version[/argument] [description]print version number and exit[/description]" ) self.verbose = False - self.bench = "SWEB" + self.bench = "SDB1" self.date = "2023_03_24" self.folder = Path("/home/demagny/data") self.wavelength = 1.064e-6 # m self.calib_bench = 1.15 self.calib_mirror = 1.15 - self.scattering_factor = [1e-17, 0] # parameter to change + self.scattering_factor = [1e-3, 0] # parameter to change + self.vibration_frequency = 0.2 index = 0 while index < len(options): @@ -127,7 +129,7 @@ class Settings: self.power_in = 23 # W self.power_out = 8e-3 # W - def bench_file(self): + def bench_file(self) -> Path: return self.folder / ( "{bench}_{date}_ben.csv".format( bench=self.bench, @@ -135,7 +137,7 @@ class Settings: ) ) - def mirror_file(self): + def mirror_file(self) -> Path: return self.folder / ( "{bench}_{date}_mir.csv".format( bench=self.bench, @@ -143,7 +145,7 @@ class Settings: ) ) - def data_file(self): + def data_file(self) -> Path: return self.folder / ( "{bench}_{date}_dat.csv".format( bench=self.bench, @@ -151,7 +153,7 @@ class Settings: ) ) - def reference_file(self): + def reference_file(self) -> Path: return self.folder / ( "{bench}_{date}_ref.csv".format( bench=self.bench, @@ -159,8 +161,8 @@ class Settings: ) ) - def modelisation_file(self): + def modelisation_file(self) -> Path: return self.folder / (self.modelisation) - def coupling_name(self): + def coupling_name(self) -> str: return "{bench}coupling".format(bench=self.bench)