Update on refactoring

This commit is contained in:
linarphy 2024-06-12 15:17:16 +02:00
parent 2dea7090e3
commit 2283a9a778
No known key found for this signature in database
GPG key ID: E61920135EFF2295
5 changed files with 192 additions and 134 deletions

View file

@ -1,17 +1,20 @@
from pathlib import Path
from tomllib import load
from numpy.lib.npyio import loadtxt
from science_signal import to_signal
from science_signal.signal import Signal
from scipy.io.matlab import loadmat
from science_signal import to_signal # type: ignore[reportMissingTypeStubs]
from science_signal.signal import Signal # type: ignore[reportMissingTypeStubs]
from scipy.io.matlab import loadmat # type: ignore[reportMissingTypeStubs]
from backscattering_analyzer import logger
from backscattering_analyzer.acquisition import Acquisition, AcquisitionType
from backscattering_analyzer.acquisition import (
Acquisition,
AcquisitionType,
)
from backscattering_analyzer.movements import Movements
class Data:
def __init__(self, folder: Path, bench: str, date: str):
logger.info(
logger.debug(
_("loading data for {bench} in {date}").format(
bench=bench, date=date
)
@ -27,7 +30,7 @@ class Data:
try:
with open(files["metadata"], "rb") as file:
metadata = load(file)
logger.info(
logger.debug(
_("successfully load {metadata}").format(
metadata=files["metadata"],
)
@ -136,53 +139,52 @@ class Data:
"reference"
] / "{channel}.csv".format(channel=channel_names["sensibility"])
self.reference = Acquisition(type = AcquisitionType(0))
time: float
duration: float
try:
self.reference.movements = Movements(
movements = Movements(
bench=to_signal(loadtxt(files["ref_bench"]).T),
mirror=to_signal(loadtxt(files["ref_mirror"]).T),
)
self.reference.sensibility = to_signal(
loadtxt(files["ref_sensibility"]).T
sensibility = to_signal(loadtxt(files["ref_sensibility"]).T)
time = sensibility.x[0]
duration = sensibility.x[-1] - sensibility.x[0]
logger.debug(
_(
"successfully loaded reference data files: "
+ "\n {bench}"
+ "\n {mirror}"
+ "\n {sensibility}"
).format(
bench=files["ref_bench"],
mirror=files["ref_mirror"],
sensibility=files["ref_sensibility"],
)
self.reference.time = self.reference.sensibility[0][0]
self.reference.duration = (
self.reference.sensibility[0][-1]
- self.reference.sensibility[0][0]
)
logger.debug(_("successfully loaded reference data"))
try:
self.reference.time = metadata_reference["time"]
self.reference.duration = metadata_reference["duration"]
if (
self.reference.sensibility[0][0]
!= self.reference.time
):
time = metadata_reference["time"]
duration = metadata_reference["duration"]
if sensibility.x[0] != time:
logger.warning(
_(
"time between metadata and data does not match: "
+ "{data} != {metadata}"
).format(
data=self.reference.sensibility[0][0],
metadata=self.reference.time,
data=sensibility.x[0],
metadata=time,
)
)
if (
self.reference.sensibility[0][-1]
- self.reference.sensibility[0][0]
!= self.reference.duration
):
if abs(
sensibility.x[-1] - sensibility.x[0] - duration
) > 1e-2:
logger.warning(
_(
"duration between metadata and data does not "
+ "match: {data} != {metadata}"
).format(
data=(
self.reference.sensibility[0][-1]
- self.reference.sensibility[0][0]
),
metadata=self.reference.duration,
data=(sensibility.x[-1] - sensibility.x[0]),
metadata=duration,
)
)
except KeyError:
@ -202,9 +204,15 @@ class Data:
)
raise e
logger.debug(_("reference data loaded and checked"))
self.reference = Acquisition(
time=time,
duration=duration,
movements=movements,
sensibility=sensibility,
type=AcquisitionType.REFERENCE,
)
self.excited = Acquisition(type = AcquisitionType(1))
logger.debug(_("reference data loaded and checked"))
if excited:
files["exc_bench"] = folders[
@ -219,50 +227,52 @@ class Data:
channel=channel_names["sensibility"]
)
try:
self.excited.movements = Movements(
movements = Movements(
bench=to_signal(loadtxt(files["exc_bench"]).T),
mirror=to_signal(loadtxt(files["exc_mirror"]).T),
)
self.excited_sensibility = to_signal(
sensibility = to_signal(
loadtxt(files["exc_sensibility"]).T
)
self.excited.time = self.excited.sensibility[0][0]
self.excited.duration = (
self.excited.sensibility[0][-1]
- self.excited.sensibility[0][0]
time = sensibility.x[0]
duration = sensibility.x[-1] - sensibility.x[0]
logger.debug(
_(
"successfully loaded excited data files: "
+ "\n {bench}"
+ "\n {mirror}"
+ "\n {sensibility}"
).format(
bench=files["exc_bench"],
mirror=files["exc_mirror"],
sensibility=files["exc_sensibility"],
)
)
logger.debug(_("successfully loadd excited data"))
try:
self.excited.time = metadata_excited["time"]
self.excited.duration = metadata_excited["duration"]
if (
self.excited.sensibility[0][0]
!= self.excited.time
):
time = metadata_excited["time"]
duration = metadata_excited["duration"]
if sensibility.x[0] != time:
logger.warning(
_(
"time between metadata and data does not "
+ "match: {data} != {metadata}"
).format(
data=self.excited.sensibility[0][0],
metadata=self.excited.time,
data=sensibility.x[0],
metadata=time,
)
)
if (
self.excited.sensibility[0][-1]
- self.excited.sensibility[0][0]
!= self.excited.duration
):
if abs(
sensibility.x[-1] - sensibility.x[0] - duration
) > 1e-2:
logger.warning(
_(
"duration between metadata and data does "
+ "not match: {data} != {metadata}"
).format(
data=(
self.excited.sensibility[0][-1]
- self.excited.sensibility[0][0]
sensibility.x[-1] - sensibility.x[0]
),
metadata=self.excited.duration,
metadata=duration,
)
)
except KeyError:
@ -281,6 +291,13 @@ class Data:
)
)
)
self.excited = Acquisition(
time=time,
duration=duration,
movements=movements,
sensibility=sensibility,
type=AcquisitionType.EXCITED,
)
logger.debug(_("excited data loaded and checked"))
@ -313,7 +330,7 @@ def load_coupling(
"""
Load and correct coupling date from modelisation
"""
logger.info(
logger.debug(
_("loading coupling for {bench} in {date}").format(
bench=bench, date=date
)
@ -321,7 +338,7 @@ def load_coupling(
modelisation_file = folder / file
try:
modelisation_data = loadmat(modelisation_file)
logger.info(
logger.debug(
_("file successfully loaded:\n{file}").format(
file=modelisation_file
)
@ -378,7 +395,7 @@ def load_coupling(
/ model_powers["laser"]
) # radiation
elif bench in ["SIB2", "SPRB"]:
logger.info(
logger.warning(
_(
"cannot fix projection power for SIB2 and SPRB coupling"
)

View file

@ -16,43 +16,37 @@ def show_projection(
show,
legend,
close,
vlines,
title,
xlabel,
ylabel,
)
excited = experiment.signals["excited"].psd().sqrt()
reference = experiment.signals["reference"].psd().sqrt()
excited = experiment.data.excited.sensibility.psd().sqrt()
reference = experiment.data.reference.sensibility.psd().sqrt()
loglog(
experiment.projection.x,
experiment.projection.y,
label="projection",
experiment.projection_excited.x,
experiment.projection_excited.y,
label=_("projection with excitation"),
) # type: ignore[ReportUnusedCallResult]
loglog(excited.x, excited.y, label=_("excited bench")) # type: ignore[ReportUnusedCallResult]
loglog(reference.x, reference.y, label=_("reference bench")) # type: ignore[ReportUnusedCallResult]
loglog(
(experiment.projection + reference).x,
(experiment.projection + reference).y,
label=_("sum reference + excited"),
experiment.projection_reference.x,
experiment.projection_reference.y,
label=_("projection without excitation"),
) # type: ignore[ReportUnunsedCallResult]
loglog(
excited.x,
excited.y,
label=_("detected sensibility with excitation"),
) # type: ignore[ReportUnusedCallResult]
if start_frequency is not None:
vlines(
[
start_frequency,
],
min(experiment.projection.y),
max(experiment.projection.y),
color="k",
loglog(
reference.x,
reference.y,
label=_("detected sensibility without excitation"),
) # type: ignore[ReportUnusedCallResult]
if end_frequency is not None:
vlines(
[
end_frequency,
],
min(experiment.projection.y),
max(experiment.projection.y),
color="k",
loglog(
(experiment.projection_excited + reference).x,
(experiment.projection_excited + reference).y,
label=_("sum: no excitation and projection with excitation"),
) # type: ignore[ReportUnusedCallResult]
legend() # type: ignore[ReportUnusedCallResult]
title(experiment.bench) #  type: ignore[ReportUnusedCallResult]

View file

@ -7,6 +7,7 @@ from numpy.core.multiarray import array
from science_signal import interpolate
from science_signal.signal import Signal
from backscattering_analyzer import logger
from backscattering_analyzer.acquisition import AcquisitionType
from backscattering_analyzer.data import (
Data,
load_coupling,
@ -40,7 +41,7 @@ class Experiment:
try:
with open(values_file, "rb") as file:
values = load(file)
logger.info(
logger.debug(
_("successfully loaded values from {file}").format(
file=values_file
)
@ -254,25 +255,74 @@ class Experiment:
)
self.factors = self.fit_factors()
self.projection = self.compute_projection()
self.projection_excited = self.compute_projection(
AcquisitionType.EXCITED,
)
self.projection_reference = self.compute_projection(
AcquisitionType.REFERENCE,
)
logger.debug(_("end of experiment initialisation"))
def get_factors(
self,
phase: float,
*signals: Signal,
start: float | None = None,
end: float | None = None,
) -> tuple[Signal, Signal, Signal, Signal, Signal, Signal, float]:
type: AcquisitionType = AcquisitionType.EXCITED,
) -> tuple[Signal, ...]:
"""
Get factor to compute the projection for a given scatter factor
"""
phase = 4 * pi / self.wavelength
factor_n = (self.excited_movement * phase).sin().psd().sqrt()
coupling_n = self.coupling[0]
factor_d = (
(self.excited_movement["true"] * phase).cos().psd().sqrt()
)
coupling_d = self.coupling[1]
if type is AcquisitionType.EXCITED:
if self.excited_movement is not None:
factor_n = (
(self.excited_movement * phase).sin().psd().sqrt()
)
factor_d = (
(self.excited_movement * phase).cos().psd().sqrt()
)
else:
logger.critical(
_(
"no excited signal given, cannot get factors "
+ "of the excited signal"
)
)
raise Exception(
_(
"no excited signal given, cannot get facotrs "
+ "of the excited signal"
)
)
elif type is AcquisitionType.REFERENCE:
if self.reference_movement is not None:
factor_n = (
(self.reference_movement * phase).sin().psd().sqrt()
)
factor_d = (
(self.reference_movement * phase).cos().psd().sqrt()
)
else:
logger.critical(
_(
"no reference signal given, cannot get factors "
+ "of the reference signal"
)
)
raise Exception(
_(
"no reference signal given, cannot get factors "
+ "of the reference signal"
)
)
else:
logger.critical(_("unknown acquisition type"))
raise Exception(_("unknown acquisition type"))
if start is None:
start = coupling_n.x[0]
if end is None:
@ -280,30 +330,12 @@ class Experiment:
coupling_n = coupling_n.cut(start, end)
(
return interpolate(
factor_n,
coupling_n,
factor_d,
coupling_d,
excited,
reference,
) = interpolate(
factor_n,
coupling_n,
factor_d,
coupling_d,
self.signals["excited"].psd().sqrt(),
self.signals["reference"].psd().sqrt(),
)
return (
factor_n,
coupling_n,
factor_d,
coupling_d,
excited,
reference,
phase,
*signals,
)
def fit_factors(
@ -318,6 +350,7 @@ class Experiment:
Find the best factor (first order only) to get the projection
that best match the excited signal
"""
phase = 4 * pi / self.wavelength
(
factor_n,
coupling_n,
@ -325,8 +358,14 @@ class Experiment:
coupling_d,
excited,
reference,
) = self.get_factors(
phase,
) = self.get_factors(start=start_frequency, end=end_frequency)
self.data.excited.sensibility.psd().sqrt(),
self.data.reference.sensibility.psd().sqrt(),
start=start_frequency,
end=end_frequency,
type=AcquisitionType.EXCITED,
)
for index in range(precision):
logger.debug(_("search for a local minimum"))
@ -358,17 +397,17 @@ class Experiment:
if argmin(y) == 0:
logger.warning(_("smaller than current range allows"))
scatter_max: float = x[1]
scatter_max = x[1]
elif argmin(y) == len(x) - 1:
logger.warning(_("bigger than current range allows"))
scatter_min: float = x[-2]
scatter_min = x[-2]
else:
scatter_min: float = x[argmin(y) - 1]
scatter_max: float = x[argmin(y) + 1]
scatter_min = x[argmin(y) - 1]
scatter_max = x[argmin(y) + 1]
logger.debug(_("local minimum found"))
pre_scatter_factor: float = scatter_min / 2 + scatter_max / 2
pre_scatter_factor = scatter_min / 2 + scatter_max / 2
logger.info(
_("found a scattering factor of {factor}").format(
@ -381,19 +420,19 @@ class Experiment:
"true": pre_scatter_factor * 1e-6,
}
def compute_projection(self) -> Signal:
def compute_projection(
self, type: AcquisitionType = AcquisitionType.EXCITED
) -> Signal:
"""
Compute the projection of the noise from current values
"""
phase = 4 * pi / self.wavelength
(
factor_n,
coupling_n,
factor_d,
coupling_d,
_,
_,
phase,
) = self.get_factors()
) = self.get_factors(phase, type=type)
return Signal(
factor_n.x,

View file

@ -8,5 +8,9 @@ class Movements:
Container for the movement of the bench and the mirror
"""
bench: str
mirror: str
bench: Signal
mirror: Signal
def __init__(self, bench: Signal, mirror: Signal):
self.bench = bench * 1e-6
self.mirror = mirror * 1e-6

View file

@ -14,7 +14,7 @@ def process_movement(
"""
Clean and compute relative movement between the bench and the mirror
"""
logger.info(_("computing relative movement"))
logger.debug(_("computing relative movement"))
return (
(
@ -28,7 +28,7 @@ def process_movement(
def compute_light(
pre_scatter_factor: float,
pre_scatter_factor: float | None,
factor_n: Signal,
coupling_n: Signal,
factor_d: Signal,
@ -38,6 +38,10 @@ def compute_light(
"""
Optimized computing of light with pre-computed factor
"""
if pre_scatter_factor is None:
raise Exception(_(
"Cannot compute projection without backscattering factor"
))
return (
sqrt(pre_scatter_factor)
/ phase