backscattering_simulation/main.py
2025-07-16 15:25:05 +02:00

533 lines
16 KiB
Python

from logging import getLogger, basicConfig, WARNING
from argparse import ArgumentParser
from pathlib import Path
from urllib.parse import urlparse
from importlib.metadata import version, PackageNotFoundError
from rich.theme import Theme
from rich.console import Console
from rich.progress import Progress
from rich.logging import RichHandler
from rich.table import Table
from matplotlib.pyplot import figure, show
from numpy import sqrt, geomspace, mean, diff
from finesse.model import Model
from finesse.analysis.actions import (
TemporaryParameters,
Change,
Maximize,
Minimize,
Series,
Noxaxis,
)
from finesse.exceptions import ModelMissingAttributeError
from astropy.units import W, rad, m, mW # pyright: ignore[reportAttributeAccessIssue]
from gwpy.frequencyseries import FrequencySeries
from h5py import File
from git import Repo
from backscattering_simulation_data import Root, Scatterer
from backscattering_simulation_data.metadata import (
MetadataRoot,
MetadataScatterer,
Interferometer,
Git,
)
from utils import fix_dark_fringe, TF
from MaskedReadoutDC import MaskedReadoutDC
theme = Theme(
{
"strong": "cyan underline",
"result": "red bold",
}
)
console = Console(theme=theme)
basicConfig(
level=WARNING,
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler(console=console, rich_tracebacks=True)],
)
logger = getLogger(__name__)
# Parameters
parser = ArgumentParser(
prog="main",
description="Generate a model file containing computed Transfer Function",
usage="python main.py --filename={filename} --laser-power={laser power} --dark-fringe={dark fringe power} --sr-rotation={sr rotation} --max-tem={maxtem} [--precision={precision} | --quiet | --verbose]",
)
parser.add_argument(
"-f",
"--filename",
help="path to the new file containing the transfer function",
type=Path,
required=True,
)
parser.add_argument(
"-p",
"--laser-power",
help="power injected in the interferometer (in W)",
type=float,
required=True,
)
parser.add_argument(
"-d",
"--dark-fringe",
help="power on the dark fringe (dark fringe offset) (in W)",
type=float,
required=True,
)
parser.add_argument(
"-r",
"--sr-rotation",
help="yaw SR misalignement angle (in rad)",
type=float,
required=True,
)
parser.add_argument(
"-m",
"--max-tem",
help="number of modes in the model",
type=int,
required=True,
)
parser.add_argument(
"-n",
"--precision",
help="number of point in the transfer function model",
type=int,
default=1000,
required=False,
)
parser.add_argument(
"-q",
"--quiet",
help="suppress outputs in the terminal",
action="store_true",
)
parser.add_argument(
"-V",
"--verbose",
help="display graph and table on the terminal",
action="store_true",
)
parser.add_argument(
"-a",
"--name",
help="name of the generated result. Should be unique",
type=str,
required=True,
)
args = parser.parse_args()
filename = args.filename # name of the created file
laser_power = args.laser_power * W # power injected
dark_fringe_power = args.dark_fringe * W # power on dark fringe
precision = args.precision # number of point in the simulation
sr_rotation = args.sr_rotation * rad # yaw SR misalignement angle
maxtem = args.max_tem # number of modes in the model
quiet = args.quiet # no progress bar
verbose = args.verbose # display figure and tables
name = args.name # unique name of the generated hdf5 data
if filename.suffix != ".hdf5":
filename = filename.with_suffix(".hdf5")
with Progress(console=console) as progress:
model_building = progress.add_task(
description="building the model",
start=False,
total=None,
visible=False,
)
locking = progress.add_task(
description="locking the model",
start=False,
total=None,
visible=False,
)
set_dark_fringe = progress.add_task(
description="set dark fringe offset",
start=False,
total=None,
visible=False,
)
computing_TF = progress.add_task(
description="computing transfer function",
start=False,
total=8,
visible=False,
)
write_file = progress.add_task(
description="writing file",
start=False,
total=None,
visible=False,
)
C_B1_DETECTOR = "TF" # depends on the code itself (don't change)
# Creating model
progress.start_task(model_building)
progress.update(model_building, visible=not quiet)
model = Model()
model.phase_config(zero_k00=False, zero_tem00_gouy=True)
model.parse(Path("model.kat").read_text())
model.lambda0 = model.get("wavelength")
model.get("fsig").f = 1.0
try:
model.get("SR").xbeta = sr_rotation.to(rad).value
model.get("laser").P = laser_power.to(W).value
except ModelMissingAttributeError:
logger.warning("SR, or laser is not define in the model")
try:
model.get("B1")
logger.info("B1 already exists")
except ModelMissingAttributeError:
try:
model.add(
MaskedReadoutDC(
"B1",
optical_node=model.get("SDB1").p2.o,
output_detectors=True,
pdtype=None,
)
)
logger.info("B1 created")
except ModelMissingAttributeError:
logger.warning("SDB1 does not exist in the model")
progress.remove_task(model_building)
progress.start_task(locking)
progress.update(locking, visible=not quiet)
# Locking
result = model.run(
TemporaryParameters(
Series(
Change(
{
"SR.misaligned": True,
"PR.misaligned": True,
}
),
Maximize(
model.get("NE_p1"),
model.get("NORTH_ARM.DC"),
bounds=[-180, 180],
tol=1e-14,
),
Maximize(
model.get("WE_p1"),
model.get("WEST_ARM.DC"),
bounds=[-180, 180],
tol=1e-14,
),
Minimize(
model.get("SR_p2"),
model.get("MICH.DC"),
bounds=[-180, 180],
tol=1e-14,
),
Change(
{
"PR.misaligned": False,
},
),
Maximize(
model.get("PR_p2"),
model.get("PRCL.DC"),
bounds=[-180, 180],
tol=1e-14,
),
Change(
{
"SR.misaligned": False,
},
),
Maximize(
model.get("B1_DC"),
model.get("SRCL.DC"),
bounds=[-180, 180],
tol=1e-14,
),
Change(
{
"SRCL.DC": -90,
},
relative=True,
),
),
exclude=(
"NE.phi",
"NI.Phi",
"WE.phi",
"WI.phi",
"SR.phi",
"PR.phi",
"NORTH_ARM.DC",
"WEST_ARM.DC",
"DARM.DC",
"MICH.DC",
"PRCL.DC",
"SRCL.DC",
"SR.misaligned",
"PR.misaligned",
),
),
)
# Adding HOMs
model.modes(maxtem=maxtem)
model.get("B1").select_mask(exclude=(0, 0))
progress.remove_task(locking)
if verbose:
solution = model.run(Noxaxis())
if solution is None:
raise Exception("error when modeling the locked model")
table = Table(title="Puissances dans l'interferomètre")
table.add_column("position", justify="left", style="white")
table.add_column("puissance (W)", justify="left", style="cyan")
table.add_row("Injection", str(model.get("laser").P.eval()))
table.add_row("PR", str(solution["PR_p1"])) # pyright: ignore[reportArgumentType,reportCallIssue]
table.add_row(
"cavité de recyclage de puissance",
str(solution["PR_p2"]), # pyright: ignore[reportArgumentType,reportCallIssue]
)
table.add_row("cavité ouest", str(solution["WE_p1"])) # pyright: ignore[reportArgumentType,reportCallIssue]
table.add_row("cavité nord", str(solution["NE_p1"])) # pyright: ignore[reportArgumentType,reportCallIssue]
table.add_row("frange noire", str(solution["SR_p2"])) # pyright: ignore[reportArgumentType,reportCallIssue]
table.add_row("SNEB", str(solution["SNEB_DC"])) # pyright: ignore[reportArgumentType,reportCallIssue]
table.add_row("SWEB", str(solution["SWEB_DC"])) # pyright: ignore[reportArgumentType,reportCallIssue]
table.add_row("SDB1", str(solution["SDB1_DC"])) # pyright: ignore[reportArgumentType,reportCallIssue]
console.print(table)
progress.start_task(set_dark_fringe)
progress.update(set_dark_fringe, visible=not quiet)
number, power = fix_dark_fringe(
model, dark_fringe_power.to(W).value
)
power = power * W
if verbose:
console.print(
"[result]{dof}[/result] with [result]{number} steps[/result] give [result]{power:.6f}[/result] on the [strong]dark fringe[/strong]".format(
number=number,
dof=model.get("DARM").DC,
power=power.to(mW),
),
)
dark_fringe_power = (
power # to keep this result for the h5df metadata
)
progress.remove_task(set_dark_fringe)
progress.start_task(computing_TF)
progress.update(computing_TF, visible=not quiet)
# Computing TF
model.get("SNEB").phi = model.get("NE").phi - 45
model.get("SWEB").phi = model.get("WE").phi - 45
model.get("SDB1").phi = model.get("SR").phi + 45
quad_tf: dict[str, TF] = dict() # bench is in phase quadrature (Kn)
in_tf: dict[str, TF] = dict() # bench is in phase (Kp)
for bench_name in ["SNEB", "SWEB", "SDB1"]:
quad_tf[bench_name] = TF(
model,
["{}_z".format(bench_name)],
[C_B1_DETECTOR],
geomspace(5, 10000, precision), # pyright: ignore[reportArgumentType]
)
progress.update(computing_TF, advance=1)
quad_tf["DARM"] = TF(
model,
["DARM"],
[C_B1_DETECTOR],
geomspace(5, 10000, precision), # pyright: ignore[reportArgumentType]
)
progress.update(computing_TF, advance=1)
model.get("SNEB").phi = model.get("NE").phi
model.get("SWEB").phi = model.get("WE").phi
model.get("SDB1").phi = model.get("SR").phi
for bench_name in ["SNEB", "SWEB", "SDB1"]:
in_tf[bench_name] = TF(
model,
["{}_z".format(bench_name)],
[C_B1_DETECTOR],
geomspace(5, 10000, precision), # pyright: ignore[reportArgumentType]
)
progress.update(computing_TF, advance=1)
in_tf["DARM"] = TF(
model,
["DARM"],
[C_B1_DETECTOR],
geomspace(5, 10000, precision), # pyright: ignore[reportArgumentType]
)
progress.update(computing_TF, advance=1)
bench_names = ["SNEB", "SWEB", "SDB1"]
if verbose:
for bench_name in bench_names:
Figure = figure()
ax = Figure.gca()
ax.set_title(
"finesse {bench_name}".format(bench_name=bench_name)
).figure.gca().loglog(
quad_tf[bench_name].f,
sqrt(
(
abs(quad_tf[bench_name].get(C_B1_DETECTOR))
/ abs(quad_tf["DARM"].get(C_B1_DETECTOR))
/ model.get("space_NI_NE").L.eval()
)
** 2
+ (
abs(in_tf[bench_name].get(C_B1_DETECTOR))
/ abs(in_tf["DARM"].get(C_B1_DETECTOR))
/ model.get("space_NI_NE").L.eval()
)
** 2
),
label="sum of TF ($\\sqrt{{K_n^2 + K_p^2}}$)",
)[0].figure.gca().loglog(
quad_tf[bench_name].f,
abs(quad_tf[bench_name].get(C_B1_DETECTOR))
/ abs(quad_tf["DARM"].get(C_B1_DETECTOR))
/ model.get("space_NI_NE").L.eval(),
label="Kn",
)[0].figure.gca().loglog(
in_tf[bench_name].f,
abs(in_tf[bench_name].get(C_B1_DETECTOR))
/ abs(in_tf["DARM"].get(C_B1_DETECTOR))
/ model.get("space_NI_NE").L.eval(),
label="Kp",
)[0].figure.gca().set_ylabel(
"$\\frac{{m}}{{m}}$"
).figure.gca().legend().figure.gca().set_xlabel(
"Frequencies ($Hz$)"
).figure.gca().grid(True, "both", "both")
show()
progress.remove_task(computing_TF)
progress.start_task(write_file)
progress.update(write_file, visible=not quiet)
repo = Repo(Path(__file__).parent)
current_version = "unknown"
try:
current_version = version(__name__)
except PackageNotFoundError:
pass
data = Root(
MetadataRoot(
name=name,
maxtem=maxtem,
git=Git(
remote=urlparse(repo.remote().url),
commit=repo.active_branch.commit.hexsha,
version=current_version,
branch=repo.active_branch.name,
),
interferometer=Interferometer(
dark_fringe_power=dark_fringe_power,
injection_power=laser_power,
sr_yaw=sr_rotation,
north_arm=model.get("space_NI_NE").L.eval() * m,
west_arm=model.get("space_WI_WE").L.eval() * m,
),
),
[
Scatterer(
MetadataScatterer(
"SDB1",
),
kn=FrequencySeries(
data=quad_tf["SDB1"].get(C_B1_DETECTOR),
unit=m / W,
f0=quad_tf["SDB1"].f[0],
df=mean(diff(quad_tf["SDB1"].f)),
),
kp=FrequencySeries(
data=in_tf["SDB1"].get(C_B1_DETECTOR),
unit=m / W,
f0=in_tf["SDB1"].f[0],
df=mean(diff(in_tf["SDB1"].f)),
),
),
Scatterer(
MetadataScatterer(
"SNEB",
),
kn=FrequencySeries(
data=quad_tf["SNEB"].get(C_B1_DETECTOR),
unit=m / W,
f0=quad_tf["SNEB"].f[0],
df=mean(diff(quad_tf["SNEB"].f)),
),
kp=FrequencySeries(
data=in_tf["SNEB"].get(C_B1_DETECTOR),
unit=m / W,
f0=in_tf["SNEB"].f[0],
df=mean(diff(in_tf["SNEB"].f)),
),
),
Scatterer(
MetadataScatterer(
"SWEB",
),
kn=FrequencySeries(
data=quad_tf["SWEB"].get(C_B1_DETECTOR),
unit=m / W,
f0=quad_tf["SWEB"].f[0],
df=mean(diff(quad_tf["SWEB"].f)),
),
kp=FrequencySeries(
data=in_tf["SWEB"].get(C_B1_DETECTOR),
unit=m / W,
f0=in_tf["SWEB"].f[0],
df=mean(diff(in_tf["SWEB"].f)),
),
),
],
FrequencySeries(
data=quad_tf["DARM"].get(C_B1_DETECTOR),
unit=W / m, # power on B1 to arm displacement
f0=quad_tf["DARM"].f[0],
df=mean(diff(quad_tf["DARM"].f)),
),
)
with File(name=filename, mode="w") as file:
data.to_hdf5(file)
progress.remove_task(write_file)