Compare commits

...

10 commits

8 changed files with 552 additions and 240 deletions

View file

@ -35,6 +35,7 @@ packages = ["src/backscattering_analyzer"]
[tool.ruff]
line-length = 72
builtins = ["_"]
[tool.ruff.format]
quote-style = "double"

View file

@ -5,64 +5,3 @@ from gettext import install
install(__name__)
logger = getLogger(__name__)
def show_projection(
experiment: "Experiment",
start_frequency: float | None = None,
end_frequency: float | None = None,
) -> None:
"""
Show projection data with matplotlib
"""
logger.debug(_("showing experiment result"))
from matplotlib.pyplot import (
loglog,
show,
legend,
close,
vlines,
title,
xlabel,
ylabel,
)
excited = experiment.signals["excited"].psd().sqrt()
reference = experiment.signals["reference"].psd().sqrt()
loglog(
experiment.projection.x,
experiment.projection.y,
label="projection",
) # type: ignore[ReportUnusedCallResult]
loglog(excited.x, excited.y, label=_("excited bench")) # type: ignore[ReportUnusedCallResult]
loglog(reference.x, reference.y, label=_("reference bench")) # type: ignore[ReportUnusedCallResult]
loglog(
(experiment.projection + reference).x,
(experiment.projection + reference).y,
label=_("sum reference + excited"),
) # type: ignore[ReportUnusedCallResult]
if start_frequency is not None:
vlines(
[
start_frequency,
],
min(experiment.projection.y),
max(experiment.projection.y),
color="k",
) # type: ignore[ReportUnusedCallResult]
if end_frequency is not None:
vlines(
[
end_frequency,
],
min(experiment.projection.y),
max(experiment.projection.y),
color="k",
) # type: ignore[ReportUnusedCallResult]
legend() # type: ignore[ReportUnusedCallResult]
title(experiment.bench) # type: ignore[ReportUnusedCallResult]
xlabel(_("frequency (Hz)")) # type: ignore[ReportUnusedCallResult]
ylabel(_("stray ($\\frac{1} {\\sqrt {Hz}}$)")) # type: ignore[ReportUnusedCallResult]
show()
close()
logger.debug(_("experiment result showed"))

View file

@ -0,0 +1,22 @@
from enum import Enum
from dataclasses import dataclass
from science_signal.signal import Signal
from backscattering_analyzer.movements import Movements
class AcquisitionType(Enum):
REFERENCE = 0
EXCITED = 1
@dataclass
class Acquisition:
"""
A data acquisition with a specific time and duration
"""
movements: Movements
sensibility: Signal
type: AcquisitionType
time: float
duration: float

View file

