diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..52dcff1 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,36 @@ +[project] +name = "backscattering-analyzer" +version = "0.0.1" +authors = [ + { name="linarphy", email="linarphy@linarphy.com" }, +] +description = "Analyze backscattering noise" +readme = "README.md" +requires-python = ">=3.8" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", + "Intended Audience :: Science/Research", + "Operating System :: OS Independent", + "Development Status :: 2 - Pre-Alpha", + "Topic :: Scientific/Engineering", +] + +[project.urls] +Homepage = "https://git.linarphy.net/linarphy/backscattering-analyzer" +Issues = "https://git.linarphy.net/linarphy/backscattering-analyzer/issues" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/backscattering-analyzer"] + +[tool.ruff] +line-length = 72 + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" +docstring-code-format = true diff --git a/src/backscattering-analyzer/__init__.py b/src/backscattering-analyzer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backscattering-analyzer/analyzer.py b/src/backscattering-analyzer/analyzer.py new file mode 100644 index 0000000..0e4db3b --- /dev/null +++ b/src/backscattering-analyzer/analyzer.py @@ -0,0 +1,245 @@ +# display +from rich.console import Console +from rich.theme import Theme +from rich.traceback import install as traceback_install + +# utils +from sys import argv as options +from settings import Settings +from numpy import loadtxt, array +from scipy.io.matlab import loadmat + +# maths +from numpy import mean, zeros, pi, sin, cos, arange +from scipy.signal import welch as psd +from scipy.interpolate import CubicSpline + + +class Analyzer: + """ + Utility class to study backscattering light in VIRGO + """ + + def __init__(self, arguments=[]): + self.theme = Theme( + { + "main": "white bold", + "option": "grey50 italic", + "argument": "red", + "description": "green italic", + } + ) + self.console = Console(theme=self.theme) + traceback_install(console=self.console, show_locals=True) + self.settings = Settings(options[1:] + arguments, self.console) + self.load() + self.process_movement() + + def load(self): + """ + Load all data + """ + if self.settings.verbose: + self.console.log("Load all data") + try: + self.load_bench() + self.load_mirror() + self.load_data() + self.load_reference() + self.load_coupling() + except Exception: + raise Exception("An error occured during data loading") + + def load_bench(self): + """ + Load bench movement + """ + file = self.settings.bench_file() + if self.settings.verbose: + self.console.log("loading bench movement") + try: + self.bench_movement = loadtxt(file).T + except OSError: + raise Exception("{file} does not exist".format(file=file)) + + def load_mirror(self): + """ + Load mirror movement + """ + file = self.settings.mirror_file() + if self.settings.verbose: + self.console.log("loading mirror movement") + try: + self.mirror_movement = loadtxt(file).T + except OSError: + raise Exception("{file} does not exist".format(file=file)) + + def load_data(self): + """ + Load excited h(t) + """ + file = self.settings.data_file() + if self.settings.verbose: + self.console.log("loading excited h(t)") + try: + self.data_signal = loadtxt(file).T + except OSError: + raise Exception("{file} does not exist".format(file=file)) + + def load_reference(self): + """ + Load reference h(t) + """ + file = self.settings.reference_file() + if self.settings.verbose: + self.console.log("loading reference h(t)") + try: + self.reference_signal = loadtxt(file).T + except OSError: + raise Exception("{file} does not exist".format(file=file)) + + def load_coupling(self): + """ + Load modelisation coupling data + """ + file = self.settings.modelisation_file() + if self.settings.verbose: + self.console.log("loading matlab modelisation data") + try: + self.modelisation = loadmat(file) + except OSError: + raise Exception("{file} does not exist".format(file=file)) + + def process_movement(self): + """ + Clean and compute relative movement between the bench and the + miror + """ + if self.settings.verbose: + self.console.log("computing relative movement") + + self.movement = array( + [ + self.bench_movement[0], + self.bench_movement[1] - self.mirror_movement[1], + ] + ) + self.movement[1] -= mean(self.movement[1]) + + def psd_signal(self, signal, fft_length=10): + """ + Compute psd of a given signal + """ + if self.settings.verbose: + self.console.log("computing psd of the given signal") + rate = 1 / (signal[0, 1] - signal[0, 0]) + + return array( + psd( + signal[1], + rate, + nperseg=int(fft_length * rate), + detrend="linear", + ) + ) + + def diff_psd_ref_data(self): + """ + Give difference between reference and excited signal psd + """ + if self.settings.verbose: + self.console.log( + "compute difference between reference and excited " + "signal (on the psd)" + ) + freq, data_psd = self.psd_signal(self.data_signal) + _, reference_psd = self.psd_signal(self.reference_signal) + return array( + [ + freq, + data_psd - reference_psd, + ] + ) + + def interpolate(self, signals): + """ + Interpolate multiple signals with a single abciss list, which + has the smallest interval and the biggest rate + """ + rates = 1 / (signals[:, 0, 1] - signals[:, 0, 0]) + start = max(signals[:, 0, 0]) + end = min(signals[:, 0, -1]) + + splines = [ + CubicSpline(signal[0], signal[1]) for signal in signals + ] + + x = arange(start, end, 1 / max(rates)) + + signals = [array([x, spline(x)]) for spline in splines] + + return signals + + def compute_light(self): + """ + Compute psd of the computed projection with current bench + excitation + """ + coupling = self.modelisation[self.settings.coupling_name()] + + coupling_1, coupling_2 = ( + array( + [ + self.modelisation["freq"][0], + coupling[0], + ] + ), + array( + [ + self.modelisation["freq"][0], + coupling[1], + ] + ), + ) + + rate = 1 / (self.movement[0, 1] - self.movement[0, 0]) + + result = zeros(self.bench_movement.shape[1]) + scattering_factor = [0, 0] + + for index in range( + len(self.modelisation[self.settings.coupling_name()]) + ): + argument = ( + self.movement[1] + * 4 + * pi + * (index + 1) + / self.settings.wavelength + ) + + result += ( + scattering_factor[index] + * (self.settings.wavelength / 4 * pi) ** 2 + * ( + ( + psd( + sin(argument), + fs=rate, + nperseg=2999, + )[1] + * abs(coupling[0]) + ) + ** 2 + + ( + psd( + cos(argument), + fs=rate, + nperseg=2999, + )[1] + * abs(coupling[1]) + ) + ** 2 + ) + ) + return result diff --git a/src/backscattering-analyzer/settings.py b/src/backscattering-analyzer/settings.py new file mode 100644 index 0000000..a5735ad --- /dev/null +++ b/src/backscattering-analyzer/settings.py @@ -0,0 +1,146 @@ +from pathlib import Path + + +class Settings: + def __init__(self, options, console): + self.version = "0.0.1" + self.help = ( + "[main]display[/main] [option]\\[options][/option]" + "\n [argument]-b --bench[argument] [description]bench of the experiment[/description]" + "\n [argument]-d --date[argument] [description]date of the experiment[/description]" + "\n [argument]-h --help[argument] [description]print this help and exit[/description]" + "\n [argument]-v --verbose[argument] [description]be verbose[/description]" + "\n [argument]-V --version[argument] [description]print version number and exit[/description]" + ) + self.verbose = False + self.bench = "SWEB" + self.date = "2023_03_24" + self.folder = Path("/home/demagny/data") + self.modelisation = "scatterCouplingO4.mat" + self.wavelength = 1.064e-6 + + index = 0 + while index < len(options): + option = options[index] + try: + if option[0] != "-": + if option == "h": + option = "--help" + if option == "V": + option = "--version" + if "v" in option: + options[index] = option.replace("v", "") + option = "--verbose" + index -= 1 + if option == "b": + index += 1 + try: + options[index] = "--bench={}".format( + options[index] + ) + except IndexError: + raise Exception( + "bench name needed after b option" + ) + if option == "d": + index += 1 + try: + options[index] = "--date={}".format( + options[index] + ) + except IndexError: + raise Exception( + "date needed after d option" + ) + if option[0] == "-": + if option[1] != "-": + if option == "-h": + option = "--help" + if option == "-V": + option = "--version" + if "v" in option: + options[index] = option.replace("v", "") + option = "--verbose" + index -= 1 + if option == "-b": + index += 1 + try: + options[index] = "--bench={}".format( + options[index] + ) + except IndexError: + raise Exception( + "bench name needed after -b option" + ) + if option == "-d": + index += 1 + try: + options[index] = "--date={}".format( + options[index] + ) + except IndexError: + raise Exception( + "date needed after -d option" + ) + if option[:2] == "--": + if option == "--help": + console.print(self.help) + exit(0) + if option == "--version": + console.print(self.version) + exit(0) + if option == "--verbose": + self.verbose = True + try: + if option[:8] == "--bench=": + self.bench = str(option[8:]).upper() + except IndexError: + continue + try: + if option[:7] == "--date=": + self.date = option[7:] + except IndexError: + continue + else: + raise Exception("unknown option {}".format(option)) + except IndexError: + pass + index += 1 + + def bench_file(self): + return self.folder / ( + "{bench}_{date}_ben.csv".format( + bench=self.bench, + date=self.date, + ) + ) + + def mirror_file(self): + return self.folder / ( + "{bench}_{date}_mir.csv".format( + bench=self.bench, + date=self.date, + ) + ) + + def data_file(self): + return self.folder / ( + "{bench}_{date}_dat.csv".format( + bench=self.bench, + date=self.date, + ) + ) + + def reference_file(self): + return self.folder / ( + "{bench}_{date}_ref.csv".format( + bench=self.bench, + date=self.date, + ) + ) + + def modelisation_file(self): + return self.folder / (self.modelisation) + + def coupling_name(self): + return "{bench}coupling".format(bench=self.bench)