Compare commits

...

10 commits

8 changed files with 552 additions and 240 deletions

View file

@ -35,6 +35,7 @@ packages = ["src/backscattering_analyzer"]
[tool.ruff] [tool.ruff]
line-length = 72 line-length = 72
builtins = ["_"]
[tool.ruff.format] [tool.ruff.format]
quote-style = "double" quote-style = "double"

View file

@ -5,64 +5,3 @@ from gettext import install
install(__name__) install(__name__)
logger = getLogger(__name__) logger = getLogger(__name__)
def show_projection(
experiment: "Experiment",
start_frequency: float | None = None,
end_frequency: float | None = None,
) -> None:
"""
Show projection data with matplotlib
"""
logger.debug(_("showing experiment result"))
from matplotlib.pyplot import (
loglog,
show,
legend,
close,
vlines,
title,
xlabel,
ylabel,
)
excited = experiment.signals["excited"].psd().sqrt()
reference = experiment.signals["reference"].psd().sqrt()
loglog(
experiment.projection.x,
experiment.projection.y,
label="projection",
) # type: ignore[ReportUnusedCallResult]
loglog(excited.x, excited.y, label=_("excited bench")) # type: ignore[ReportUnusedCallResult]
loglog(reference.x, reference.y, label=_("reference bench")) # type: ignore[ReportUnusedCallResult]
loglog(
(experiment.projection + reference).x,
(experiment.projection + reference).y,
label=_("sum reference + excited"),
) # type: ignore[ReportUnusedCallResult]
if start_frequency is not None:
vlines(
[
start_frequency,
],
min(experiment.projection.y),
max(experiment.projection.y),
color="k",
) # type: ignore[ReportUnusedCallResult]
if end_frequency is not None:
vlines(
[
end_frequency,
],
min(experiment.projection.y),
max(experiment.projection.y),
color="k",
) # type: ignore[ReportUnusedCallResult]
legend() # type: ignore[ReportUnusedCallResult]
title(experiment.bench) # type: ignore[ReportUnusedCallResult]
xlabel(_("frequency (Hz)")) # type: ignore[ReportUnusedCallResult]
ylabel(_("stray ($\\frac{1} {\\sqrt {Hz}}$)")) # type: ignore[ReportUnusedCallResult]
show()
close()
logger.debug(_("experiment result showed"))

View file

@ -0,0 +1,22 @@
from enum import Enum
from dataclasses import dataclass
from science_signal.signal import Signal
from backscattering_analyzer.movements import Movements
class AcquisitionType(Enum):
REFERENCE = 0
EXCITED = 1
@dataclass
class Acquisition:
"""
A data acquisition with a specific time and duration
"""
movements: Movements
sensibility: Signal
type: AcquisitionType
time: float
duration: float

View file

