Add first code

This commit is contained in:
linarphy 2024-05-13 18:09:15 +02:00
parent fe8e185efb
commit e530961dd2
No known key found for this signature in database
GPG key ID: E61920135EFF2295
4 changed files with 427 additions and 0 deletions

36
pyproject.toml Normal file
View file

@ -0,0 +1,36 @@
[project]
name = "backscattering-analyzer"
version = "0.0.1"
authors = [
{ name="linarphy", email="linarphy@linarphy.com" },
]
description = "Analyze backscattering noise"
readme = "README.md"
requires-python = ">=3.8"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Intended Audience :: Science/Research",
"Operating System :: OS Independent",
"Development Status :: 2 - Pre-Alpha",
"Topic :: Scientific/Engineering",
]
[project.urls]
Homepage = "https://git.linarphy.net/linarphy/backscattering-analyzer"
Issues = "https://git.linarphy.net/linarphy/backscattering-analyzer/issues"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/backscattering-analyzer"]
[tool.ruff]
line-length = 72
[tool.ruff.format]
quote-style = "double"
indent-style = "space"
docstring-code-format = true

View file

View file

@ -0,0 +1,245 @@
# display
from rich.console import Console
from rich.theme import Theme
from rich.traceback import install as traceback_install
# utils
from sys import argv as options
from settings import Settings
from numpy import loadtxt, array
from scipy.io.matlab import loadmat
# maths
from numpy import mean, zeros, pi, sin, cos, arange
from scipy.signal import welch as psd
from scipy.interpolate import CubicSpline
class Analyzer:
"""
Utility class to study backscattering light in VIRGO
"""
def __init__(self, arguments=[]):
self.theme = Theme(
{
"main": "white bold",
"option": "grey50 italic",
"argument": "red",
"description": "green italic",
}
)
self.console = Console(theme=self.theme)
traceback_install(console=self.console, show_locals=True)
self.settings = Settings(options[1:] + arguments, self.console)
self.load()
self.process_movement()
def load(self):
"""
Load all data
"""
if self.settings.verbose:
self.console.log("Load all data")
try:
self.load_bench()
self.load_mirror()
self.load_data()
self.load_reference()
self.load_coupling()
except Exception:
raise Exception("An error occured during data loading")
def load_bench(self):
"""
Load bench movement
"""
file = self.settings.bench_file()
if self.settings.verbose:
self.console.log("loading bench movement")
try:
self.bench_movement = loadtxt(file).T
except OSError:
raise Exception("{file} does not exist".format(file=file))
def load_mirror(self):
"""
Load mirror movement
"""
file = self.settings.mirror_file()
if self.settings.verbose:
self.console.log("loading mirror movement")
try:
self.mirror_movement = loadtxt(file).T
except OSError:
raise Exception("{file} does not exist".format(file=file))
def load_data(self):
"""
Load excited h(t)
"""
file = self.settings.data_file()
if self.settings.verbose:
self.console.log("loading excited h(t)")
try:
self.data_signal = loadtxt(file).T
except OSError:
raise Exception("{file} does not exist".format(file=file))
def load_reference(self):
"""
Load reference h(t)
"""
file = self.settings.reference_file()
if self.settings.verbose:
self.console.log("loading reference h(t)")
try:
self.reference_signal = loadtxt(file).T
except OSError:
raise Exception("{file} does not exist".format(file=file))
def load_coupling(self):
"""
Load modelisation coupling data
"""
file = self.settings.modelisation_file()
if self.settings.verbose:
self.console.log("loading matlab modelisation data")
try:
self.modelisation = loadmat(file)
except OSError:
raise Exception("{file} does not exist".format(file=file))
def process_movement(self):
"""
Clean and compute relative movement between the bench and the
miror
"""
if self.settings.verbose:
self.console.log("computing relative movement")
self.movement = array(
[
self.bench_movement[0],
self.bench_movement[1] - self.mirror_movement[1],
]
)
self.movement[1] -= mean(self.movement[1])
def psd_signal(self, signal, fft_length=10):
"""
Compute psd of a given signal
"""
if self.settings.verbose:
self.console.log("computing psd of the given signal")
rate = 1 / (signal[0, 1] - signal[0, 0])
return array(
psd(
signal[1],
rate,
nperseg=int(fft_length * rate),
detrend="linear",
)
)
def diff_psd_ref_data(self):
"""
Give difference between reference and excited signal psd
"""
if self.settings.verbose:
self.console.log(
"compute difference between reference and excited "
"signal (on the psd)"
)
freq, data_psd = self.psd_signal(self.data_signal)
_, reference_psd = self.psd_signal(self.reference_signal)
return array(
[
freq,
data_psd - reference_psd,
]
)
def interpolate(self, signals):
"""
Interpolate multiple signals with a single abciss list, which
has the smallest interval and the biggest rate
"""
rates = 1 / (signals[:, 0, 1] - signals[:, 0, 0])
start = max(signals[:, 0, 0])
end = min(signals[:, 0, -1])
splines = [
CubicSpline(signal[0], signal[1]) for signal in signals
]
x = arange(start, end, 1 / max(rates))
signals = [array([x, spline(x)]) for spline in splines]
return signals
def compute_light(self):
"""
Compute psd of the computed projection with current bench
excitation
"""
coupling = self.modelisation[self.settings.coupling_name()]
coupling_1, coupling_2 = (
array(
[
self.modelisation["freq"][0],
coupling[0],
]
),
array(
[
self.modelisation["freq"][0],
coupling[1],
]
),
)
rate = 1 / (self.movement[0, 1] - self.movement[0, 0])
result = zeros(self.bench_movement.shape[1])
scattering_factor = [0, 0]
for index in range(
len(self.modelisation[self.settings.coupling_name()])
):
argument = (
self.movement[1]
* 4
* pi
* (index + 1)
/ self.settings.wavelength
)
result += (
scattering_factor[index]
* (self.settings.wavelength / 4 * pi) ** 2
* (
(
psd(
sin(argument),
fs=rate,
nperseg=2999,
)[1]
* abs(coupling[0])
)
** 2
+ (
psd(
cos(argument),
fs=rate,
nperseg=2999,
)[1]
* abs(coupling[1])
)
** 2
)
)
return result

