From 2283a9a778bafe23ad705b993be5f2678230102e Mon Sep 17 00:00:00 2001 From: linarphy Date: Wed, 12 Jun 2024 15:17:16 +0200 Subject: [PATCH] Update on refactoring --- src/backscattering_analyzer/data.py | 141 ++++++++++++---------- src/backscattering_analyzer/display.py | 52 ++++---- src/backscattering_analyzer/experiment.py | 117 ++++++++++++------ src/backscattering_analyzer/movements.py | 8 +- src/backscattering_analyzer/utils.py | 8 +- 5 files changed, 192 insertions(+), 134 deletions(-) diff --git a/src/backscattering_analyzer/data.py b/src/backscattering_analyzer/data.py index 16c4f95..cee5b59 100644 --- a/src/backscattering_analyzer/data.py +++ b/src/backscattering_analyzer/data.py @@ -1,17 +1,20 @@ from pathlib import Path from tomllib import load from numpy.lib.npyio import loadtxt -from science_signal import to_signal -from science_signal.signal import Signal -from scipy.io.matlab import loadmat +from science_signal import to_signal # type: ignore[reportMissingTypeStubs] +from science_signal.signal import Signal # type: ignore[reportMissingTypeStubs] +from scipy.io.matlab import loadmat # type: ignore[reportMissingTypeStubs] from backscattering_analyzer import logger -from backscattering_analyzer.acquisition import Acquisition, AcquisitionType +from backscattering_analyzer.acquisition import ( + Acquisition, + AcquisitionType, +) from backscattering_analyzer.movements import Movements class Data: def __init__(self, folder: Path, bench: str, date: str): - logger.info( + logger.debug( _("loading data for {bench} in {date}").format( bench=bench, date=date ) @@ -27,7 +30,7 @@ class Data: try: with open(files["metadata"], "rb") as file: metadata = load(file) - logger.info( + logger.debug( _("successfully load {metadata}").format( metadata=files["metadata"], ) @@ -136,53 +139,52 @@ class Data: "reference" ] / "{channel}.csv".format(channel=channel_names["sensibility"]) - self.reference = Acquisition(type = AcquisitionType(0)) + time: float + duration: float try: - self.reference.movements = Movements( + movements = Movements( bench=to_signal(loadtxt(files["ref_bench"]).T), mirror=to_signal(loadtxt(files["ref_mirror"]).T), ) - self.reference.sensibility = to_signal( - loadtxt(files["ref_sensibility"]).T + sensibility = to_signal(loadtxt(files["ref_sensibility"]).T) + time = sensibility.x[0] + duration = sensibility.x[-1] - sensibility.x[0] + logger.debug( + _( + "successfully loaded reference data files: " + + "\n {bench}" + + "\n {mirror}" + + "\n {sensibility}" + ).format( + bench=files["ref_bench"], + mirror=files["ref_mirror"], + sensibility=files["ref_sensibility"], + ) ) - self.reference.time = self.reference.sensibility[0][0] - self.reference.duration = ( - self.reference.sensibility[0][-1] - - self.reference.sensibility[0][0] - ) - logger.debug(_("successfully loaded reference data")) try: - self.reference.time = metadata_reference["time"] - self.reference.duration = metadata_reference["duration"] - if ( - self.reference.sensibility[0][0] - != self.reference.time - ): + time = metadata_reference["time"] + duration = metadata_reference["duration"] + if sensibility.x[0] != time: logger.warning( _( "time between metadata and data does not match: " + "{data} != {metadata}" ).format( - data=self.reference.sensibility[0][0], - metadata=self.reference.time, + data=sensibility.x[0], + metadata=time, ) ) - if ( - self.reference.sensibility[0][-1] - - self.reference.sensibility[0][0] - != self.reference.duration - ): + if abs( + sensibility.x[-1] - sensibility.x[0] - duration + ) > 1e-2: logger.warning( _( "duration between metadata and data does not " + "match: {data} != {metadata}" ).format( - data=( - self.reference.sensibility[0][-1] - - self.reference.sensibility[0][0] - ), - metadata=self.reference.duration, + data=(sensibility.x[-1] - sensibility.x[0]), + metadata=duration, ) ) except KeyError: @@ -202,9 +204,15 @@ class Data: ) raise e - logger.debug(_("reference data loaded and checked")) + self.reference = Acquisition( + time=time, + duration=duration, + movements=movements, + sensibility=sensibility, + type=AcquisitionType.REFERENCE, + ) - self.excited = Acquisition(type = AcquisitionType(1)) + logger.debug(_("reference data loaded and checked")) if excited: files["exc_bench"] = folders[ @@ -219,50 +227,52 @@ class Data: channel=channel_names["sensibility"] ) try: - self.excited.movements = Movements( + movements = Movements( bench=to_signal(loadtxt(files["exc_bench"]).T), mirror=to_signal(loadtxt(files["exc_mirror"]).T), ) - self.excited_sensibility = to_signal( + sensibility = to_signal( loadtxt(files["exc_sensibility"]).T ) - self.excited.time = self.excited.sensibility[0][0] - self.excited.duration = ( - self.excited.sensibility[0][-1] - - self.excited.sensibility[0][0] + time = sensibility.x[0] + duration = sensibility.x[-1] - sensibility.x[0] + logger.debug( + _( + "successfully loaded excited data files: " + + "\n {bench}" + + "\n {mirror}" + + "\n {sensibility}" + ).format( + bench=files["exc_bench"], + mirror=files["exc_mirror"], + sensibility=files["exc_sensibility"], + ) ) - logger.debug(_("successfully loadd excited data")) try: - self.excited.time = metadata_excited["time"] - self.excited.duration = metadata_excited["duration"] - if ( - self.excited.sensibility[0][0] - != self.excited.time - ): + time = metadata_excited["time"] + duration = metadata_excited["duration"] + if sensibility.x[0] != time: logger.warning( _( "time between metadata and data does not " + "match: {data} != {metadata}" ).format( - data=self.excited.sensibility[0][0], - metadata=self.excited.time, + data=sensibility.x[0], + metadata=time, ) ) - if ( - self.excited.sensibility[0][-1] - - self.excited.sensibility[0][0] - != self.excited.duration - ): + if abs( + sensibility.x[-1] - sensibility.x[0] - duration + ) > 1e-2: logger.warning( _( "duration between metadata and data does " + "not match: {data} != {metadata}" ).format( data=( - self.excited.sensibility[0][-1] - - self.excited.sensibility[0][0] + sensibility.x[-1] - sensibility.x[0] ), - metadata=self.excited.duration, + metadata=duration, ) ) except KeyError: @@ -281,6 +291,13 @@ class Data: ) ) ) + self.excited = Acquisition( + time=time, + duration=duration, + movements=movements, + sensibility=sensibility, + type=AcquisitionType.EXCITED, + ) logger.debug(_("excited data loaded and checked")) @@ -313,7 +330,7 @@ def load_coupling( """ Load and correct coupling date from modelisation """ - logger.info( + logger.debug( _("loading coupling for {bench} in {date}").format( bench=bench, date=date ) @@ -321,7 +338,7 @@ def load_coupling( modelisation_file = folder / file try: modelisation_data = loadmat(modelisation_file) - logger.info( + logger.debug( _("file successfully loaded:\n{file}").format( file=modelisation_file ) @@ -378,7 +395,7 @@ def load_coupling( / model_powers["laser"] ) # radiation elif bench in ["SIB2", "SPRB"]: - logger.info( + logger.warning( _( "cannot fix projection power for SIB2 and SPRB coupling" ) diff --git a/src/backscattering_analyzer/display.py b/src/backscattering_analyzer/display.py index 4afe2c3..f7e0346 100644 --- a/src/backscattering_analyzer/display.py +++ b/src/backscattering_analyzer/display.py @@ -16,44 +16,38 @@ def show_projection( show, legend, close, - vlines, title, xlabel, ylabel, ) - excited = experiment.signals["excited"].psd().sqrt() - reference = experiment.signals["reference"].psd().sqrt() + excited = experiment.data.excited.sensibility.psd().sqrt() + reference = experiment.data.reference.sensibility.psd().sqrt() loglog( - experiment.projection.x, - experiment.projection.y, - label="projection", + experiment.projection_excited.x, + experiment.projection_excited.y, + label=_("projection with excitation"), ) # type: ignore[ReportUnusedCallResult] - loglog(excited.x, excited.y, label=_("excited bench")) # type: ignore[ReportUnusedCallResult] - loglog(reference.x, reference.y, label=_("reference bench")) # type: ignore[ReportUnusedCallResult] loglog( - (experiment.projection + reference).x, - (experiment.projection + reference).y, - label=_("sum reference + excited"), + experiment.projection_reference.x, + experiment.projection_reference.y, + label=_("projection without excitation"), + ) # type: ignore[ReportUnunsedCallResult] + loglog( + excited.x, + excited.y, + label=_("detected sensibility with excitation"), + ) # type: ignore[ReportUnusedCallResult] + loglog( + reference.x, + reference.y, + label=_("detected sensibility without excitation"), + ) # type: ignore[ReportUnusedCallResult] + loglog( + (experiment.projection_excited + reference).x, + (experiment.projection_excited + reference).y, + label=_("sum: no excitation and projection with excitation"), ) # type: ignore[ReportUnusedCallResult] - if start_frequency is not None: - vlines( - [ - start_frequency, - ], - min(experiment.projection.y), - max(experiment.projection.y), - color="k", - ) # type: ignore[ReportUnusedCallResult] - if end_frequency is not None: - vlines( - [ - end_frequency, - ], - min(experiment.projection.y), - max(experiment.projection.y), - color="k", - ) # type: ignore[ReportUnusedCallResult] legend() # type: ignore[ReportUnusedCallResult] title(experiment.bench) #  type: ignore[ReportUnusedCallResult] xlabel(_("frequency (Hz)")) # type: ignore[ReportUnusedCallResult] diff --git a/src/backscattering_analyzer/experiment.py b/src/backscattering_analyzer/experiment.py index 91750f7..360d424 100644 --- a/src/backscattering_analyzer/experiment.py +++ b/src/backscattering_analyzer/experiment.py @@ -7,6 +7,7 @@ from numpy.core.multiarray import array from science_signal import interpolate from science_signal.signal import Signal from backscattering_analyzer import logger +from backscattering_analyzer.acquisition import AcquisitionType from backscattering_analyzer.data import ( Data, load_coupling, @@ -40,7 +41,7 @@ class Experiment: try: with open(values_file, "rb") as file: values = load(file) - logger.info( + logger.debug( _("successfully loaded values from {file}").format( file=values_file ) @@ -254,25 +255,74 @@ class Experiment: ) self.factors = self.fit_factors() - self.projection = self.compute_projection() + self.projection_excited = self.compute_projection( + AcquisitionType.EXCITED, + ) + self.projection_reference = self.compute_projection( + AcquisitionType.REFERENCE, + ) logger.debug(_("end of experiment initialisation")) def get_factors( self, + phase: float, + *signals: Signal, start: float | None = None, end: float | None = None, - ) -> tuple[Signal, Signal, Signal, Signal, Signal, Signal, float]: + type: AcquisitionType = AcquisitionType.EXCITED, + ) -> tuple[Signal, ...]: """ Get factor to compute the projection for a given scatter factor """ - phase = 4 * pi / self.wavelength - factor_n = (self.excited_movement * phase).sin().psd().sqrt() coupling_n = self.coupling[0] - factor_d = ( - (self.excited_movement["true"] * phase).cos().psd().sqrt() - ) coupling_d = self.coupling[1] + if type is AcquisitionType.EXCITED: + if self.excited_movement is not None: + factor_n = ( + (self.excited_movement * phase).sin().psd().sqrt() + ) + factor_d = ( + (self.excited_movement * phase).cos().psd().sqrt() + ) + else: + logger.critical( + _( + "no excited signal given, cannot get factors " + + "of the excited signal" + ) + ) + raise Exception( + _( + "no excited signal given, cannot get facotrs " + + "of the excited signal" + ) + ) + elif type is AcquisitionType.REFERENCE: + if self.reference_movement is not None: + factor_n = ( + (self.reference_movement * phase).sin().psd().sqrt() + ) + factor_d = ( + (self.reference_movement * phase).cos().psd().sqrt() + ) + else: + logger.critical( + _( + "no reference signal given, cannot get factors " + + "of the reference signal" + ) + ) + raise Exception( + _( + "no reference signal given, cannot get factors " + + "of the reference signal" + ) + ) + else: + logger.critical(_("unknown acquisition type")) + raise Exception(_("unknown acquisition type")) + if start is None: start = coupling_n.x[0] if end is None: @@ -280,30 +330,12 @@ class Experiment: coupling_n = coupling_n.cut(start, end) - ( + return interpolate( factor_n, coupling_n, factor_d, coupling_d, - excited, - reference, - ) = interpolate( - factor_n, - coupling_n, - factor_d, - coupling_d, - self.signals["excited"].psd().sqrt(), - self.signals["reference"].psd().sqrt(), - ) - - return ( - factor_n, - coupling_n, - factor_d, - coupling_d, - excited, - reference, - phase, + *signals, ) def fit_factors( @@ -318,6 +350,7 @@ class Experiment: Find the best factor (first order only) to get the projection that best match the excited signal """ + phase = 4 * pi / self.wavelength ( factor_n, coupling_n, @@ -325,8 +358,14 @@ class Experiment: coupling_d, excited, reference, + ) = self.get_factors( phase, - ) = self.get_factors(start=start_frequency, end=end_frequency) + self.data.excited.sensibility.psd().sqrt(), + self.data.reference.sensibility.psd().sqrt(), + start=start_frequency, + end=end_frequency, + type=AcquisitionType.EXCITED, + ) for index in range(precision): logger.debug(_("search for a local minimum")) @@ -358,17 +397,17 @@ class Experiment: if argmin(y) == 0: logger.warning(_("smaller than current range allows")) - scatter_max: float = x[1] + scatter_max = x[1] elif argmin(y) == len(x) - 1: logger.warning(_("bigger than current range allows")) - scatter_min: float = x[-2] + scatter_min = x[-2] else: - scatter_min: float = x[argmin(y) - 1] - scatter_max: float = x[argmin(y) + 1] + scatter_min = x[argmin(y) - 1] + scatter_max = x[argmin(y) + 1] logger.debug(_("local minimum found")) - pre_scatter_factor: float = scatter_min / 2 + scatter_max / 2 + pre_scatter_factor = scatter_min / 2 + scatter_max / 2 logger.info( _("found a scattering factor of {factor}").format( @@ -381,19 +420,19 @@ class Experiment: "true": pre_scatter_factor * 1e-6, } - def compute_projection(self) -> Signal: + def compute_projection( + self, type: AcquisitionType = AcquisitionType.EXCITED + ) -> Signal: """ Compute the projection of the noise from current values """ + phase = 4 * pi / self.wavelength ( factor_n, coupling_n, factor_d, coupling_d, - _, - _, - phase, - ) = self.get_factors() + ) = self.get_factors(phase, type=type) return Signal( factor_n.x, diff --git a/src/backscattering_analyzer/movements.py b/src/backscattering_analyzer/movements.py index 5f6c3c1..0ed4ee2 100644 --- a/src/backscattering_analyzer/movements.py +++ b/src/backscattering_analyzer/movements.py @@ -8,5 +8,9 @@ class Movements: Container for the movement of the bench and the mirror """ - bench: str - mirror: str + bench: Signal + mirror: Signal + + def __init__(self, bench: Signal, mirror: Signal): + self.bench = bench * 1e-6 + self.mirror = mirror * 1e-6 diff --git a/src/backscattering_analyzer/utils.py b/src/backscattering_analyzer/utils.py index 43fcd5d..55d9491 100644 --- a/src/backscattering_analyzer/utils.py +++ b/src/backscattering_analyzer/utils.py @@ -14,7 +14,7 @@ def process_movement( """ Clean and compute relative movement between the bench and the mirror """ - logger.info(_("computing relative movement")) + logger.debug(_("computing relative movement")) return ( ( @@ -28,7 +28,7 @@ def process_movement( def compute_light( - pre_scatter_factor: float, + pre_scatter_factor: float | None, factor_n: Signal, coupling_n: Signal, factor_d: Signal, @@ -38,6 +38,10 @@ def compute_light( """ Optimized computing of light with pre-computed factor """ + if pre_scatter_factor is None: + raise Exception(_( + "Cannot compute projection without backscattering factor" + )) return ( sqrt(pre_scatter_factor) / phase