from logging import getLogger, basicConfig, WARNING from argparse import ArgumentParser from pathlib import Path from urllib.parse import urlparse from importlib.metadata import version, PackageNotFoundError from rich.theme import Theme from rich.console import Console from rich.progress import Progress from rich.logging import RichHandler from rich.table import Table from matplotlib.pyplot import figure, show from numpy import sqrt, geomspace, mean, diff from finesse.model import Model from finesse.analysis.actions import ( TemporaryParameters, Change, Maximize, Minimize, Series, Noxaxis, ) from finesse.exceptions import ModelMissingAttributeError from astropy.units import W, rad, m, mW # pyright: ignore[reportAttributeAccessIssue] from gwpy.frequencyseries import FrequencySeries from h5py import File from git import Repo from backscattering_simulation_data import Root, Scatterer from backscattering_simulation_data.metadata import ( MetadataRoot, MetadataScatterer, Interferometer, Git, ) from utils import fix_dark_fringe, TF from MaskedReadoutDC import MaskedReadoutDC theme = Theme( { "strong": "cyan underline", "result": "red bold", } ) console = Console(theme=theme) basicConfig( level=WARNING, format="%(message)s", datefmt="[%X]", handlers=[RichHandler(console=console, rich_tracebacks=True)], ) logger = getLogger(__name__) # Parameters parser = ArgumentParser( prog="main", description="Generate a model file containing computed Transfer Function", usage="python main.py --filename={filename} --laser-power={laser power} --dark-fringe={dark fringe power} --sr-rotation={sr rotation} --max-tem={maxtem} [--precision={precision} | --quiet | --verbose]", ) parser.add_argument( "-f", "--filename", help="path to the new file containing the transfer function", type=Path, required=True, ) parser.add_argument( "-p", "--laser-power", help="power injected in the interferometer (in W)", type=float, required=True, ) parser.add_argument( "-d", "--dark-fringe", help="power on the dark fringe (dark fringe offset) (in W)", type=float, required=True, ) parser.add_argument( "-r", "--sr-rotation", help="yaw SR misalignement angle (in rad)", type=float, required=True, ) parser.add_argument( "-m", "--max-tem", help="number of modes in the model", type=int, required=True, ) parser.add_argument( "-n", "--precision", help="number of point in the transfer function model", type=int, default=1000, required=False, ) parser.add_argument( "-q", "--quiet", help="suppress outputs in the terminal", action="store_true", ) parser.add_argument( "-V", "--verbose", help="display graph and table on the terminal", action="store_true", ) parser.add_argument( "-a", "--name", help="name of the generated result. Should be unique", type=str, required=True, ) args = parser.parse_args() filename = args.filename # name of the created file laser_power = args.laser_power * W # power injected dark_fringe_power = args.dark_fringe * W # power on dark fringe precision = args.precision # number of point in the simulation sr_rotation = args.sr_rotation * rad # yaw SR misalignement angle maxtem = args.max_tem # number of modes in the model quiet = args.quiet # no progress bar verbose = args.verbose # display figure and tables name = args.name # unique name of the generated hdf5 data if filename.suffix != ".hdf5": filename = filename.with_suffix(".hdf5") with Progress(console=console) as progress: model_building = progress.add_task( description="building the model", start=False, total=None, visible=False, ) locking = progress.add_task( description="locking the model", start=False, total=None, visible=False, ) set_dark_fringe = progress.add_task( description="set dark fringe offset", start=False, total=None, visible=False, ) computing_TF = progress.add_task( description="computing transfer function", start=False, total=8, visible=False, ) write_file = progress.add_task( description="writing file", start=False, total=None, visible=False, ) C_B1_DETECTOR = "TF" # depends on the code itself (don't change) # Creating model progress.start_task(model_building) progress.update(model_building, visible=not quiet) model = Model() model.phase_config(zero_k00=False, zero_tem00_gouy=True) model.parse(Path("model.kat").read_text()) model.lambda0 = model.get("wavelength") model.get("fsig").f = 1.0 try: model.get("SR").xbeta = sr_rotation.to(rad).value model.get("laser").P = laser_power.to(W).value except ModelMissingAttributeError: logger.warning("SR, or laser is not define in the model") try: model.get("B1") logger.info("B1 already exists") except ModelMissingAttributeError: try: model.add( MaskedReadoutDC( "B1", optical_node=model.get("SDB1").p2.o, output_detectors=True, pdtype=None, ) ) logger.info("B1 created") except ModelMissingAttributeError: logger.warning("SDB1 does not exist in the model") progress.remove_task(model_building) progress.start_task(locking) progress.update(locking, visible=not quiet) # Locking result = model.run( TemporaryParameters( Series( Change( { "SR.misaligned": True, "PR.misaligned": True, } ), Maximize( model.get("NE_p1"), model.get("NORTH_ARM.DC"), bounds=[-180, 180], tol=1e-14, ), Maximize( model.get("WE_p1"), model.get("WEST_ARM.DC"), bounds=[-180, 180], tol=1e-14, ), Minimize( model.get("SR_p2"), model.get("MICH.DC"), bounds=[-180, 180], tol=1e-14, ), Change( { "PR.misaligned": False, }, ), Maximize( model.get("PR_p2"), model.get("PRCL.DC"), bounds=[-180, 180], tol=1e-14, ), Change( { "SR.misaligned": False, }, ), Maximize( model.get("B1_DC"), model.get("SRCL.DC"), bounds=[-180, 180], tol=1e-14, ), Change( { "SRCL.DC": -90, }, relative=True, ), ), exclude=( "NE.phi", "NI.Phi", "WE.phi", "WI.phi", "SR.phi", "PR.phi", "NORTH_ARM.DC", "WEST_ARM.DC", "DARM.DC", "MICH.DC", "PRCL.DC", "SRCL.DC", "SR.misaligned", "PR.misaligned", ), ), ) # Adding HOMs model.modes(maxtem=maxtem) model.get("B1").select_mask(exclude=(0, 0)) progress.remove_task(locking) if verbose: solution = model.run(Noxaxis()) if solution is None: raise Exception("error when modeling the locked model") table = Table(title="Puissances dans l'interferomètre") table.add_column("position", justify="left", style="white") table.add_column("puissance (W)", justify="left", style="cyan") table.add_row("Injection", str(model.get("laser").P.eval())) table.add_row("PR", str(solution["PR_p1"])) # pyright: ignore[reportArgumentType,reportCallIssue] table.add_row( "cavité de recyclage de puissance", str(solution["PR_p2"]), # pyright: ignore[reportArgumentType,reportCallIssue] ) table.add_row("cavité ouest", str(solution["WE_p1"])) # pyright: ignore[reportArgumentType,reportCallIssue] table.add_row("cavité nord", str(solution["NE_p1"])) # pyright: ignore[reportArgumentType,reportCallIssue] table.add_row("frange noire", str(solution["SR_p2"])) # pyright: ignore[reportArgumentType,reportCallIssue] table.add_row("SNEB", str(solution["SNEB_DC"])) # pyright: ignore[reportArgumentType,reportCallIssue] table.add_row("SWEB", str(solution["SWEB_DC"])) # pyright: ignore[reportArgumentType,reportCallIssue] table.add_row("SDB1", str(solution["SDB1_DC"])) # pyright: ignore[reportArgumentType,reportCallIssue] console.print(table) progress.start_task(set_dark_fringe) progress.update(set_dark_fringe, visible=not quiet) number, power = fix_dark_fringe( model, dark_fringe_power.to(W).value ) power = power * W if verbose: console.print( "[result]{dof}[/result] with [result]{number} steps[/result] give [result]{power:.6f}[/result] on the [strong]dark fringe[/strong]".format( number=number, dof=model.get("DARM").DC, power=power.to(mW), ), ) dark_fringe_power = ( power # to keep this result for the h5df metadata ) progress.remove_task(set_dark_fringe) progress.start_task(computing_TF) progress.update(computing_TF, visible=not quiet) # Computing TF model.get("SNEB").phi = model.get("NE").phi - 45 model.get("SWEB").phi = model.get("WE").phi - 45 model.get("SDB1").phi = model.get("SR").phi + 45 quad_tf: dict[str, TF] = dict() # bench is in phase quadrature (Kn) in_tf: dict[str, TF] = dict() # bench is in phase (Kp) for bench_name in ["SNEB", "SWEB", "SDB1"]: quad_tf[bench_name] = TF( model, ["{}_z".format(bench_name)], [C_B1_DETECTOR], geomspace(5, 10000, precision), # pyright: ignore[reportArgumentType] ) progress.update(computing_TF, advance=1) quad_tf["DARM"] = TF( model, ["DARM"], [C_B1_DETECTOR], geomspace(5, 10000, precision), # pyright: ignore[reportArgumentType] ) progress.update(computing_TF, advance=1) model.get("SNEB").phi = model.get("NE").phi model.get("SWEB").phi = model.get("WE").phi model.get("SDB1").phi = model.get("SR").phi for bench_name in ["SNEB", "SWEB", "SDB1"]: in_tf[bench_name] = TF( model, ["{}_z".format(bench_name)], [C_B1_DETECTOR], geomspace(5, 10000, precision), # pyright: ignore[reportArgumentType] ) progress.update(computing_TF, advance=1) in_tf["DARM"] = TF( model, ["DARM"], [C_B1_DETECTOR], geomspace(5, 10000, precision), # pyright: ignore[reportArgumentType] ) progress.update(computing_TF, advance=1) bench_names = ["SNEB", "SWEB", "SDB1"] if verbose: for bench_name in bench_names: Figure = figure() ax = Figure.gca() ax.set_title( "finesse {bench_name}".format(bench_name=bench_name) ).figure.gca().loglog( quad_tf[bench_name].f, sqrt( ( abs(quad_tf[bench_name].get(C_B1_DETECTOR)) / abs(quad_tf["DARM"].get(C_B1_DETECTOR)) / model.get("space_NI_NE").L.eval() ) ** 2 + ( abs(in_tf[bench_name].get(C_B1_DETECTOR)) / abs(in_tf["DARM"].get(C_B1_DETECTOR)) / model.get("space_NI_NE").L.eval() ) ** 2 ), label="sum of TF ($\\sqrt{{K_n^2 + K_p^2}}$)", )[0].figure.gca().loglog( quad_tf[bench_name].f, abs(quad_tf[bench_name].get(C_B1_DETECTOR)) / abs(quad_tf["DARM"].get(C_B1_DETECTOR)) / model.get("space_NI_NE").L.eval(), label="Kn", )[0].figure.gca().loglog( in_tf[bench_name].f, abs(in_tf[bench_name].get(C_B1_DETECTOR)) / abs(in_tf["DARM"].get(C_B1_DETECTOR)) / model.get("space_NI_NE").L.eval(), label="Kp", )[0].figure.gca().set_ylabel( "$\\frac{{m}}{{m}}$" ).figure.gca().legend().figure.gca().set_xlabel( "Frequencies ($Hz$)" ).figure.gca().grid(True, "both", "both") show() progress.remove_task(computing_TF) progress.start_task(write_file) progress.update(write_file, visible=not quiet) repo = Repo(Path(__file__).parent) current_version = "unknown" try: current_version = version(__name__) except PackageNotFoundError: pass data = Root( MetadataRoot( name=name, maxtem=maxtem, git=Git( remote=urlparse(repo.remote().url), commit=repo.active_branch.commit.hexsha, version=current_version, branch=repo.active_branch.name, ), interferometer=Interferometer( dark_fringe_power=dark_fringe_power, injection_power=laser_power, sr_yaw=sr_rotation, north_arm=model.get("space_NI_NE").L.eval() * m, west_arm=model.get("space_WI_WE").L.eval() * m, ), ), [ Scatterer( MetadataScatterer( "SDB1", ), kn=FrequencySeries( data=quad_tf["SDB1"].get(C_B1_DETECTOR), unit=m / W, f0=quad_tf["SDB1"].f[0], df=mean(diff(quad_tf["SDB1"].f)), ), kp=FrequencySeries( data=in_tf["SDB1"].get(C_B1_DETECTOR), unit=m / W, f0=in_tf["SDB1"].f[0], df=mean(diff(in_tf["SDB1"].f)), ), ), Scatterer( MetadataScatterer( "SNEB", ), kn=FrequencySeries( data=quad_tf["SNEB"].get(C_B1_DETECTOR), unit=m / W, f0=quad_tf["SNEB"].f[0], df=mean(diff(quad_tf["SNEB"].f)), ), kp=FrequencySeries( data=in_tf["SNEB"].get(C_B1_DETECTOR), unit=m / W, f0=in_tf["SNEB"].f[0], df=mean(diff(in_tf["SNEB"].f)), ), ), Scatterer( MetadataScatterer( "SWEB", ), kn=FrequencySeries( data=quad_tf["SWEB"].get(C_B1_DETECTOR), unit=m / W, f0=quad_tf["SWEB"].f[0], df=mean(diff(quad_tf["SWEB"].f)), ), kp=FrequencySeries( data=in_tf["SWEB"].get(C_B1_DETECTOR), unit=m / W, f0=in_tf["SWEB"].f[0], df=mean(diff(in_tf["SWEB"].f)), ), ), ], FrequencySeries( data=quad_tf["DARM"].get(C_B1_DETECTOR), unit=W / m, # power on B1 to arm displacement f0=quad_tf["DARM"].f[0], df=mean(diff(quad_tf["DARM"].f)), ), ) with File(name=filename, mode="w") as file: data.to_hdf5(file) progress.remove_task(write_file)