# display from rich.console import Console from rich.theme import Theme from rich.traceback import install as traceback_install # utils from sys import argv as options from backscattering_analyzer.settings import Settings from numpy import loadtxt, array from scipy.io.matlab import loadmat # maths from numpy import mean, zeros, pi, sin, cos, arange, sqrt, where, real from scipy.signal import welch as psd from scipy.interpolate import CubicSpline from scipy.fft import fft, ifft, fftfreq class Analyzer: """ Utility class to study backscattering light in VIRGO """ def __init__(self, arguments=[]): self.theme = Theme( { "main": "white bold", "option": "grey50 italic", "argument": "red", "description": "green italic", } ) self.console = Console(theme=self.theme) traceback_install(console=self.console, show_locals=True) self.settings = Settings(options[1:] + arguments, self.console) self.load() self.process_movement() def load(self): """ Load all data """ if self.settings.verbose: self.console.log("Load all data") try: self.load_bench() self.load_mirror() self.load_data() self.load_reference() self.load_coupling() except Exception: raise Exception("An error occured during data loading") def load_bench(self): """ Load bench movement """ file = self.settings.bench_file() if self.settings.verbose: self.console.log("loading bench movement") try: self.bench_movement = loadtxt(file).T except OSError: raise Exception("{file} does not exist".format(file=file)) def load_mirror(self): """ Load mirror movement """ file = self.settings.mirror_file() if self.settings.verbose: self.console.log("loading mirror movement") try: self.mirror_movement = loadtxt(file).T except OSError: raise Exception("{file} does not exist".format(file=file)) def load_data(self): """ Load excited h(t) """ file = self.settings.data_file() if self.settings.verbose: self.console.log("loading excited h(t)") try: self.data_signal = loadtxt(file).T except OSError: raise Exception("{file} does not exist".format(file=file)) def load_reference(self): """ Load reference h(t) """ file = self.settings.reference_file() if self.settings.verbose: self.console.log("loading reference h(t)") try: self.reference_signal = loadtxt(file).T except OSError: raise Exception("{file} does not exist".format(file=file)) def load_coupling(self): """ Load modelisation coupling data """ file = self.settings.modelisation_file() if self.settings.verbose: self.console.log("loading matlab modelisation data") try: self.modelisation = loadmat(file) except OSError: raise Exception("{file} does not exist".format(file=file)) def process_movement(self): """ Clean and compute relative movement between the bench and the miror """ if self.settings.verbose: self.console.log("computing relative movement") self.movement = array( [ self.bench_movement[0], (self.settings.calib_bench * self.bench_movement[1]) - ( self.settings.calib_mirror * self.mirror_movement[1] ), ] ) self.movement[1] -= mean(self.movement[1]) self.movement = self.movement[:,:-500000] def psd_signal(self, signal, fft_length=10): """ Compute psd of a given signal """ if self.settings.verbose: self.console.log("computing psd of the given signal") rate = 1 / (signal[0, 1] - signal[0, 0]) return array( psd( signal[1], rate, nperseg=int(fft_length * rate), detrend="linear", ) ) def diff_psd_ref_data(self): """ Give difference between reference and excited signal psd """ if self.settings.verbose: self.console.log( "compute difference between reference and excited " "signal (on the psd)" ) freq, data_psd = self.psd_signal(self.data_signal) _, reference_psd = self.psd_signal(self.reference_signal) return array( [ freq, data_psd - reference_psd, ] ) def interpolate_abciss(self, abcisses): """ Return the axis that would be used by the interpolate method """ rates = 1 / array( [abciss[1] - abciss[0] for abciss in abcisses] ) start = max([abciss[0] for abciss in abcisses]) end = min([abciss[-1] for abciss in abcisses]) return arange(start, end, 1 / max(rates)) def interpolate(self, signals): """ Interpolate multiple signals with a single abciss list, which has the smallest interval and the biggest rate """ splines = [ CubicSpline(signal[0], signal[1]) for signal in signals ] x = self.interpolate_abciss([signal[0] for signal in signals]) signals = [array([x, spline(x)]) for spline in splines] return signals def compute_light(self): """ Compute psd of the computed projection with current bench excitation """ coupling = self.modelisation[self.settings.coupling_name()] freq_sample = 1 / (self.movement[0, 1] - self.movement[0, 0]) nperseg = ( int(2 * len(coupling[0])) - 1 + len(coupling[0]) ) # minimum to keep same length than coupling, but not for # the same frequency range, so more result = zeros( len( self.interpolate_abciss( [ psd( self.movement[1], fs=freq_sample, nperseg=nperseg, )[0], self.modelisation["freq"][0], ] ) ) ) frequencies = 0 for index in range(len(coupling)): phase = (index + 1) * 4 * pi / self.settings.wavelength factor_n = array( [ *psd( sin(phase * self.movement[1]), fs=freq_sample, nperseg=nperseg, ) ] ) coupling_n = array( [ self.modelisation["freq"][0], abs(coupling[0]), ] ) factor_d = array( [ *psd( cos(phase * self.movement[1]), fs=freq_sample, nperseg=nperseg, ) ] ) coupling_d = array( [ self.modelisation["freq"][0], abs(coupling[1]), ] ) factor_n, coupling_n, factor_d, coupling_d = self.interpolate( (factor_n, coupling_n, factor_d, coupling_d) ) frequencies = factor_n[0] result += ( sqrt(self.settings.scattering_factor[index]) * self.settings.power_in / self.settings.power_out * ( sqrt(coupling_n[1]) * sqrt(factor_n[1]) + sqrt(coupling_d[1]) * sqrt(factor_d[1]) ) ) return array( [ frequencies, result, ] ) def low_pass_filter(self, signal, cutoff): """ Cut higher frequencies than cutoff for a given temporal signal """ sample_spacing = signal[0,1] - signal[0,0] freq_signal = fft(signal[1]) frequencies = fftfreq(signal[1].shape[0], sample_spacing) index_to_remove = where(abs(frequencies) > cutoff) freq_signal[index_to_remove] = 0 return array([signal[0], real(ifft(freq_signal))])