Compare commits
10 commits
8299997213
...
0aa6732390
Author | SHA1 | Date | |
---|---|---|---|
0aa6732390 | |||
abf127b6bb | |||
656a36ae54 | |||
d1f5706061 | |||
d49ca3e39f | |||
35ae306712 | |||
3cd6a8fbf5 | |||
73914bbac5 | |||
2283a9a778 | |||
2dea7090e3 |
8 changed files with 552 additions and 240 deletions
|
@ -35,6 +35,7 @@ packages = ["src/backscattering_analyzer"]
|
|||
|
||||
[tool.ruff]
|
||||
line-length = 72
|
||||
builtins = ["_"]
|
||||
|
||||
[tool.ruff.format]
|
||||
quote-style = "double"
|
||||
|
|
|
@ -5,64 +5,3 @@ from gettext import install
|
|||
install(__name__)
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
|
||||
def show_projection(
|
||||
experiment: "Experiment",
|
||||
start_frequency: float | None = None,
|
||||
end_frequency: float | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Show projection data with matplotlib
|
||||
"""
|
||||
logger.debug(_("showing experiment result"))
|
||||
from matplotlib.pyplot import (
|
||||
loglog,
|
||||
show,
|
||||
legend,
|
||||
close,
|
||||
vlines,
|
||||
title,
|
||||
xlabel,
|
||||
ylabel,
|
||||
)
|
||||
|
||||
excited = experiment.signals["excited"].psd().sqrt()
|
||||
reference = experiment.signals["reference"].psd().sqrt()
|
||||
loglog(
|
||||
experiment.projection.x,
|
||||
experiment.projection.y,
|
||||
label="projection",
|
||||
) # type: ignore[ReportUnusedCallResult]
|
||||
loglog(excited.x, excited.y, label=_("excited bench")) # type: ignore[ReportUnusedCallResult]
|
||||
loglog(reference.x, reference.y, label=_("reference bench")) # type: ignore[ReportUnusedCallResult]
|
||||
loglog(
|
||||
(experiment.projection + reference).x,
|
||||
(experiment.projection + reference).y,
|
||||
label=_("sum reference + excited"),
|
||||
) # type: ignore[ReportUnusedCallResult]
|
||||
if start_frequency is not None:
|
||||
vlines(
|
||||
[
|
||||
start_frequency,
|
||||
],
|
||||
min(experiment.projection.y),
|
||||
max(experiment.projection.y),
|
||||
color="k",
|
||||
) # type: ignore[ReportUnusedCallResult]
|
||||
if end_frequency is not None:
|
||||
vlines(
|
||||
[
|
||||
end_frequency,
|
||||
],
|
||||
min(experiment.projection.y),
|
||||
max(experiment.projection.y),
|
||||
color="k",
|
||||
) # type: ignore[ReportUnusedCallResult]
|
||||
legend() # type: ignore[ReportUnusedCallResult]
|
||||
title(experiment.bench) # type: ignore[ReportUnusedCallResult]
|
||||
xlabel(_("frequency (Hz)")) # type: ignore[ReportUnusedCallResult]
|
||||
ylabel(_("stray ($\\frac{1} {\\sqrt {Hz}}$)")) # type: ignore[ReportUnusedCallResult]
|
||||
show()
|
||||
close()
|
||||
logger.debug(_("experiment result showed"))
|
||||
|
|
22
src/backscattering_analyzer/acquisition.py
Normal file
22
src/backscattering_analyzer/acquisition.py
Normal file
|
@ -0,0 +1,22 @@
|
|||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
from science_signal.signal import Signal
|
||||
from backscattering_analyzer.movements import Movements
|
||||
|
||||
|
||||
class AcquisitionType(Enum):
|
||||
REFERENCE = 0
|
||||
EXCITED = 1
|
||||
|
||||
|
||||
@dataclass
|
||||
class Acquisition:
|
||||
"""
|
||||
A data acquisition with a specific time and duration
|
||||
"""
|
||||
|
||||
movements: Movements
|
||||
sensibility: Signal
|
||||
type: AcquisitionType
|
||||
time: float
|
||||
duration: float
|
|
@ -1,87 +1,322 @@
|
|||
from pathlib import Path
|
||||
from numpy.lib.npyio import loadtxt
|
||||
from science_signal.signal import Signal
|
||||
from scipy.io.matlab import loadmat
|
||||
from tomllib import load
|
||||
from numpy import loadtxt
|
||||
from science_signal import to_signal # type: ignore[reportMissingTypeStubs]
|
||||
from science_signal.signal import Signal # type: ignore[reportMissingTypeStubs]
|
||||
from scipy.io.matlab import loadmat # type: ignore[reportMissingTypeStubs]
|
||||
from backscattering_analyzer import logger
|
||||
from backscattering_analyzer.acquisition import (
|
||||
Acquisition,
|
||||
AcquisitionType,
|
||||
)
|
||||
from backscattering_analyzer.movements import Movements
|
||||
|
||||
|
||||
def load_signals(
|
||||
folder: Path, bench: str, date: str
|
||||
) -> dict[str, Signal]:
|
||||
"""
|
||||
Load excited and reference signal of an experiment
|
||||
"""
|
||||
logger.info(
|
||||
_("loading signals for {bench} in {date}").format(
|
||||
class Data:
|
||||
def __init__(self, folder: Path, bench: str, date: str):
|
||||
logger.debug(
|
||||
_("loading data for {bench} in {date}").format(
|
||||
bench=bench, date=date
|
||||
)
|
||||
)
|
||||
excited_file = folder / "{bench}_{date}_dat.csv".format(
|
||||
bench=bench,
|
||||
date=date,
|
||||
)
|
||||
reference_file = folder / "{bench}_{date}_ref.csv".format(
|
||||
bench=bench,
|
||||
folders: dict[str, Path] = {} # list of folder
|
||||
files: dict[str, Path] = {} # list of file
|
||||
|
||||
folders["experiment"] = folder / "experiment_{date}".format(
|
||||
date=date,
|
||||
)
|
||||
files["metadata"] = folders["experiment"] / "metadata.toml"
|
||||
|
||||
try:
|
||||
excited_data = loadtxt(excited_file).T
|
||||
reference_data = loadtxt(reference_file).T
|
||||
with open(files["metadata"], "rb") as file:
|
||||
metadata = load(file)
|
||||
logger.debug(
|
||||
_("successfully load {metadata}").format(
|
||||
metadata=files["metadata"],
|
||||
)
|
||||
)
|
||||
except OSError as error:
|
||||
logger.critical(
|
||||
_("cannot load {metadata}: {error}").format(
|
||||
metadata=files["metadata"],
|
||||
error=error,
|
||||
)
|
||||
)
|
||||
raise error
|
||||
|
||||
try:
|
||||
available_benches: list[str] = metadata["general"][
|
||||
"benches"
|
||||
]
|
||||
except KeyError:
|
||||
logger.critical(
|
||||
_("malformed metadata, no benches provided")
|
||||
)
|
||||
raise Exception(
|
||||
_("malformed metadata, no benches provided")
|
||||
)
|
||||
|
||||
if bench not in available_benches:
|
||||
logger.critical(
|
||||
_("no data on {bench} in {date}").format(
|
||||
bench=bench,
|
||||
date=date,
|
||||
)
|
||||
)
|
||||
raise Exception(
|
||||
_("no data on {bench} in {date}").format(
|
||||
bench=bench,
|
||||
date=date,
|
||||
)
|
||||
)
|
||||
|
||||
description: str | None = None
|
||||
try:
|
||||
description = metadata["general"]["description"]
|
||||
except KeyError:
|
||||
logger.warning(
|
||||
_("malformed metadata, no description provided")
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"data description: {description}".format(
|
||||
description=description
|
||||
)
|
||||
)
|
||||
metadata_reference: dict[str, float]
|
||||
metadata_excited: dict[str, float]
|
||||
excited = False
|
||||
try:
|
||||
metadata_reference = metadata["benches"][bench]["reference"]
|
||||
folders["reference"] = folders[
|
||||
"experiment"
|
||||
] / "{bench}/reference".format(
|
||||
bench=bench,
|
||||
)
|
||||
logger.debug(
|
||||
_("there are multiple reference for each benches")
|
||||
)
|
||||
except KeyError:
|
||||
try:
|
||||
metadata_reference = metadata["reference"]
|
||||
folders["reference"] = (
|
||||
folders["experiment"] / "reference"
|
||||
)
|
||||
logger.debug(
|
||||
_("there is one reference for every benches")
|
||||
)
|
||||
except KeyError:
|
||||
logger.critical(
|
||||
_("malformed metadata, no reference provided")
|
||||
)
|
||||
raise Exception(
|
||||
_("malformed metadata, no reference provided")
|
||||
)
|
||||
|
||||
try:
|
||||
metadata_excited = metadata["benches"][bench]["excited"]
|
||||
folders["excited"] = folders[
|
||||
"experiment"
|
||||
] / "{bench}/excited".format(
|
||||
bench=bench,
|
||||
)
|
||||
excited = True
|
||||
logger.debug(_("excited data have been taken"))
|
||||
except KeyError:
|
||||
logger.debug(_("no excited data have been taken"))
|
||||
|
||||
logger.debug(_("metadata fully read, starting to load data"))
|
||||
|
||||
channel_names = channel_name_mapping(bench)
|
||||
|
||||
files["ref_bench"] = folders[
|
||||
"reference"
|
||||
] / "{channel}.csv".format(channel=channel_names["bench"])
|
||||
files["ref_mirror"] = folders[
|
||||
"reference"
|
||||
] / "{channel}.csv".format(channel=channel_names["mirror"])
|
||||
files["ref_sensibility"] = folders[
|
||||
"reference"
|
||||
] / "{channel}.csv".format(channel=channel_names["sensibility"])
|
||||
|
||||
time: float
|
||||
duration: float
|
||||
|
||||
try:
|
||||
movements = Movements(
|
||||
bench=to_signal(loadtxt(files["ref_bench"]).T),
|
||||
mirror=to_signal(loadtxt(files["ref_mirror"]).T),
|
||||
)
|
||||
sensibility = to_signal(loadtxt(files["ref_sensibility"]).T)
|
||||
time = sensibility.x[0]
|
||||
duration = sensibility.x[-1] - sensibility.x[0]
|
||||
logger.debug(
|
||||
_(
|
||||
"files successfully loaded:\n{excited}\n{reference}"
|
||||
).format(excited=excited_file, reference=reference_file)
|
||||
"successfully loaded reference data files: "
|
||||
+ "\n {bench}"
|
||||
+ "\n {mirror}"
|
||||
+ "\n {sensibility}"
|
||||
).format(
|
||||
bench=files["ref_bench"],
|
||||
mirror=files["ref_mirror"],
|
||||
sensibility=files["ref_sensibility"],
|
||||
)
|
||||
except OSError as e:
|
||||
logger.critical(
|
||||
_("some file cannot be read: {error}").format(error=e)
|
||||
)
|
||||
raise e
|
||||
return {
|
||||
"excited": Signal(excited_data[0], excited_data[1]),
|
||||
"reference": Signal(reference_data[0], reference_data[1]),
|
||||
}
|
||||
|
||||
|
||||
def load_movements(
|
||||
folder: Path, bench: str, date: str
|
||||
) -> dict[str, Signal]:
|
||||
"""
|
||||
Load excited movement of the bench and the nearest mirror of an
|
||||
experiment
|
||||
"""
|
||||
logger.info(
|
||||
_("loading movements for {bench} in {date}").format(
|
||||
bench=bench, date=date
|
||||
)
|
||||
)
|
||||
bench_file = folder / "{bench}_{date}_ben.csv".format(
|
||||
bench=bench,
|
||||
date=date,
|
||||
)
|
||||
mirror_file = folder / "{bench}_{date}_mir.csv".format(
|
||||
bench=bench,
|
||||
date=date,
|
||||
)
|
||||
try:
|
||||
bench_movement = loadtxt(bench_file).T
|
||||
mirror_movement = loadtxt(mirror_file).T
|
||||
bench_movement[1] = bench_movement[1] * 1e-6 # um -> m
|
||||
mirror_movement[1] = mirror_movement[1] * 1e-6 # um -> m
|
||||
logger.info(
|
||||
_("files successfully loaded:\n{bench}\n{mirror}").format(
|
||||
bench=bench_file, mirror=mirror_file
|
||||
time = metadata_reference["time"]
|
||||
duration = metadata_reference["duration"]
|
||||
if sensibility.x[0] != time:
|
||||
logger.warning(
|
||||
_(
|
||||
"time between metadata and data does not match: "
|
||||
+ "{data} != {metadata}"
|
||||
).format(
|
||||
data=sensibility.x[0],
|
||||
metadata=time,
|
||||
)
|
||||
)
|
||||
if abs(
|
||||
sensibility.x[-1] - sensibility.x[0] - duration
|
||||
) > 1e-2:
|
||||
logger.warning(
|
||||
_(
|
||||
"duration between metadata and data does not "
|
||||
+ "match: {data} != {metadata}"
|
||||
).format(
|
||||
data=(sensibility.x[-1] - sensibility.x[0]),
|
||||
metadata=duration,
|
||||
)
|
||||
)
|
||||
except KeyError:
|
||||
logger.warning(
|
||||
_(
|
||||
"malformed metadata, each reference entry should have "
|
||||
+ "its own time and duration. Defining it with data"
|
||||
)
|
||||
)
|
||||
except OSError as e:
|
||||
logger.critical(
|
||||
_("some file cannot be read: {error}").format(error=e)
|
||||
_(
|
||||
"some files could not be read: {error}".format(
|
||||
error=e
|
||||
)
|
||||
)
|
||||
)
|
||||
raise e
|
||||
return {
|
||||
"bench": Signal(bench_movement[0], bench_movement[1]),
|
||||
"mirror": Signal(mirror_movement[0], mirror_movement[1]),
|
||||
|
||||
self.reference = Acquisition(
|
||||
time=time,
|
||||
duration=duration,
|
||||
movements=movements,
|
||||
sensibility=sensibility,
|
||||
type=AcquisitionType.REFERENCE,
|
||||
)
|
||||
|
||||
logger.debug(_("reference data loaded and checked"))
|
||||
|
||||
if excited:
|
||||
files["exc_bench"] = folders[
|
||||
"excited"
|
||||
] / "{channel}.csv".format(channel=channel_names["bench"])
|
||||
files["exc_mirror"] = folders[
|
||||
"excited"
|
||||
] / "{channel}.csv".format(channel=channel_names["mirror"])
|
||||
files["exc_sensibility"] = folders[
|
||||
"excited"
|
||||
] / "{channel}.csv".format(
|
||||
channel=channel_names["sensibility"]
|
||||
)
|
||||
try:
|
||||
movements = Movements(
|
||||
bench=to_signal(loadtxt(files["exc_bench"]).T),
|
||||
mirror=to_signal(loadtxt(files["exc_mirror"]).T),
|
||||
)
|
||||
sensibility = to_signal(
|
||||
loadtxt(files["exc_sensibility"]).T
|
||||
)
|
||||
time = sensibility.x[0]
|
||||
duration = sensibility.x[-1] - sensibility.x[0]
|
||||
logger.debug(
|
||||
_(
|
||||
"successfully loaded excited data files: "
|
||||
+ "\n {bench}"
|
||||
+ "\n {mirror}"
|
||||
+ "\n {sensibility}"
|
||||
).format(
|
||||
bench=files["exc_bench"],
|
||||
mirror=files["exc_mirror"],
|
||||
sensibility=files["exc_sensibility"],
|
||||
)
|
||||
)
|
||||
try:
|
||||
time = metadata_excited["time"]
|
||||
duration = metadata_excited["duration"]
|
||||
if sensibility.x[0] != time:
|
||||
logger.warning(
|
||||
_(
|
||||
"time between metadata and data does not "
|
||||
+ "match: {data} != {metadata}"
|
||||
).format(
|
||||
data=sensibility.x[0],
|
||||
metadata=time,
|
||||
)
|
||||
)
|
||||
if abs(
|
||||
sensibility.x[-1] - sensibility.x[0] - duration
|
||||
) > 1e-2:
|
||||
logger.warning(
|
||||
_(
|
||||
"duration between metadata and data does "
|
||||
+ "not match: {data} != {metadata}"
|
||||
).format(
|
||||
data=(
|
||||
sensibility.x[-1] - sensibility.x[0]
|
||||
),
|
||||
metadata=duration,
|
||||
)
|
||||
)
|
||||
except KeyError:
|
||||
logger.warning(
|
||||
_(
|
||||
"malformed metadata, each excited entry should "
|
||||
+ "have its own time and duration. Defining it "
|
||||
+ "with data"
|
||||
)
|
||||
)
|
||||
except OSError as e:
|
||||
logger.critical(
|
||||
_(
|
||||
"some files could not be read: {error}".format(
|
||||
error=e
|
||||
)
|
||||
)
|
||||
)
|
||||
self.excited = Acquisition(
|
||||
time=time,
|
||||
duration=duration,
|
||||
movements=movements,
|
||||
sensibility=sensibility,
|
||||
type=AcquisitionType.EXCITED,
|
||||
)
|
||||
logger.debug(_("excited data loaded and checked"))
|
||||
|
||||
|
||||
def channel_name_mapping(bench: str) -> dict[str, str]:
|
||||
names = {
|
||||
"sensibility": "h",
|
||||
"bench": bench,
|
||||
}
|
||||
if bench in ["SDB1", "SDB2"]:
|
||||
names["mirror"] = "SR"
|
||||
elif bench in ["SNEB", "SWEB"]:
|
||||
names["mirror"] = bench[1:3]
|
||||
elif bench == "SIB2":
|
||||
names["mirror"] = "MC"
|
||||
elif bench == "SPRB":
|
||||
names["mirror"] = "SR" # not good, choice between MC, SR and PR
|
||||
else:
|
||||
raise ValueError(_("Unknow bench {bench}".format(bench=bench)))
|
||||
return names
|
||||
|
||||
|
||||
def load_coupling(
|
||||
|
@ -95,7 +330,7 @@ def load_coupling(
|
|||
"""
|
||||
Load and correct coupling date from modelisation
|
||||
"""
|
||||
logger.info(
|
||||
logger.debug(
|
||||
_("loading coupling for {bench} in {date}").format(
|
||||
bench=bench, date=date
|
||||
)
|
||||
|
@ -103,7 +338,7 @@ def load_coupling(
|
|||
modelisation_file = folder / file
|
||||
try:
|
||||
modelisation_data = loadmat(modelisation_file)
|
||||
logger.info(
|
||||
logger.debug(
|
||||
_("file successfully loaded:\n{file}").format(
|
||||
file=modelisation_file
|
||||
)
|
||||
|
@ -160,7 +395,7 @@ def load_coupling(
|
|||
/ model_powers["laser"]
|
||||
) # radiation
|
||||
elif bench in ["SIB2", "SPRB"]:
|
||||
logger.info(
|
||||
logger.warning(
|
||||
_(
|
||||
"cannot fix projection power for SIB2 and SPRB coupling"
|
||||
)
|
||||
|
|
70
src/backscattering_analyzer/display.py
Normal file
70
src/backscattering_analyzer/display.py
Normal file
|
@ -0,0 +1,70 @@
|
|||
from backscattering_analyzer import logger
|
||||
from backscattering_analyzer.experiment import Experiment
|
||||
|
||||
|
||||
def show_projection(
|
||||
experiment: Experiment,
|
||||
start_frequency: float | None = None,
|
||||
end_frequency: float | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Show projection data with matplotlib
|
||||
"""
|
||||
logger.debug(_("showing experiment result"))
|
||||
from matplotlib.pyplot import (
|
||||
loglog,
|
||||
show,
|
||||
legend,
|
||||
close,
|
||||
title,
|
||||
xlabel,
|
||||
ylabel,
|
||||
xlim,
|
||||
ylim,
|
||||
grid,
|
||||
)
|
||||
|
||||
reference = experiment.data.reference.sensibility.psd().sqrt()
|
||||
try:
|
||||
excited = experiment.data.excited.sensibility.psd().sqrt()
|
||||
loglog(
|
||||
experiment.projection_excited.x,
|
||||
experiment.projection_excited.y,
|
||||
label=_("projection with excitation"),
|
||||
) # type: ignore[ReportUnusedCallResult]
|
||||
loglog(
|
||||
excited.x,
|
||||
excited.y,
|
||||
label=_("detected sensibility with excitation"),
|
||||
) # type: ignore[ReportUnusedCallResult]
|
||||
loglog(
|
||||
(experiment.projection_excited + reference).x,
|
||||
(experiment.projection_excited + reference).y,
|
||||
label=_(
|
||||
"sum: no excitation and projection with excitation"
|
||||
),
|
||||
) # type: ignore[ReportUnusedCallResult]
|
||||
except AttributeError:
|
||||
logger.debug(
|
||||
_("no excited signal, will only show reference signal")
|
||||
)
|
||||
loglog(
|
||||
experiment.projection_reference.x,
|
||||
experiment.projection_reference.y,
|
||||
label=_("projection without excitation"),
|
||||
) # type: ignore[ReportUnunsedCallResult]
|
||||
loglog(
|
||||
reference.x,
|
||||
reference.y,
|
||||
label=_("detected sensibility without excitation"),
|
||||
) # type: ignore[ReportUnusedCallResult]
|
||||
legend() # type: ignore[ReportUnusedCallResult]
|
||||
title(experiment.bench) # type: ignore[ReportUnusedCallResult]
|
||||
xlabel(_("frequencies (Hz)")) # type: ignore[ReportUnusedCallResult]
|
||||
ylabel(_("sensibility ($\\frac{1} {\\sqrt {Hz}}$)")) # type: ignore[ReportUnusedCallResult]
|
||||
xlim(5, 100) # type: ignore[ReportUnusedCallResult]
|
||||
ylim(10e-28, 10e-18) # type: ignore[ReportUnusedCallResult]
|
||||
grid(True, "both") # type: ignore[ReportUnusedCallResult]
|
||||
show()
|
||||
close()
|
||||
logger.debug(_("experiment result showed"))
|
|
@ -7,10 +7,10 @@ from numpy.core.multiarray import array
|
|||
from science_signal import interpolate
|
||||
from science_signal.signal import Signal
|
||||
from backscattering_analyzer import logger
|
||||
from backscattering_analyzer.acquisition import AcquisitionType
|
||||
from backscattering_analyzer.data import (
|
||||
Data,
|
||||
load_coupling,
|
||||
load_movements,
|
||||
load_signals,
|
||||
)
|
||||
from backscattering_analyzer.utils import (
|
||||
compute_light,
|
||||
|
@ -41,7 +41,7 @@ class Experiment:
|
|||
try:
|
||||
with open(values_file, "rb") as file:
|
||||
values = load(file)
|
||||
logger.info(
|
||||
logger.debug(
|
||||
_("successfully loaded values from {file}").format(
|
||||
file=values_file
|
||||
)
|
||||
|
@ -71,8 +71,12 @@ class Experiment:
|
|||
wavelength=self.wavelength
|
||||
)
|
||||
)
|
||||
try:
|
||||
self.calibrations: dict[str, float] = {
|
||||
"bench": 1.0,
|
||||
"mirror": 1.0,
|
||||
}
|
||||
try:
|
||||
self.calibrations = {
|
||||
"bench": values["benches"][bench]["calib"]["bench"],
|
||||
"mirror": values["benches"][bench]["calib"]["mirror"],
|
||||
}
|
||||
|
@ -84,10 +88,6 @@ class Experiment:
|
|||
+ "wrong: {error}"
|
||||
).format(file=values_file, error=e)
|
||||
)
|
||||
self.calibrations: dict[str, float] = {
|
||||
"bench": 1.0,
|
||||
"mirror": 1.0,
|
||||
}
|
||||
logger.debug(
|
||||
_(
|
||||
"value of calibrations (bench, mirror): ({bench}, "
|
||||
|
@ -97,8 +97,12 @@ class Experiment:
|
|||
mirror=self.calibrations["mirror"],
|
||||
)
|
||||
)
|
||||
try:
|
||||
self.factors: dict[str, float | None] = {
|
||||
"pre": None,
|
||||
"true": None,
|
||||
}
|
||||
try:
|
||||
self.factors = {
|
||||
"pre": 1e6
|
||||
* values["benches"][bench]["scatter_factor"][0],
|
||||
"true": values["benches"][bench]["scatter_factor"][0],
|
||||
|
@ -111,10 +115,6 @@ class Experiment:
|
|||
+ "be searched with a fit"
|
||||
).format(file=values_file)
|
||||
)
|
||||
self.factors: dict[str, float | None] = {
|
||||
"pre": None,
|
||||
"true": None,
|
||||
}
|
||||
logger.debug(
|
||||
_(
|
||||
"values of scattering factor (outscale, real): ({pre}, "
|
||||
|
@ -124,8 +124,12 @@ class Experiment:
|
|||
true=self.factors["true"],
|
||||
)
|
||||
)
|
||||
try:
|
||||
self.powers: dict[str, float] = {
|
||||
"laser": 23,
|
||||
"dark_fringe": 8e-3,
|
||||
}
|
||||
try:
|
||||
self.powers = {
|
||||
"laser": values["dates"][date]["laser"],
|
||||
"dark_fringe": values["dates"][date]["dark_fringe"],
|
||||
}
|
||||
|
@ -137,10 +141,6 @@ class Experiment:
|
|||
+ "wrong: {error}"
|
||||
).format(file=values_file, error=e)
|
||||
)
|
||||
self.powers: dict[str, float] = {
|
||||
"laser": 23,
|
||||
"dark_fringe": 8e-3,
|
||||
}
|
||||
logger.debug(
|
||||
_(
|
||||
"values of powers (laser, dark_fringe): ({laser}, "
|
||||
|
@ -168,8 +168,9 @@ class Experiment:
|
|||
file=self.modelisation_file,
|
||||
)
|
||||
)
|
||||
data_folder_name: str = "/home/demagny/data"
|
||||
try:
|
||||
data_folder_name: str = values["general"]["data_folder"]
|
||||
data_folder_name = values["general"]["data_folder"]
|
||||
except KeyError:
|
||||
logger.error(
|
||||
_(
|
||||
|
@ -178,7 +179,6 @@ class Experiment:
|
|||
+ "set, if its doesn't exist the program will crash"
|
||||
).format(file=values_file)
|
||||
)
|
||||
data_folder_name = "/home/demagny/data"
|
||||
logger.debug(
|
||||
_("data folder containing signal values: {folder}").format(
|
||||
folder=data_folder_name,
|
||||
|
@ -187,41 +187,33 @@ class Experiment:
|
|||
|
||||
data_folder = Path(data_folder_name)
|
||||
|
||||
try:
|
||||
self.signals: dict[str, Signal] = load_signals(
|
||||
data_folder, bench, date
|
||||
)
|
||||
logger.debug(
|
||||
_("excited and reference signal successfully loaded")
|
||||
)
|
||||
except OSError:
|
||||
logger.critical(_("cannot load signals"))
|
||||
raise Exception(_("cannot load signals"))
|
||||
self.data = Data(data_folder, bench, date)
|
||||
|
||||
try:
|
||||
self.movements: dict[str, Signal] = load_movements(
|
||||
data_folder, bench, date
|
||||
)
|
||||
logger.debug(
|
||||
_(
|
||||
"movements of the bench and the mirror "
|
||||
+ "successfully loaded"
|
||||
)
|
||||
)
|
||||
except OSError:
|
||||
logger.critical(_("cannot load movements"))
|
||||
raise Exception(_("cannot load movements"))
|
||||
|
||||
self.movements["true"] = process_movement(
|
||||
self.movements["bench"],
|
||||
self.movements["mirror"],
|
||||
self.reference_movement: None | Signal = process_movement(
|
||||
self.data.reference.movements.bench,
|
||||
self.data.reference.movements.mirror,
|
||||
self.calibrations,
|
||||
vibration_frequency,
|
||||
)
|
||||
self.excited_movement: None | Signal = None
|
||||
try:
|
||||
self.excited_movement = process_movement(
|
||||
self.data.excited.movements.bench,
|
||||
self.data.excited.movements.mirror,
|
||||
self.calibrations,
|
||||
vibration_frequency,
|
||||
)
|
||||
except AttributeError:
|
||||
pass
|
||||
logger.debug(_("movement processed"))
|
||||
|
||||
model_powers: dict[str, float] = {
|
||||
"laser": 0,
|
||||
"dark_fringe": 0,
|
||||
}
|
||||
correct_power: bool = False
|
||||
try:
|
||||
correct_power = bool(values["dates"][date]["correct-power"])
|
||||
correct_power = values["dates"][date]["correct-power"]
|
||||
if correct_power:
|
||||
model_powers = {
|
||||
"laser": values["dates"][date]["coupling"]["laser"],
|
||||
|
@ -229,11 +221,6 @@ class Experiment:
|
|||
"dark_fringe"
|
||||
],
|
||||
}
|
||||
else:
|
||||
model_powers = {
|
||||
"laser": 0,
|
||||
"dark_fringe": 0,
|
||||
}
|
||||
except KeyError as error:
|
||||
print(error)
|
||||
logger.warning(
|
||||
|
@ -244,11 +231,6 @@ class Experiment:
|
|||
+ "applied"
|
||||
)
|
||||
)
|
||||
correct_power = False
|
||||
model_powers = {
|
||||
"laser": 0,
|
||||
"dark_fringe": 0,
|
||||
}
|
||||
|
||||
try:
|
||||
self.coupling = load_coupling(
|
||||
|
@ -273,23 +255,77 @@ class Experiment:
|
|||
)
|
||||
self.factors = self.fit_factors()
|
||||
|
||||
self.projection = self.compute_projection()
|
||||
self.projection_reference = self.compute_projection(
|
||||
AcquisitionType.REFERENCE,
|
||||
)
|
||||
try:
|
||||
self.projection_excited = self.compute_projection(
|
||||
AcquisitionType.EXCITED,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
logger.debug(_("end of experiment initialisation"))
|
||||
|
||||
def get_factors(
|
||||
self,
|
||||
phase: float,
|
||||
*signals: Signal,
|
||||
start: float | None = None,
|
||||
end: float | None = None,
|
||||
) -> tuple[Signal, Signal, Signal, Signal, Signal, Signal, float]:
|
||||
type: AcquisitionType = AcquisitionType.EXCITED,
|
||||
) -> tuple[Signal, ...]:
|
||||
"""
|
||||
Get factor to compute the projection for a given scatter factor
|
||||
"""
|
||||
phase = 4 * pi / self.wavelength
|
||||
factor_n = (self.movements["true"] * phase).sin().psd().sqrt()
|
||||
coupling_n = self.coupling[0]
|
||||
factor_d = (self.movements["true"] * phase).cos().psd().sqrt()
|
||||
coupling_d = self.coupling[1]
|
||||
|
||||
if type is AcquisitionType.EXCITED:
|
||||
if self.excited_movement is not None:
|
||||
factor_n = (
|
||||
(self.excited_movement * phase).sin().psd().sqrt()
|
||||
)
|
||||
factor_d = (
|
||||
(self.excited_movement * phase).cos().psd().sqrt()
|
||||
)
|
||||
else:
|
||||
logger.critical(
|
||||
_(
|
||||
"no excited signal given, cannot get factors "
|
||||
+ "of the excited signal"
|
||||
)
|
||||
)
|
||||
raise Exception(
|
||||
_(
|
||||
"no excited signal given, cannot get factors "
|
||||
+ "of the excited signal"
|
||||
)
|
||||
)
|
||||
elif type is AcquisitionType.REFERENCE:
|
||||
if self.reference_movement is not None:
|
||||
factor_n = (
|
||||
(self.reference_movement * phase).sin().psd().sqrt()
|
||||
)
|
||||
factor_d = (
|
||||
(self.reference_movement * phase).cos().psd().sqrt()
|
||||
)
|
||||
else:
|
||||
logger.critical(
|
||||
_(
|
||||
"no reference signal given, cannot get factors "
|
||||
+ "of the reference signal"
|
||||
)
|
||||
)
|
||||
raise Exception(
|
||||
_(
|
||||
"no reference signal given, cannot get factors "
|
||||
+ "of the reference signal"
|
||||
)
|
||||
)
|
||||
else:
|
||||
logger.critical(_("unknown acquisition type"))
|
||||
raise Exception(_("unknown acquisition type"))
|
||||
|
||||
if start is None:
|
||||
start = coupling_n.x[0]
|
||||
if end is None:
|
||||
|
@ -297,30 +333,12 @@ class Experiment:
|
|||
|
||||
coupling_n = coupling_n.cut(start, end)
|
||||
|
||||
(
|
||||
return interpolate(
|
||||
factor_n,
|
||||
coupling_n,
|
||||
factor_d,
|
||||
coupling_d,
|
||||
excited,
|
||||
reference,
|
||||
) = interpolate(
|
||||
factor_n,
|
||||
coupling_n,
|
||||
factor_d,
|
||||
coupling_d,
|
||||
self.signals["excited"].psd().sqrt(),
|
||||
self.signals["reference"].psd().sqrt(),
|
||||
)
|
||||
|
||||
return (
|
||||
factor_n,
|
||||
coupling_n,
|
||||
factor_d,
|
||||
coupling_d,
|
||||
excited,
|
||||
reference,
|
||||
phase,
|
||||
*signals,
|
||||
)
|
||||
|
||||
def fit_factors(
|
||||
|
@ -330,11 +348,12 @@ class Experiment:
|
|||
start_frequency: float = 15,
|
||||
end_frequency: float = 100,
|
||||
precision: int = 3,
|
||||
) -> dict[str, float]:
|
||||
) -> dict[str, None | float]:
|
||||
"""
|
||||
Find the best factor (first order only) to get the projection
|
||||
that best match the excited signal
|
||||
"""
|
||||
phase = 4 * pi / self.wavelength
|
||||
(
|
||||
factor_n,
|
||||
coupling_n,
|
||||
|
@ -342,8 +361,14 @@ class Experiment:
|
|||
coupling_d,
|
||||
excited,
|
||||
reference,
|
||||
) = self.get_factors(
|
||||
phase,
|
||||
) = self.get_factors(start=start_frequency, end=end_frequency)
|
||||
self.data.excited.sensibility.psd().sqrt(),
|
||||
self.data.reference.sensibility.psd().sqrt(),
|
||||
start=start_frequency,
|
||||
end=end_frequency,
|
||||
type=AcquisitionType.EXCITED,
|
||||
)
|
||||
|
||||
for index in range(precision):
|
||||
logger.debug(_("search for a local minimum"))
|
||||
|
@ -375,17 +400,17 @@ class Experiment:
|
|||
|
||||
if argmin(y) == 0:
|
||||
logger.warning(_("smaller than current range allows"))
|
||||
scatter_max: float = x[1]
|
||||
scatter_max = x[1]
|
||||
elif argmin(y) == len(x) - 1:
|
||||
logger.warning(_("bigger than current range allows"))
|
||||
scatter_min: float = x[-2]
|
||||
scatter_min = x[-2]
|
||||
else:
|
||||
scatter_min: float = x[argmin(y) - 1]
|
||||
scatter_max: float = x[argmin(y) + 1]
|
||||
scatter_min = x[argmin(y) - 1]
|
||||
scatter_max = x[argmin(y) + 1]
|
||||
|
||||
logger.debug(_("local minimum found"))
|
||||
|
||||
pre_scatter_factor: float = scatter_min / 2 + scatter_max / 2
|
||||
pre_scatter_factor = scatter_min / 2 + scatter_max / 2
|
||||
|
||||
logger.info(
|
||||
_("found a scattering factor of {factor}").format(
|
||||
|
@ -398,19 +423,19 @@ class Experiment:
|
|||
"true": pre_scatter_factor * 1e-6,
|
||||
}
|
||||
|
||||
def compute_projection(self) -> Signal:
|
||||
def compute_projection(
|
||||
self, type: AcquisitionType = AcquisitionType.EXCITED
|
||||
) -> Signal:
|
||||
"""
|
||||
Compute the projection of the noise from current values
|
||||
"""
|
||||
phase = 4 * pi / self.wavelength
|
||||
(
|
||||
factor_n,
|
||||
coupling_n,
|
||||
factor_d,
|
||||
coupling_d,
|
||||
_,
|
||||
_,
|
||||
phase,
|
||||
) = self.get_factors()
|
||||
) = self.get_factors(phase, type=type)
|
||||
|
||||
return Signal(
|
||||
factor_n.x,
|
||||
|
|
16
src/backscattering_analyzer/movements.py
Normal file
16
src/backscattering_analyzer/movements.py
Normal file
|
@ -0,0 +1,16 @@
|
|||
from science_signal.signal import Signal
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class Movements:
|
||||
"""
|
||||
Container for the movement of the bench and the mirror
|
||||
"""
|
||||
|
||||
bench: Signal
|
||||
mirror: Signal
|
||||
|
||||
def __init__(self, bench: Signal, mirror: Signal):
|
||||
self.bench = bench * 1e-6
|
||||
self.mirror = mirror * 1e-6
|
|
@ -1,4 +1,4 @@
|
|||
from science_signal.signal import Signal
|
||||
from science_signal.signal import Signal # type: ignore[reportMissingTypeStubs]
|
||||
from backscattering_analyzer import logger
|
||||
|
||||
from numpy.typing import NDArray
|
||||
|
@ -14,7 +14,7 @@ def process_movement(
|
|||
"""
|
||||
Clean and compute relative movement between the bench and the mirror
|
||||
"""
|
||||
logger.info(_("computing relative movement"))
|
||||
logger.debug(_("computing relative movement"))
|
||||
|
||||
return (
|
||||
(
|
||||
|
@ -22,13 +22,13 @@ def process_movement(
|
|||
- mirror * calibrations["mirror"]
|
||||
)
|
||||
.detrend("constant")
|
||||
.filter(end=5 * vibration_frequency)
|
||||
.filter(end=1)
|
||||
.detrend("constant")
|
||||
)
|
||||
|
||||
|
||||
def compute_light(
|
||||
pre_scatter_factor: float,
|
||||
pre_scatter_factor: float | None,
|
||||
factor_n: Signal,
|
||||
coupling_n: Signal,
|
||||
factor_d: Signal,
|
||||
|
@ -38,8 +38,12 @@ def compute_light(
|
|||
"""
|
||||
Optimized computing of light with pre-computed factor
|
||||
"""
|
||||
if pre_scatter_factor is None:
|
||||
raise Exception(_(
|
||||
"Cannot compute projection without backscattering factor"
|
||||
))
|
||||
return (
|
||||
sqrt(pre_scatter_factor)
|
||||
/ phase
|
||||
* (coupling_n * factor_n + coupling_d * factor_d).y
|
||||
)
|
||||
) # type: ignore[reportAny]
|
||||
|
|
Loading…
Reference in a new issue