View file

@ -0,0 +1,146 @@
from pathlib import Path
class Settings:
def __init__(self, options, console):
self.version = "0.0.1"
self.help = (
"[main]display[/main] [option]\\[options][/option]"
"\n [argument]-b --bench[argument] [description]bench of the experiment[/description]"
"\n [argument]-d --date[argument] [description]date of the experiment[/description]"
"\n [argument]-h --help[argument] [description]print this help and exit[/description]"
"\n [argument]-v --verbose[argument] [description]be verbose[/description]"
"\n [argument]-V --version[argument] [description]print version number and exit[/description]"
)
self.verbose = False
self.bench = "SWEB"
self.date = "2023_03_24"
self.folder = Path("/home/demagny/data")
self.modelisation = "scatterCouplingO4.mat"
self.wavelength = 1.064e-6
index = 0
while index < len(options):
option = options[index]
try:
if option[0] != "-":
if option == "h":
option = "--help"
if option == "V":
option = "--version"
if "v" in option:
options[index] = option.replace("v", "")
option = "--verbose"
index -= 1
if option == "b":
index += 1
try:
options[index] = "--bench={}".format(
options[index]
)
except IndexError:
raise Exception(
"bench name needed after b option"
)
if option == "d":
index += 1
try:
options[index] = "--date={}".format(
options[index]
)
except IndexError:
raise Exception(
"date needed after d option"
)
if option[0] == "-":
if option[1] != "-":
if option == "-h":
option = "--help"
if option == "-V":
option = "--version"
if "v" in option:
options[index] = option.replace("v", "")
option = "--verbose"
index -= 1
if option == "-b":
index += 1
try:
options[index] = "--bench={}".format(
options[index]
)
except IndexError:
raise Exception(
"bench name needed after -b option"
)
if option == "-d":
index += 1
try:
options[index] = "--date={}".format(
options[index]
)
except IndexError:
raise Exception(
"date needed after -d option"
)
if option[:2] == "--":
if option == "--help":
console.print(self.help)
exit(0)
if option == "--version":
console.print(self.version)
exit(0)
if option == "--verbose":
self.verbose = True
try:
if option[:8] == "--bench=":
self.bench = str(option[8:]).upper()
except IndexError:
continue
try:
if option[:7] == "--date=":
self.date = option[7:]
except IndexError:
continue
else:
raise Exception("unknown option {}".format(option))
except IndexError:
pass
index += 1
def bench_file(self):
return self.folder / (
"{bench}_{date}_ben.csv".format(
bench=self.bench,
date=self.date,
)
)
def mirror_file(self):
return self.folder / (
"{bench}_{date}_mir.csv".format(
bench=self.bench,
date=self.date,
)
)
def data_file(self):
return self.folder / (
"{bench}_{date}_dat.csv".format(
bench=self.bench,
date=self.date,
)
)
def reference_file(self):
return self.folder / (
"{bench}_{date}_ref.csv".format(
bench=self.bench,
date=self.date,
)
)
def modelisation_file(self):
return self.folder / (self.modelisation)
def coupling_name(self):
return "{bench}coupling".format(bench=self.bench)