@ -1,87 +1,322 @@
from pathlib import Path
from numpy.lib.npyio import loadtxt
from science_signal.signal import Signal
from scipy.io.matlab import loadmat
from tomllib import load
from numpy import loadtxt
from science_signal import to_signal # type: ignore[reportMissingTypeStubs]
from science_signal.signal import Signal # type: ignore[reportMissingTypeStubs]
from scipy.io.matlab import loadmat # type: ignore[reportMissingTypeStubs]
from backscattering_analyzer import logger
from backscattering_analyzer.acquisition import (
Acquisition,
AcquisitionType,
)
from backscattering_analyzer.movements import Movements
def load_signals(
folder: Path, bench: str, date: str
) -> dict[str, Signal]:
"""
Load excited and reference signal of an experiment
"""
logger.info(
_("loading signals for {bench} in {date}").format(
bench=bench, date=date
)
)
excited_file = folder / "{bench}_{date}_dat.csv".format(
bench=bench,
date=date,
)
reference_file = folder / "{bench}_{date}_ref.csv".format(
bench=bench,
date=date,
)
try:
excited_data = loadtxt(excited_file).T
reference_data = loadtxt(reference_file).T
logger.info(
_(
"files successfully loaded:\n{excited}\n{reference}"
).format(excited=excited_file, reference=reference_file)
)
except OSError as e:
logger.critical(
_("some file cannot be read: {error}").format(error=e)
)
raise e
return {
"excited": Signal(excited_data[0], excited_data[1]),
"reference": Signal(reference_data[0], reference_data[1]),
}
def load_movements(
folder: Path, bench: str, date: str
) -> dict[str, Signal]:
"""
Load excited movement of the bench and the nearest mirror of an
experiment
"""
logger.info(
_("loading movements for {bench} in {date}").format(
bench=bench, date=date
)
)
bench_file = folder / "{bench}_{date}_ben.csv".format(
bench=bench,
date=date,
)
mirror_file = folder / "{bench}_{date}_mir.csv".format(
bench=bench,
date=date,
)
try:
bench_movement = loadtxt(bench_file).T
mirror_movement = loadtxt(mirror_file).T
bench_movement[1] = bench_movement[1] * 1e-6 # um -> m
mirror_movement[1] = mirror_movement[1] * 1e-6 # um -> m
logger.info(
_("files successfully loaded:\n{bench}\n{mirror}").format(
bench=bench_file, mirror=mirror_file
class Data:
def __init__(self, folder: Path, bench: str, date: str):
logger.debug(
_("loading data for {bench} in {date}").format(
bench=bench, date=date
)
)
except OSError as e:
logger.critical(
_("some file cannot be read: {error}").format(error=e)
folders: dict[str, Path] = {} # list of folder
files: dict[str, Path] = {} # list of file
folders["experiment"] = folder / "experiment_{date}".format(
date=date,
)
raise e
return {
"bench": Signal(bench_movement[0], bench_movement[1]),
"mirror": Signal(mirror_movement[0], mirror_movement[1]),
files["metadata"] = folders["experiment"] / "metadata.toml"
try:
with open(files["metadata"], "rb") as file:
metadata = load(file)
logger.debug(
_("successfully load {metadata}").format(
metadata=files["metadata"],
)
)
except OSError as error:
logger.critical(
_("cannot load {metadata}: {error}").format(
metadata=files["metadata"],
error=error,
)
)
raise error
try:
available_benches: list[str] = metadata["general"][
"benches"
]
except KeyError:
logger.critical(
_("malformed metadata, no benches provided")
)
raise Exception(
_("malformed metadata, no benches provided")
)
if bench not in available_benches:
logger.critical(
_("no data on {bench} in {date}").format(
bench=bench,
date=date,
)
)
raise Exception(
_("no data on {bench} in {date}").format(
bench=bench,
date=date,
)
)
description: str | None = None
try:
description = metadata["general"]["description"]
except KeyError:
logger.warning(
_("malformed metadata, no description provided")
)
logger.info(
"data description: {description}".format(
description=description
)
)
metadata_reference: dict[str, float]
metadata_excited: dict[str, float]
excited = False
try:
metadata_reference = metadata["benches"][bench]["reference"]
folders["reference"] = folders[
"experiment"
] / "{bench}/reference".format(
bench=bench,
)
logger.debug(
_("there are multiple reference for each benches")
)
except KeyError:
try:
metadata_reference = metadata["reference"]
folders["reference"] = (
folders["experiment"] / "reference"
)
logger.debug(
_("there is one reference for every benches")
)
except KeyError:
logger.critical(
_("malformed metadata, no reference provided")
)
raise Exception(
_("malformed metadata, no reference provided")
)
try:
metadata_excited = metadata["benches"][bench]["excited"]
folders["excited"] = folders[
"experiment"
] / "{bench}/excited".format(
bench=bench,
)
excited = True
logger.debug(_("excited data have been taken"))
except KeyError:
logger.debug(_("no excited data have been taken"))
logger.debug(_("metadata fully read, starting to load data"))
channel_names = channel_name_mapping(bench)
files["ref_bench"] = folders[
"reference"
] / "{channel}.csv".format(channel=channel_names["bench"])
files["ref_mirror"] = folders[
"reference"
] / "{channel}.csv".format(channel=channel_names["mirror"])
files["ref_sensibility"] = folders[
"reference"
] / "{channel}.csv".format(channel=channel_names["sensibility"])
time: float
duration: float
try:
movements = Movements(
bench=to_signal(loadtxt(files["ref_bench"]).T),
mirror=to_signal(loadtxt(files["ref_mirror"]).T),
)
sensibility = to_signal(loadtxt(files["ref_sensibility"]).T)
time = sensibility.x[0]
duration = sensibility.x[-1] - sensibility.x[0]
logger.debug(
_(
"successfully loaded reference data files: "
+ "\n {bench}"
+ "\n {mirror}"
+ "\n {sensibility}"
).format(
bench=files["ref_bench"],
mirror=files["ref_mirror"],
sensibility=files["ref_sensibility"],
)
)
try:
time = metadata_reference["time"]
duration = metadata_reference["duration"]
if sensibility.x[0] != time:
logger.warning(
_(
"time between metadata and data does not match: "
+ "{data} != {metadata}"
).format(
data=sensibility.x[0],
metadata=time,
)
)
if abs(
sensibility.x[-1] - sensibility.x[0] - duration
) > 1e-2:
logger.warning(
_(
"duration between metadata and data does not "
+ "match: {data} != {metadata}"
).format(
data=(sensibility.x[-1] - sensibility.x[0]),
metadata=duration,
)
)
except KeyError:
logger.warning(
_(
"malformed metadata, each reference entry should have "
+ "its own time and duration. Defining it with data"
)
)
except OSError as e:
logger.critical(
_(
"some files could not be read: {error}".format(
error=e
)
)
)
raise e
self.reference = Acquisition(
time=time,
duration=duration,
movements=movements,
sensibility=sensibility,
type=AcquisitionType.REFERENCE,
)
logger.debug(_("reference data loaded and checked"))
if excited:
files["exc_bench"] = folders[
"excited"
] / "{channel}.csv".format(channel=channel_names["bench"])
files["exc_mirror"] = folders[
"excited"
] / "{channel}.csv".format(channel=channel_names["mirror"])
files["exc_sensibility"] = folders[
"excited"
] / "{channel}.csv".format(
channel=channel_names["sensibility"]
)
try:
movements = Movements(
bench=to_signal(loadtxt(files["exc_bench"]).T),
mirror=to_signal(loadtxt(files["exc_mirror"]).T),
)
sensibility = to_signal(
loadtxt(files["exc_sensibility"]).T
)
time = sensibility.x[0]
duration = sensibility.x[-1] - sensibility.x[0]
logger.debug(
_(
"successfully loaded excited data files: "
+ "\n {bench}"
+ "\n {mirror}"
+ "\n {sensibility}"
).format(
bench=files["exc_bench"],
mirror=files["exc_mirror"],
sensibility=files["exc_sensibility"],
)
)
try:
time = metadata_excited["time"]
duration = metadata_excited["duration"]
if sensibility.x[0] != time:
logger.warning(
_(
"time between metadata and data does not "
+ "match: {data} != {metadata}"
).format(
data=sensibility.x[0],
metadata=time,
)
)
if abs(
sensibility.x[-1] - sensibility.x[0] - duration
) > 1e-2:
logger.warning(
_(
"duration between metadata and data does "
+ "not match: {data} != {metadata}"
).format(
data=(
sensibility.x[-1] - sensibility.x[0]
),
metadata=duration,
)
)
except KeyError:
logger.warning(
_(
"malformed metadata, each excited entry should "
+ "have its own time and duration. Defining it "
+ "with data"
)
)
except OSError as e:
logger.critical(
_(
"some files could not be read: {error}".format(
error=e
)
)
)
self.excited = Acquisition(
time=time,
duration=duration,
movements=movements,
sensibility=sensibility,
type=AcquisitionType.EXCITED,
)
logger.debug(_("excited data loaded and checked"))
def channel_name_mapping(bench: str) -> dict[str, str]:
names = {
"sensibility": "h",
"bench": bench,
}
if bench in ["SDB1", "SDB2"]:
names["mirror"] = "SR"
elif bench in ["SNEB", "SWEB"]:
names["mirror"] = bench[1:3]
elif bench == "SIB2":
names["mirror"] = "MC"
elif bench == "SPRB":
names["mirror"] = "SR" # not good, choice between MC, SR and PR
else:
raise ValueError(_("Unknow bench {bench}".format(bench=bench)))
return names
def load_coupling(
@ -95,7 +330,7 @@ def load_coupling(
"""
Load and correct coupling date from modelisation
"""
logger.info(
logger.debug(
_("loading coupling for {bench} in {date}").format(
bench=bench, date=date
)
@ -103,7 +338,7 @@ def load_coupling(
modelisation_file = folder / file
try:
modelisation_data = loadmat(modelisation_file)
logger.info(
logger.debug(
_("file successfully loaded:\n{file}").format(
file=modelisation_file
)
@ -160,7 +395,7 @@ def load_coupling(
/ model_powers["laser"]
) # radiation
elif bench in ["SIB2", "SPRB"]:
logger.info(
logger.warning(
_(
"cannot fix projection power for SIB2 and SPRB coupling"
)

View file

@ -0,0 +1,70 @@
from backscattering_analyzer import logger
from backscattering_analyzer.experiment import Experiment
def show_projection(
experiment: Experiment,
start_frequency: float | None = None,
end_frequency: float | None = None,
) -> None:
"""
Show projection data with matplotlib
"""
logger.debug(_("showing experiment result"))
from matplotlib.pyplot import (
loglog,
show,
legend,
close,
title,
xlabel,
ylabel,
xlim,
ylim,
grid,
)
reference = experiment.data.reference.sensibility.psd().sqrt()
try:
excited = experiment.data.excited.sensibility.psd().sqrt()
loglog(
experiment.projection_excited.x,
experiment.projection_excited.y,
label=_("projection with excitation"),
) # type: ignore[ReportUnusedCallResult]
loglog(
excited.x,
excited.y,
label=_("detected sensibility with excitation"),
) # type: ignore[ReportUnusedCallResult]
loglog(
(experiment.projection_excited + reference).x,
(experiment.projection_excited + reference).y,
label=_(
"sum: no excitation and projection with excitation"
),
) # type: ignore[ReportUnusedCallResult]
except AttributeError:
logger.debug(
_("no excited signal, will only show reference signal")
)
loglog(
experiment.projection_reference.x,
experiment.projection_reference.y,
label=_("projection without excitation"),
) # type: ignore[ReportUnunsedCallResult]
loglog(
reference.x,
reference.y,
label=_("detected sensibility without excitation"),
) # type: ignore[ReportUnusedCallResult]
legend() # type: ignore[ReportUnusedCallResult]
title(experiment.bench) # type: ignore[ReportUnusedCallResult]
xlabel(_("frequencies (Hz)")) # type: ignore[ReportUnusedCallResult]
ylabel(_("sensibility ($\\frac{1} {\\sqrt {Hz}}$)")) # type: ignore[ReportUnusedCallResult]
xlim(5, 100) # type: ignore[ReportUnusedCallResult]
ylim(10e-28, 10e-18) # type: ignore[ReportUnusedCallResult]
grid(True, "both") # type: ignore[ReportUnusedCallResult]
show()
close()
logger.debug(_("experiment result showed"))

View file

@ -7,10 +7,10 @@ from numpy.core.multiarray import array
from science_signal import interpolate
from science_signal.signal import Signal
from backscattering_analyzer import logger
from backscattering_analyzer.acquisition import AcquisitionType
from backscattering_analyzer.data import (
Data,
load_coupling,
load_movements,
load_signals,
)
from backscattering_analyzer.utils import (
compute_light,
@ -41,7 +41,7 @@ class Experiment:
try:
with open(values_file, "rb") as file:
values = load(file)
logger.info(
logger.debug(
_("successfully loaded values from {file}").format(
file=values_file
)
@ -71,8 +71,12 @@ class Experiment:
wavelength=self.wavelength
)
)
self.calibrations: dict[str, float] = {
"bench": 1.0,
"mirror": 1.0,
}
try:
self.calibrations: dict[str, float] = {
self.calibrations = {
"bench": values["benches"][bench]["calib"]["bench"],
"mirror": values["benches"][bench]["calib"]["mirror"],
}
@ -84,10 +88,6 @@ class Experiment:
+ "wrong: {error}"
).format(file=values_file, error=e)
)
self.calibrations: dict[str, float] = {
"bench": 1.0,
"mirror": 1.0,
}
logger.debug(
_(
"value of calibrations (bench, mirror): ({bench}, "
@ -97,8 +97,12 @@ class Experiment:
mirror=self.calibrations["mirror"],
)
)
self.factors: dict[str, float | None] = {
"pre": None,
"true": None,
}
try:
self.factors: dict[str, float | None] = {
self.factors = {
"pre": 1e6
* values["benches"][bench]["scatter_factor"][0],
"true": values["benches"][bench]["scatter_factor"][0],
@ -111,10 +115,6 @@ class Experiment:
+ "be searched with a fit"
).format(file=values_file)
)
self.factors: dict[str, float | None] = {
"pre": None,
"true": None,
}
logger.debug(
_(
"values of scattering factor (outscale, real): ({pre}, "
@ -124,8 +124,12 @@ class Experiment:
true=self.factors["true"],
)
)
self.powers: dict[str, float] = {
"laser": 23,
"dark_fringe": 8e-3,
}
try:
self.powers: dict[str, float] = {
self.powers = {
"laser": values["dates"][date]["laser"],
"dark_fringe": values["dates"][date]["dark_fringe"],
}
@ -137,10 +141,6 @@ class Experiment:
+ "wrong: {error}"
).format(file=values_file, error=e)
)
self.powers: dict[str, float] = {
"laser": 23,
"dark_fringe": 8e-3,
}
logger.debug(
_(
"values of powers (laser, dark_fringe): ({laser}, "
@ -168,8 +168,9 @@ class Experiment:
file=self.modelisation_file,
)
)
data_folder_name: str = "/home/demagny/data"
try:
data_folder_name: str = values["general"]["data_folder"]
data_folder_name = values["general"]["data_folder"]
except KeyError:
logger.error(
_(
@ -178,7 +179,6 @@ class Experiment:
+ "set, if its doesn't exist the program will crash"
).format(file=values_file)
)
data_folder_name = "/home/demagny/data"
logger.debug(
_("data folder containing signal values: {folder}").format(
folder=data_folder_name,
@ -187,41 +187,33 @@ class Experiment:
data_folder = Path(data_folder_name)
try:
self.signals: dict[str, Signal] = load_signals(
data_folder, bench, date
)
logger.debug(
_("excited and reference signal successfully loaded")
)
except OSError:
logger.critical(_("cannot load signals"))
raise Exception(_("cannot load signals"))
self.data = Data(data_folder, bench, date)
try:
self.movements: dict[str, Signal] = load_movements(
data_folder, bench, date
)
logger.debug(
_(
"movements of the bench and the mirror "
+ "successfully loaded"
)
)
except OSError:
logger.critical(_("cannot load movements"))
raise Exception(_("cannot load movements"))
self.movements["true"] = process_movement(
self.movements["bench"],
self.movements["mirror"],
self.reference_movement: None | Signal = process_movement(
self.data.reference.movements.bench,
self.data.reference.movements.mirror,
self.calibrations,
vibration_frequency,
)
self.excited_movement: None | Signal = None
try:
self.excited_movement = process_movement(
self.data.excited.movements.bench,
self.data.excited.movements.mirror,
self.calibrations,
vibration_frequency,
)
except AttributeError:
pass
logger.debug(_("movement processed"))
model_powers: dict[str, float] = {
"laser": 0,
"dark_fringe": 0,
}
correct_power: bool = False
try:
correct_power = bool(values["dates"][date]["correct-power"])
correct_power = values["dates"][date]["correct-power"]
if correct_power:
model_powers = {
"laser": values["dates"][date]["coupling"]["laser"],
@ -229,11 +221,6 @@ class Experiment:
"dark_fringe"
],
}
else:
model_powers = {
"laser": 0,
"dark_fringe": 0,
}
except KeyError as error:
print(error)
logger.warning(
@ -244,11 +231,6 @@ class Experiment:
+ "applied"
)
)
correct_power = False
model_powers = {
"laser": 0,
"dark_fringe": 0,
}
try:
self.coupling = load_coupling(
@ -273,23 +255,77 @@ class Experiment:
)
self.factors = self.fit_factors()
self.projection = self.compute_projection()
self.projection_reference = self.compute_projection(
AcquisitionType.REFERENCE,
)
try:
self.projection_excited = self.compute_projection(
AcquisitionType.EXCITED,
)
except Exception:
pass
logger.debug(_("end of experiment initialisation"))
def get_factors(
self,
phase: float,
*signals: Signal,
start: float | None = None,
end: float | None = None,
) -> tuple[Signal, Signal, Signal, Signal, Signal, Signal, float]:
type: AcquisitionType = AcquisitionType.EXCITED,
) -> tuple[Signal, ...]:
"""
Get factor to compute the projection for a given scatter factor
"""
phase = 4 * pi / self.wavelength
factor_n = (self.movements["true"] * phase).sin().psd().sqrt()
coupling_n = self.coupling[0]
factor_d = (self.movements["true"] * phase).cos().psd().sqrt()
coupling_d = self.coupling[1]
if type is AcquisitionType.EXCITED:
if self.excited_movement is not None:
factor_n = (
(self.excited_movement * phase).sin().psd().sqrt()
)
factor_d = (
(self.excited_movement * phase).cos().psd().sqrt()
)
else:
logger.critical(
_(
"no excited signal given, cannot get factors "
+ "of the excited signal"
)
)
raise Exception(
_(
"no excited signal given, cannot get factors "
+ "of the excited signal"
)
)
elif type is AcquisitionType.REFERENCE:
if self.reference_movement is not None:
factor_n = (
(self.reference_movement * phase).sin().psd().sqrt()
)
factor_d = (
(self.reference_movement * phase).cos().psd().sqrt()
)
else:
logger.critical(
_(
"no reference signal given, cannot get factors "
+ "of the reference signal"
)
)
raise Exception(
_(
"no reference signal given, cannot get factors "
+ "of the reference signal"
)
)
else:
logger.critical(_("unknown acquisition type"))
raise Exception(_("unknown acquisition type"))
if start is None:
start = coupling_n.x[0]
if end is None:
@ -297,30 +333,12 @@ class Experiment:
coupling_n = coupling_n.cut(start, end)
(
return interpolate(
factor_n,
coupling_n,
factor_d,
coupling_d,
excited,
reference,
) = interpolate(
factor_n,
coupling_n,
factor_d,
coupling_d,
self.signals["excited"].psd().sqrt(),
self.signals["reference"].psd().sqrt(),
)
return (
factor_n,
coupling_n,
factor_d,
coupling_d,
excited,
reference,
phase,
*signals,
)
def fit_factors(
@ -330,11 +348,12 @@ class Experiment:
start_frequency: float = 15,
end_frequency: float = 100,
precision: int = 3,
) -> dict[str, float]:
) -> dict[str, None | float]:
"""
Find the best factor (first order only) to get the projection
that best match the excited signal
"""
phase = 4 * pi / self.wavelength
(
factor_n,
coupling_n,
@ -342,8 +361,14 @@ class Experiment:
coupling_d,
excited,
reference,
) = self.get_factors(
phase,
) = self.get_factors(start=start_frequency, end=end_frequency)
self.data.excited.sensibility.psd().sqrt(),
self.data.reference.sensibility.psd().sqrt(),
start=start_frequency,
end=end_frequency,
type=AcquisitionType.EXCITED,
)
for index in range(precision):
logger.debug(_("search for a local minimum"))
@ -375,17 +400,17 @@ class Experiment:
if argmin(y) == 0:
logger.warning(_("smaller than current range allows"))
scatter_max: float = x[1]
scatter_max = x[1]
elif argmin(y) == len(x) - 1:
logger.warning(_("bigger than current range allows"))
scatter_min: float = x[-2]
scatter_min = x[-2]
else:
scatter_min: float = x[argmin(y) - 1]
scatter_max: float = x[argmin(y) + 1]
scatter_min = x[argmin(y) - 1]
scatter_max = x[argmin(y) + 1]
logger.debug(_("local minimum found"))
pre_scatter_factor: float = scatter_min / 2 + scatter_max / 2
pre_scatter_factor = scatter_min / 2 + scatter_max / 2
logger.info(
_("found a scattering factor of {factor}").format(
@ -398,19 +423,19 @@ class Experiment:
"true": pre_scatter_factor * 1e-6,
}
def compute_projection(self) -> Signal:
def compute_projection(
self, type: AcquisitionType = AcquisitionType.EXCITED
) -> Signal:
"""
Compute the projection of the noise from current values
"""
phase = 4 * pi / self.wavelength
(
factor_n,
coupling_n,
factor_d,
coupling_d,
_,
_,
phase,
) = self.get_factors()
) = self.get_factors(phase, type=type)
return Signal(
factor_n.x,

View file

@ -0,0 +1,16 @@
from science_signal.signal import Signal
from dataclasses import dataclass
@dataclass
class Movements:
"""
Container for the movement of the bench and the mirror
"""
bench: Signal
mirror: Signal
def __init__(self, bench: Signal, mirror: Signal):
self.bench = bench * 1e-6
self.mirror = mirror * 1e-6

View file

@ -1,4 +1,4 @@
from science_signal.signal import Signal
from science_signal.signal import Signal # type: ignore[reportMissingTypeStubs]
from backscattering_analyzer import logger
from numpy.typing import NDArray
@ -14,7 +14,7 @@ def process_movement(
"""
Clean and compute relative movement between the bench and the mirror
"""
logger.info(_("computing relative movement"))
logger.debug(_("computing relative movement"))
return (
(
@ -22,13 +22,13 @@ def process_movement(
- mirror * calibrations["mirror"]
)
.detrend("constant")
.filter(end=5 * vibration_frequency)
.filter(end=1)
.detrend("constant")
)
def compute_light(
pre_scatter_factor: float,
pre_scatter_factor: float | None,
factor_n: Signal,
coupling_n: Signal,
factor_d: Signal,
@ -38,8 +38,12 @@ def compute_light(
"""
Optimized computing of light with pre-computed factor
"""
if pre_scatter_factor is None:
raise Exception(_(
"Cannot compute projection without backscattering factor"
))
return (
sqrt(pre_scatter_factor)
/ phase
* (coupling_n * factor_n + coupling_d * factor_d).y
)
) # type: ignore[reportAny]