@ -1,87 +1,322 @@
from pathlib import Path from pathlib import Path
from numpy.lib.npyio import loadtxt from tomllib import load
from science_signal.signal import Signal from numpy import loadtxt
from scipy.io.matlab import loadmat from science_signal import to_signal # type: ignore[reportMissingTypeStubs]
from science_signal.signal import Signal # type: ignore[reportMissingTypeStubs]
from scipy.io.matlab import loadmat # type: ignore[reportMissingTypeStubs]
from backscattering_analyzer import logger from backscattering_analyzer import logger
from backscattering_analyzer.acquisition import (
Acquisition,
AcquisitionType,
)
from backscattering_analyzer.movements import Movements
def load_signals( class Data:
folder: Path, bench: str, date: str def __init__(self, folder: Path, bench: str, date: str):
) -> dict[str, Signal]: logger.debug(
""" _("loading data for {bench} in {date}").format(
Load excited and reference signal of an experiment
"""
logger.info(
_("loading signals for {bench} in {date}").format(
bench=bench, date=date bench=bench, date=date
) )
) )
excited_file = folder / "{bench}_{date}_dat.csv".format( folders: dict[str, Path] = {} # list of folder
bench=bench, files: dict[str, Path] = {} # list of file
date=date,
) folders["experiment"] = folder / "experiment_{date}".format(
reference_file = folder / "{bench}_{date}_ref.csv".format(
bench=bench,
date=date, date=date,
) )
files["metadata"] = folders["experiment"] / "metadata.toml"
try: try:
excited_data = loadtxt(excited_file).T with open(files["metadata"], "rb") as file:
reference_data = loadtxt(reference_file).T metadata = load(file)
logger.debug(
_("successfully load {metadata}").format(
metadata=files["metadata"],
)
)
except OSError as error:
logger.critical(
_("cannot load {metadata}: {error}").format(
metadata=files["metadata"],
error=error,
)
)
raise error
try:
available_benches: list[str] = metadata["general"][
"benches"
]
except KeyError:
logger.critical(
_("malformed metadata, no benches provided")
)
raise Exception(
_("malformed metadata, no benches provided")
)
if bench not in available_benches:
logger.critical(
_("no data on {bench} in {date}").format(
bench=bench,
date=date,
)
)
raise Exception(
_("no data on {bench} in {date}").format(
bench=bench,
date=date,
)
)
description: str | None = None
try:
description = metadata["general"]["description"]
except KeyError:
logger.warning(
_("malformed metadata, no description provided")
)
logger.info( logger.info(
"data description: {description}".format(
description=description
)
)
metadata_reference: dict[str, float]
metadata_excited: dict[str, float]
excited = False
try:
metadata_reference = metadata["benches"][bench]["reference"]
folders["reference"] = folders[
"experiment"
] / "{bench}/reference".format(
bench=bench,
)
logger.debug(
_("there are multiple reference for each benches")
)
except KeyError:
try:
metadata_reference = metadata["reference"]
folders["reference"] = (
folders["experiment"] / "reference"
)
logger.debug(
_("there is one reference for every benches")
)
except KeyError:
logger.critical(
_("malformed metadata, no reference provided")
)
raise Exception(
_("malformed metadata, no reference provided")
)
try:
metadata_excited = metadata["benches"][bench]["excited"]
folders["excited"] = folders[
"experiment"
] / "{bench}/excited".format(
bench=bench,
)
excited = True
logger.debug(_("excited data have been taken"))
except KeyError:
logger.debug(_("no excited data have been taken"))
logger.debug(_("metadata fully read, starting to load data"))
channel_names = channel_name_mapping(bench)
files["ref_bench"] = folders[
"reference"
] / "{channel}.csv".format(channel=channel_names["bench"])
files["ref_mirror"] = folders[
"reference"
] / "{channel}.csv".format(channel=channel_names["mirror"])
files["ref_sensibility"] = folders[
"reference"
] / "{channel}.csv".format(channel=channel_names["sensibility"])
time: float
duration: float
try:
movements = Movements(
bench=to_signal(loadtxt(files["ref_bench"]).T),
mirror=to_signal(loadtxt(files["ref_mirror"]).T),
)
sensibility = to_signal(loadtxt(files["ref_sensibility"]).T)
time = sensibility.x[0]
duration = sensibility.x[-1] - sensibility.x[0]
logger.debug(
_( _(
"files successfully loaded:\n{excited}\n{reference}" "successfully loaded reference data files: "
).format(excited=excited_file, reference=reference_file) + "\n {bench}"
+ "\n {mirror}"
+ "\n {sensibility}"
).format(
bench=files["ref_bench"],
mirror=files["ref_mirror"],
sensibility=files["ref_sensibility"],
) )
except OSError as e:
logger.critical(
_("some file cannot be read: {error}").format(error=e)
)
raise e
return {
"excited": Signal(excited_data[0], excited_data[1]),
"reference": Signal(reference_data[0], reference_data[1]),
}
def load_movements(
folder: Path, bench: str, date: str
) -> dict[str, Signal]:
"""
Load excited movement of the bench and the nearest mirror of an
experiment
"""
logger.info(
_("loading movements for {bench} in {date}").format(
bench=bench, date=date
)
)
bench_file = folder / "{bench}_{date}_ben.csv".format(
bench=bench,
date=date,
)
mirror_file = folder / "{bench}_{date}_mir.csv".format(
bench=bench,
date=date,
) )
try: try:
bench_movement = loadtxt(bench_file).T time = metadata_reference["time"]
mirror_movement = loadtxt(mirror_file).T duration = metadata_reference["duration"]
bench_movement[1] = bench_movement[1] * 1e-6 # um -> m if sensibility.x[0] != time:
mirror_movement[1] = mirror_movement[1] * 1e-6 # um -> m logger.warning(
logger.info( _(
_("files successfully loaded:\n{bench}\n{mirror}").format( "time between metadata and data does not match: "
bench=bench_file, mirror=mirror_file + "{data} != {metadata}"
).format(
data=sensibility.x[0],
metadata=time,
)
)
if abs(
sensibility.x[-1] - sensibility.x[0] - duration
) > 1e-2:
logger.warning(
_(
"duration between metadata and data does not "
+ "match: {data} != {metadata}"
).format(
data=(sensibility.x[-1] - sensibility.x[0]),
metadata=duration,
)
)
except KeyError:
logger.warning(
_(
"malformed metadata, each reference entry should have "
+ "its own time and duration. Defining it with data"
) )
) )
except OSError as e: except OSError as e:
logger.critical( logger.critical(
_("some file cannot be read: {error}").format(error=e) _(
"some files could not be read: {error}".format(
error=e
)
)
) )
raise e raise e
return {
"bench": Signal(bench_movement[0], bench_movement[1]), self.reference = Acquisition(
"mirror": Signal(mirror_movement[0], mirror_movement[1]), time=time,
duration=duration,
movements=movements,
sensibility=sensibility,
type=AcquisitionType.REFERENCE,
)
logger.debug(_("reference data loaded and checked"))
if excited:
files["exc_bench"] = folders[
"excited"
] / "{channel}.csv".format(channel=channel_names["bench"])
files["exc_mirror"] = folders[
"excited"
] / "{channel}.csv".format(channel=channel_names["mirror"])
files["exc_sensibility"] = folders[
"excited"
] / "{channel}.csv".format(
channel=channel_names["sensibility"]
)
try:
movements = Movements(
bench=to_signal(loadtxt(files["exc_bench"]).T),
mirror=to_signal(loadtxt(files["exc_mirror"]).T),
)
sensibility = to_signal(
loadtxt(files["exc_sensibility"]).T
)
time = sensibility.x[0]
duration = sensibility.x[-1] - sensibility.x[0]
logger.debug(
_(
"successfully loaded excited data files: "
+ "\n {bench}"
+ "\n {mirror}"
+ "\n {sensibility}"
).format(
bench=files["exc_bench"],
mirror=files["exc_mirror"],
sensibility=files["exc_sensibility"],
)
)
try:
time = metadata_excited["time"]
duration = metadata_excited["duration"]
if sensibility.x[0] != time:
logger.warning(
_(
"time between metadata and data does not "
+ "match: {data} != {metadata}"
).format(
data=sensibility.x[0],
metadata=time,
)
)
if abs(
sensibility.x[-1] - sensibility.x[0] - duration
) > 1e-2:
logger.warning(
_(
"duration between metadata and data does "
+ "not match: {data} != {metadata}"
).format(
data=(
sensibility.x[-1] - sensibility.x[0]
),
metadata=duration,
)
)
except KeyError:
logger.warning(
_(
"malformed metadata, each excited entry should "
+ "have its own time and duration. Defining it "
+ "with data"
)
)
except OSError as e:
logger.critical(
_(
"some files could not be read: {error}".format(
error=e
)
)
)
self.excited = Acquisition(
time=time,
duration=duration,
movements=movements,
sensibility=sensibility,
type=AcquisitionType.EXCITED,
)
logger.debug(_("excited data loaded and checked"))
def channel_name_mapping(bench: str) -> dict[str, str]:
names = {
"sensibility": "h",
"bench": bench,
} }
if bench in ["SDB1", "SDB2"]:
names["mirror"] = "SR"
elif bench in ["SNEB", "SWEB"]:
names["mirror"] = bench[1:3]
elif bench == "SIB2":
names["mirror"] = "MC"
elif bench == "SPRB":
names["mirror"] = "SR" # not good, choice between MC, SR and PR
else:
raise ValueError(_("Unknow bench {bench}".format(bench=bench)))
return names
def load_coupling( def load_coupling(
@ -95,7 +330,7 @@ def load_coupling(
""" """
Load and correct coupling date from modelisation Load and correct coupling date from modelisation
""" """
logger.info( logger.debug(
_("loading coupling for {bench} in {date}").format( _("loading coupling for {bench} in {date}").format(
bench=bench, date=date bench=bench, date=date
) )
@ -103,7 +338,7 @@ def load_coupling(
modelisation_file = folder / file modelisation_file = folder / file
try: try:
modelisation_data = loadmat(modelisation_file) modelisation_data = loadmat(modelisation_file)
logger.info( logger.debug(
_("file successfully loaded:\n{file}").format( _("file successfully loaded:\n{file}").format(
file=modelisation_file file=modelisation_file
) )
@ -160,7 +395,7 @@ def load_coupling(
/ model_powers["laser"] / model_powers["laser"]
) # radiation ) # radiation
elif bench in ["SIB2", "SPRB"]: elif bench in ["SIB2", "SPRB"]:
logger.info( logger.warning(
_( _(
"cannot fix projection power for SIB2 and SPRB coupling" "cannot fix projection power for SIB2 and SPRB coupling"
) )

View file

@ -0,0 +1,70 @@
from backscattering_analyzer import logger
from backscattering_analyzer.experiment import Experiment
def show_projection(
experiment: Experiment,
start_frequency: float | None = None,
end_frequency: float | None = None,
) -> None:
"""
Show projection data with matplotlib
"""
logger.debug(_("showing experiment result"))
from matplotlib.pyplot import (
loglog,
show,
legend,
close,
title,
xlabel,
ylabel,
xlim,
ylim,
grid,
)
reference = experiment.data.reference.sensibility.psd().sqrt()
try:
excited = experiment.data.excited.sensibility.psd().sqrt()
loglog(
experiment.projection_excited.x,
experiment.projection_excited.y,
label=_("projection with excitation"),
) # type: ignore[ReportUnusedCallResult]
loglog(
excited.x,
excited.y,
label=_("detected sensibility with excitation"),
) # type: ignore[ReportUnusedCallResult]
loglog(
(experiment.projection_excited + reference).x,
(experiment.projection_excited + reference).y,
label=_(
"sum: no excitation and projection with excitation"
),
) # type: ignore[ReportUnusedCallResult]
except AttributeError:
logger.debug(
_("no excited signal, will only show reference signal")
)
loglog(
experiment.projection_reference.x,
experiment.projection_reference.y,
label=_("projection without excitation"),
) # type: ignore[ReportUnunsedCallResult]
loglog(
reference.x,
reference.y,
label=_("detected sensibility without excitation"),
) # type: ignore[ReportUnusedCallResult]
legend() # type: ignore[ReportUnusedCallResult]
title(experiment.bench) # type: ignore[ReportUnusedCallResult]
xlabel(_("frequencies (Hz)")) # type: ignore[ReportUnusedCallResult]
ylabel(_("sensibility ($\\frac{1} {\\sqrt {Hz}}$)")) # type: ignore[ReportUnusedCallResult]
xlim(5, 100) # type: ignore[ReportUnusedCallResult]
ylim(10e-28, 10e-18) # type: ignore[ReportUnusedCallResult]
grid(True, "both") # type: ignore[ReportUnusedCallResult]
show()
close()
logger.debug(_("experiment result showed"))

View file

@ -7,10 +7,10 @@ from numpy.core.multiarray import array
from science_signal import interpolate from science_signal import interpolate
from science_signal.signal import Signal from science_signal.signal import Signal
from backscattering_analyzer import logger from backscattering_analyzer import logger
from backscattering_analyzer.acquisition import AcquisitionType
from backscattering_analyzer.data import ( from backscattering_analyzer.data import (
Data,
load_coupling, load_coupling,
load_movements,
load_signals,
) )
from backscattering_analyzer.utils import ( from backscattering_analyzer.utils import (
compute_light, compute_light,
@ -41,7 +41,7 @@ class Experiment:
try: try:
with open(values_file, "rb") as file: with open(values_file, "rb") as file:
values = load(file) values = load(file)
logger.info( logger.debug(
_("successfully loaded values from {file}").format( _("successfully loaded values from {file}").format(
file=values_file file=values_file
) )
@ -71,8 +71,12 @@ class Experiment:
wavelength=self.wavelength wavelength=self.wavelength
) )
) )
try:
self.calibrations: dict[str, float] = { self.calibrations: dict[str, float] = {
"bench": 1.0,
"mirror": 1.0,
}
try:
self.calibrations = {
"bench": values["benches"][bench]["calib"]["bench"], "bench": values["benches"][bench]["calib"]["bench"],
"mirror": values["benches"][bench]["calib"]["mirror"], "mirror": values["benches"][bench]["calib"]["mirror"],
} }
@ -84,10 +88,6 @@ class Experiment:
+ "wrong: {error}" + "wrong: {error}"
).format(file=values_file, error=e) ).format(file=values_file, error=e)
) )
self.calibrations: dict[str, float] = {
"bench": 1.0,
"mirror": 1.0,
}
logger.debug( logger.debug(
_( _(
"value of calibrations (bench, mirror): ({bench}, " "value of calibrations (bench, mirror): ({bench}, "
@ -97,8 +97,12 @@ class Experiment:
mirror=self.calibrations["mirror"], mirror=self.calibrations["mirror"],
) )
) )
try:
self.factors: dict[str, float | None] = { self.factors: dict[str, float | None] = {
"pre": None,
"true": None,
}
try:
self.factors = {
"pre": 1e6 "pre": 1e6
* values["benches"][bench]["scatter_factor"][0], * values["benches"][bench]["scatter_factor"][0],
"true": values["benches"][bench]["scatter_factor"][0], "true": values["benches"][bench]["scatter_factor"][0],
@ -111,10 +115,6 @@ class Experiment:
+ "be searched with a fit" + "be searched with a fit"
).format(file=values_file) ).format(file=values_file)
) )
self.factors: dict[str, float | None] = {
"pre": None,
"true": None,
}
logger.debug( logger.debug(
_( _(
"values of scattering factor (outscale, real): ({pre}, " "values of scattering factor (outscale, real): ({pre}, "
@ -124,8 +124,12 @@ class Experiment:
true=self.factors["true"], true=self.factors["true"],
) )
) )
try:
self.powers: dict[str, float] = { self.powers: dict[str, float] = {
"laser": 23,
"dark_fringe": 8e-3,
}
try:
self.powers = {
"laser": values["dates"][date]["laser"], "laser": values["dates"][date]["laser"],
"dark_fringe": values["dates"][date]["dark_fringe"], "dark_fringe": values["dates"][date]["dark_fringe"],
} }
@ -137,10 +141,6 @@ class Experiment:
+ "wrong: {error}" + "wrong: {error}"
).format(file=values_file, error=e) ).format(file=values_file, error=e)
) )
self.powers: dict[str, float] = {
"laser": 23,
"dark_fringe": 8e-3,
}
logger.debug( logger.debug(
_( _(
"values of powers (laser, dark_fringe): ({laser}, " "values of powers (laser, dark_fringe): ({laser}, "
@ -168,8 +168,9 @@ class Experiment:
file=self.modelisation_file, file=self.modelisation_file,
) )
) )
data_folder_name: str = "/home/demagny/data"
try: try:
data_folder_name: str = values["general"]["data_folder"] data_folder_name = values["general"]["data_folder"]
except KeyError: except KeyError:
logger.error( logger.error(
_( _(
@ -178,7 +179,6 @@ class Experiment:
+ "set, if its doesn't exist the program will crash" + "set, if its doesn't exist the program will crash"
).format(file=values_file) ).format(file=values_file)
) )
data_folder_name = "/home/demagny/data"
logger.debug( logger.debug(
_("data folder containing signal values: {folder}").format( _("data folder containing signal values: {folder}").format(
folder=data_folder_name, folder=data_folder_name,
@ -187,41 +187,33 @@ class Experiment:
data_folder = Path(data_folder_name) data_folder = Path(data_folder_name)
try: self.data = Data(data_folder, bench, date)
self.signals: dict[str, Signal] = load_signals(
data_folder, bench, date
)
logger.debug(
_("excited and reference signal successfully loaded")
)
except OSError:
logger.critical(_("cannot load signals"))
raise Exception(_("cannot load signals"))
try: self.reference_movement: None | Signal = process_movement(
self.movements: dict[str, Signal] = load_movements( self.data.reference.movements.bench,
data_folder, bench, date self.data.reference.movements.mirror,
)
logger.debug(
_(
"movements of the bench and the mirror "
+ "successfully loaded"
)
)
except OSError:
logger.critical(_("cannot load movements"))
raise Exception(_("cannot load movements"))
self.movements["true"] = process_movement(
self.movements["bench"],
self.movements["mirror"],
self.calibrations, self.calibrations,
vibration_frequency, vibration_frequency,
) )
self.excited_movement: None | Signal = None
try:
self.excited_movement = process_movement(
self.data.excited.movements.bench,
self.data.excited.movements.mirror,
self.calibrations,
vibration_frequency,
)
except AttributeError:
pass
logger.debug(_("movement processed")) logger.debug(_("movement processed"))
model_powers: dict[str, float] = {
"laser": 0,
"dark_fringe": 0,
}
correct_power: bool = False
try: try:
correct_power = bool(values["dates"][date]["correct-power"]) correct_power = values["dates"][date]["correct-power"]
if correct_power: if correct_power:
model_powers = { model_powers = {
"laser": values["dates"][date]["coupling"]["laser"], "laser": values["dates"][date]["coupling"]["laser"],
@ -229,11 +221,6 @@ class Experiment:
"dark_fringe" "dark_fringe"
], ],
} }
else:
model_powers = {
"laser": 0,
"dark_fringe": 0,
}
except KeyError as error: except KeyError as error:
print(error) print(error)
logger.warning( logger.warning(
@ -244,11 +231,6 @@ class Experiment:
+ "applied" + "applied"
) )
) )
correct_power = False
model_powers = {
"laser": 0,
"dark_fringe": 0,
}
try: try:
self.coupling = load_coupling( self.coupling = load_coupling(
@ -273,23 +255,77 @@ class Experiment:
) )
self.factors = self.fit_factors() self.factors = self.fit_factors()
self.projection = self.compute_projection() self.projection_reference = self.compute_projection(
AcquisitionType.REFERENCE,
)
try:
self.projection_excited = self.compute_projection(
AcquisitionType.EXCITED,
)
except Exception:
pass
logger.debug(_("end of experiment initialisation")) logger.debug(_("end of experiment initialisation"))
def get_factors( def get_factors(
self, self,
phase: float,
*signals: Signal,
start: float | None = None, start: float | None = None,
end: float | None = None, end: float | None = None,
) -> tuple[Signal, Signal, Signal, Signal, Signal, Signal, float]: type: AcquisitionType = AcquisitionType.EXCITED,
) -> tuple[Signal, ...]:
""" """
Get factor to compute the projection for a given scatter factor Get factor to compute the projection for a given scatter factor
""" """
phase = 4 * pi / self.wavelength
factor_n = (self.movements["true"] * phase).sin().psd().sqrt()
coupling_n = self.coupling[0] coupling_n = self.coupling[0]
factor_d = (self.movements["true"] * phase).cos().psd().sqrt()
coupling_d = self.coupling[1] coupling_d = self.coupling[1]
if type is AcquisitionType.EXCITED:
if self.excited_movement is not None:
factor_n = (
(self.excited_movement * phase).sin().psd().sqrt()
)
factor_d = (
(self.excited_movement * phase).cos().psd().sqrt()
)
else:
logger.critical(
_(
"no excited signal given, cannot get factors "
+ "of the excited signal"
)
)
raise Exception(
_(
"no excited signal given, cannot get factors "
+ "of the excited signal"
)
)
elif type is AcquisitionType.REFERENCE:
if self.reference_movement is not None:
factor_n = (
(self.reference_movement * phase).sin().psd().sqrt()
)
factor_d = (
(self.reference_movement * phase).cos().psd().sqrt()
)
else:
logger.critical(
_(
"no reference signal given, cannot get factors "
+ "of the reference signal"
)
)
raise Exception(
_(
"no reference signal given, cannot get factors "
+ "of the reference signal"
)
)
else:
logger.critical(_("unknown acquisition type"))
raise Exception(_("unknown acquisition type"))
if start is None: if start is None:
start = coupling_n.x[0] start = coupling_n.x[0]
if end is None: if end is None:
@ -297,30 +333,12 @@ class Experiment:
coupling_n = coupling_n.cut(start, end) coupling_n = coupling_n.cut(start, end)
( return interpolate(
factor_n, factor_n,
coupling_n, coupling_n,
factor_d, factor_d,
coupling_d, coupling_d,
excited, *signals,
reference,
) = interpolate(
factor_n,
coupling_n,
factor_d,
coupling_d,
self.signals["excited"].psd().sqrt(),
self.signals["reference"].psd().sqrt(),
)
return (
factor_n,
coupling_n,
factor_d,
coupling_d,
excited,
reference,
phase,
) )
def fit_factors( def fit_factors(
@ -330,11 +348,12 @@ class Experiment:
start_frequency: float = 15, start_frequency: float = 15,
end_frequency: float = 100, end_frequency: float = 100,
precision: int = 3, precision: int = 3,
) -> dict[str, float]: ) -> dict[str, None | float]:
""" """
Find the best factor (first order only) to get the projection Find the best factor (first order only) to get the projection
that best match the excited signal that best match the excited signal
""" """
phase = 4 * pi / self.wavelength
( (
factor_n, factor_n,
coupling_n, coupling_n,
@ -342,8 +361,14 @@ class Experiment:
coupling_d, coupling_d,
excited, excited,
reference, reference,
) = self.get_factors(
phase, phase,
) = self.get_factors(start=start_frequency, end=end_frequency) self.data.excited.sensibility.psd().sqrt(),
self.data.reference.sensibility.psd().sqrt(),
start=start_frequency,
end=end_frequency,
type=AcquisitionType.EXCITED,
)
for index in range(precision): for index in range(precision):
logger.debug(_("search for a local minimum")) logger.debug(_("search for a local minimum"))
@ -375,17 +400,17 @@ class Experiment:
if argmin(y) == 0: if argmin(y) == 0:
logger.warning(_("smaller than current range allows")) logger.warning(_("smaller than current range allows"))
scatter_max: float = x[1] scatter_max = x[1]
elif argmin(y) == len(x) - 1: elif argmin(y) == len(x) - 1:
logger.warning(_("bigger than current range allows")) logger.warning(_("bigger than current range allows"))
scatter_min: float = x[-2] scatter_min = x[-2]
else: else:
scatter_min: float = x[argmin(y) - 1] scatter_min = x[argmin(y) - 1]
scatter_max: float = x[argmin(y) + 1] scatter_max = x[argmin(y) + 1]
logger.debug(_("local minimum found")) logger.debug(_("local minimum found"))
pre_scatter_factor: float = scatter_min / 2 + scatter_max / 2 pre_scatter_factor = scatter_min / 2 + scatter_max / 2
logger.info( logger.info(
_("found a scattering factor of {factor}").format( _("found a scattering factor of {factor}").format(
@ -398,19 +423,19 @@ class Experiment:
"true": pre_scatter_factor * 1e-6, "true": pre_scatter_factor * 1e-6,
} }
def compute_projection(self) -> Signal: def compute_projection(
self, type: AcquisitionType = AcquisitionType.EXCITED
) -> Signal:
""" """
Compute the projection of the noise from current values Compute the projection of the noise from current values
""" """
phase = 4 * pi / self.wavelength
( (
factor_n, factor_n,
coupling_n, coupling_n,
factor_d, factor_d,
coupling_d, coupling_d,
_, ) = self.get_factors(phase, type=type)
_,
phase,
) = self.get_factors()
return Signal( return Signal(
factor_n.x, factor_n.x,

View file

@ -0,0 +1,16 @@
from science_signal.signal import Signal
from dataclasses import dataclass
@dataclass
class Movements:
"""
Container for the movement of the bench and the mirror
"""
bench: Signal
mirror: Signal
def __init__(self, bench: Signal, mirror: Signal):
self.bench = bench * 1e-6
self.mirror = mirror * 1e-6

View file

@ -1,4 +1,4 @@
from science_signal.signal import Signal from science_signal.signal import Signal # type: ignore[reportMissingTypeStubs]
from backscattering_analyzer import logger from backscattering_analyzer import logger
from numpy.typing import NDArray from numpy.typing import NDArray
@ -14,7 +14,7 @@ def process_movement(
""" """
Clean and compute relative movement between the bench and the mirror Clean and compute relative movement between the bench and the mirror
""" """
logger.info(_("computing relative movement")) logger.debug(_("computing relative movement"))
return ( return (
( (
@ -22,13 +22,13 @@ def process_movement(
- mirror * calibrations["mirror"] - mirror * calibrations["mirror"]
) )
.detrend("constant") .detrend("constant")
.filter(end=5 * vibration_frequency) .filter(end=1)
.detrend("constant") .detrend("constant")
) )
def compute_light( def compute_light(
pre_scatter_factor: float, pre_scatter_factor: float | None,
factor_n: Signal, factor_n: Signal,
coupling_n: Signal, coupling_n: Signal,
factor_d: Signal, factor_d: Signal,
@ -38,8 +38,12 @@ def compute_light(
""" """
Optimized computing of light with pre-computed factor Optimized computing of light with pre-computed factor
""" """
if pre_scatter_factor is None:
raise Exception(_(
"Cannot compute projection without backscattering factor"
))
return ( return (
sqrt(pre_scatter_factor) sqrt(pre_scatter_factor)
/ phase / phase
* (coupling_n * factor_n + coupling_d * factor_d).y * (coupling_n * factor_n + coupling_d * factor_d).y
) ) # type: ignore[reportAny]