Refactor, version up
This commit is contained in:
parent
8aaebbf6b7
commit
92d3d4885c
4 changed files with 272 additions and 238 deletions
|
@ -0,0 +1,31 @@
|
|||
from numpy.typing import NDArray
|
||||
from numpy import arange
|
||||
from scipy.interpolate import CubicSpline
|
||||
|
||||
|
||||
def interpolate_abciss(signals: tuple["Signal", ...]) -> NDArray:
|
||||
"""
|
||||
return the axis that would be used by the interpolate function
|
||||
"""
|
||||
rates = [signal.rate for signal in signals]
|
||||
|
||||
start = max([signal.x[0] for signal in signals])
|
||||
end = min([signal.x[-1] for signal in signals])
|
||||
|
||||
return arange(start, end, 1 / max(rates))
|
||||
|
||||
|
||||
def interpolate(signals: tuple["Signal", ...]) -> tuple["Signal", ...]:
|
||||
"""
|
||||
Interpolate multiple signals with a single abciss list, which has
|
||||
the smallest interval and the bigget rate
|
||||
"""
|
||||
splines = [CubicSpline(signal.x, signal.y) for signal in signals]
|
||||
|
||||
x = interpolate_abciss(signals)
|
||||
|
||||
new_signals = [
|
||||
Signal(x, spline(x), signals[0].settings) for spline in splines
|
||||
]
|
||||
|
||||
return tuple(new_signals)
|
|
@ -1,33 +1,13 @@
|
|||
# display
|
||||
from rich.console import Console
|
||||
from rich.theme import Theme
|
||||
from rich.traceback import install as traceback_install
|
||||
|
||||
# utils
|
||||
from sys import argv as options
|
||||
from backscattering_analyzer.settings import Settings
|
||||
from backscattering_analyzer.signal import Signal
|
||||
from backscattering_analyzer import interpolate, interpolate_abciss
|
||||
from numpy import loadtxt, array
|
||||
from scipy.io.matlab import loadmat
|
||||
from numpy.typing import NDArray
|
||||
|
||||
# maths
|
||||
from numpy import (
|
||||
mean,
|
||||
zeros,
|
||||
pi,
|
||||
sin,
|
||||
cos,
|
||||
arange,
|
||||
sqrt,
|
||||
where,
|
||||
real,
|
||||
conj,
|
||||
append,
|
||||
flip,
|
||||
)
|
||||
from scipy.signal import welch as psd
|
||||
from scipy.interpolate import CubicSpline
|
||||
from scipy.fft import fft, ifft, fftfreq
|
||||
from numpy import zeros, pi, sqrt
|
||||
|
||||
|
||||
class Analyzer:
|
||||
|
@ -36,17 +16,7 @@ class Analyzer:
|
|||
"""
|
||||
|
||||
def __init__(self, arguments: list = []) -> None:
|
||||
self.theme = Theme(
|
||||
{
|
||||
"main": "white bold",
|
||||
"option": "grey50 italic",
|
||||
"argument": "red",
|
||||
"description": "green italic",
|
||||
}
|
||||
)
|
||||
self.console = Console(theme=self.theme)
|
||||
traceback_install(console=self.console, show_locals=True)
|
||||
self.settings = Settings(options[1:] + arguments, self.console)
|
||||
self.settings = Settings(options[1:] + arguments)
|
||||
self.load()
|
||||
self.process_movement()
|
||||
|
||||
|
@ -54,8 +24,7 @@ class Analyzer:
|
|||
"""
|
||||
Load all data
|
||||
"""
|
||||
if self.settings.verbose:
|
||||
self.console.log("Load all data")
|
||||
self.settings.log("Load all data")
|
||||
try:
|
||||
self.load_bench()
|
||||
self.load_mirror()
|
||||
|
@ -70,10 +39,11 @@ class Analyzer:
|
|||
Load bench movement
|
||||
"""
|
||||
file = self.settings.bench_file()
|
||||
if self.settings.verbose:
|
||||
self.console.log("loading bench movement")
|
||||
self.settings.log("loading bench movement")
|
||||
try:
|
||||
self.bench_movement = loadtxt(file).T
|
||||
self.bench_movement = Signal(
|
||||
*loadtxt(file).T, self.settings
|
||||
)
|
||||
except OSError:
|
||||
raise Exception("{file} does not exist".format(file=file))
|
||||
|
||||
|
@ -82,10 +52,11 @@ class Analyzer:
|
|||
Load mirror movement
|
||||
"""
|
||||
file = self.settings.mirror_file()
|
||||
if self.settings.verbose:
|
||||
self.console.log("loading mirror movement")
|
||||
self.settings.log("loading mirror movement")
|
||||
try:
|
||||
self.mirror_movement = loadtxt(file).T
|
||||
self.mirror_movement = Signal(
|
||||
*loadtxt(file).T, self.settings
|
||||
)
|
||||
except OSError:
|
||||
raise Exception("{file} does not exist".format(file=file))
|
||||
|
||||
|
@ -94,10 +65,9 @@ class Analyzer:
|
|||
Load excited h(t)
|
||||
"""
|
||||
file = self.settings.data_file()
|
||||
if self.settings.verbose:
|
||||
self.console.log("loading excited h(t)")
|
||||
self.setings.log("loading excited h(t)")
|
||||
try:
|
||||
self.data_signal = loadtxt(file).T
|
||||
self.data_signal = Signal(*loadtxt(file).T, self.settings)
|
||||
except OSError:
|
||||
raise Exception("{file} does not exist".format(file=file))
|
||||
|
||||
|
@ -106,10 +76,11 @@ class Analyzer:
|
|||
Load reference h(t)
|
||||
"""
|
||||
file = self.settings.reference_file()
|
||||
if self.settings.verbose:
|
||||
self.console.log("loading reference h(t)")
|
||||
self.settings.log("loading reference h(t)")
|
||||
try:
|
||||
self.reference_signal = loadtxt(file).T
|
||||
self.reference_signal = Signal(
|
||||
*loadtxt(file).T, self.settings
|
||||
)
|
||||
except OSError:
|
||||
raise Exception("{file} does not exist".format(file=file))
|
||||
|
||||
|
@ -118,10 +89,22 @@ class Analyzer:
|
|||
Load modelisation coupling data
|
||||
"""
|
||||
file = self.settings.modelisation_file()
|
||||
if self.settings.verbose:
|
||||
self.console.log("loading matlab modelisation data")
|
||||
self.settings.log("loading matlab modelisation data")
|
||||
try:
|
||||
self.modelisation = loadmat(file)
|
||||
coupling_values = self.modelisation[
|
||||
self.settings.coupling_name()
|
||||
]
|
||||
self.coupling = array(
|
||||
[
|
||||
Signal(
|
||||
self.modelisation["freq"][0],
|
||||
coupling,
|
||||
self.settings,
|
||||
)
|
||||
for coupling in coupling_values
|
||||
]
|
||||
)
|
||||
except OSError:
|
||||
raise Exception("{file} does not exist".format(file=file))
|
||||
|
||||
|
@ -130,171 +113,43 @@ class Analyzer:
|
|||
Clean and compute relative movement between the bench and the
|
||||
miror
|
||||
"""
|
||||
if self.settings.verbose:
|
||||
self.console.log("computing relative movement")
|
||||
self.settings.log("computing relative movement")
|
||||
|
||||
self.movement = array(
|
||||
[
|
||||
self.bench_movement[0],
|
||||
(self.settings.calib_bench * self.bench_movement[1])
|
||||
- (
|
||||
self.settings.calib_mirror * self.mirror_movement[1]
|
||||
),
|
||||
]
|
||||
)
|
||||
self.movement[1] -= mean(self.movement[1])
|
||||
self.movement = self.low_pass_filter(
|
||||
self.movement,
|
||||
5 * self.settings.vibration_frequency,
|
||||
)
|
||||
self.movement = self.movement[:, :-500000]
|
||||
|
||||
def psd_signal(
|
||||
self, signal: NDArray, fft_length: int = 10
|
||||
) -> NDArray:
|
||||
"""
|
||||
Compute psd of a given signal
|
||||
"""
|
||||
if self.settings.verbose:
|
||||
self.console.log("computing psd of the given signal")
|
||||
rate = 1 / (signal[0, 1] - signal[0, 0])
|
||||
|
||||
return array(
|
||||
psd(
|
||||
signal[1],
|
||||
rate,
|
||||
nperseg=int(fft_length * rate),
|
||||
detrend="linear",
|
||||
self.movement = (
|
||||
(
|
||||
self.bench_movement * self.settings.calib_bench
|
||||
- self.mirror_movement * self.settings.calib_mirror
|
||||
)
|
||||
.detrend("linear")
|
||||
.low_pass_filter(5 * self.settings.vibration_frequency)
|
||||
)
|
||||
|
||||
def diff_psd_ref_data(self):
|
||||
"""
|
||||
Give difference between reference and excited signal psd
|
||||
"""
|
||||
if self.settings.verbose:
|
||||
self.console.log(
|
||||
"compute difference between reference and excited "
|
||||
"signal (on the psd)"
|
||||
)
|
||||
freq, data_psd = self.psd_signal(self.data_signal)
|
||||
_, reference_psd = self.psd_signal(self.reference_signal)
|
||||
return array(
|
||||
[
|
||||
freq,
|
||||
data_psd - reference_psd,
|
||||
]
|
||||
)
|
||||
|
||||
def interpolate_abciss(
|
||||
self, abcisses: tuple[NDArray, ...] | list[NDArray]
|
||||
) -> NDArray:
|
||||
"""
|
||||
Return the axis that would be used by the interpolate method
|
||||
"""
|
||||
rates = 1 / array(
|
||||
[abciss[1] - abciss[0] for abciss in abcisses]
|
||||
)
|
||||
start = max([abciss[0] for abciss in abcisses])
|
||||
end = min([abciss[-1] for abciss in abcisses])
|
||||
|
||||
return arange(start, end, 1 / max(rates))
|
||||
|
||||
def interpolate(
|
||||
self, signals: tuple[NDArray, ...] | list[NDArray]
|
||||
) -> list[NDArray]:
|
||||
"""
|
||||
Interpolate multiple signals with a single abciss list, which
|
||||
has the smallest interval and the biggest rate
|
||||
"""
|
||||
|
||||
splines = [
|
||||
CubicSpline(signal[0], signal[1]) for signal in signals
|
||||
]
|
||||
|
||||
x = self.interpolate_abciss([signal[0] for signal in signals])
|
||||
|
||||
signals = [array([x, spline(x)]) for spline in splines]
|
||||
|
||||
return signals
|
||||
|
||||
def compute_light(self):
|
||||
"""
|
||||
Compute psd of the computed projection with current bench
|
||||
excitation
|
||||
"""
|
||||
coupling = self.modelisation[self.settings.coupling_name()]
|
||||
|
||||
freq_sample = 1 / (self.movement[0, 1] - self.movement[0, 0])
|
||||
nperseg = (
|
||||
int(2 * len(coupling[0])) - 1 + len(coupling[0])
|
||||
) # minimum to keep same length than coupling, but not for
|
||||
# the same frequency range, so more
|
||||
|
||||
psd_args = {
|
||||
"fs": freq_sample,
|
||||
"nperseg": nperseg,
|
||||
"noverlap": nperseg / 2,
|
||||
"nfft": nperseg,
|
||||
}
|
||||
|
||||
result = zeros(
|
||||
len(
|
||||
self.interpolate_abciss(
|
||||
[
|
||||
psd(
|
||||
self.movement[1],
|
||||
**psd_args,
|
||||
)[0],
|
||||
self.modelisation["freq"][0],
|
||||
]
|
||||
)
|
||||
)
|
||||
len(interpolate_abciss(self.movement, self.coupling[0]))
|
||||
)
|
||||
|
||||
# frequencies depends of psd result, which we do not have yet
|
||||
frequencies = 0
|
||||
|
||||
for index in range(len(coupling)):
|
||||
for index in range(len(self.coupling)):
|
||||
phase = (index + 1) * 4 * pi / self.settings.wavelength
|
||||
|
||||
factor_n = array(
|
||||
[
|
||||
*psd(
|
||||
sin(phase * self.movement[1]),
|
||||
**psd_args,
|
||||
)
|
||||
]
|
||||
)
|
||||
coupling_n = array(
|
||||
[
|
||||
self.modelisation["freq"][0],
|
||||
abs(coupling[0]),
|
||||
]
|
||||
)
|
||||
factor_d = array(
|
||||
[
|
||||
*psd(
|
||||
cos(phase * self.movement[1]),
|
||||
**psd_args,
|
||||
)
|
||||
]
|
||||
)
|
||||
coupling_d = array(
|
||||
[
|
||||
self.modelisation["freq"][0],
|
||||
abs(coupling[1]),
|
||||
]
|
||||
)
|
||||
factor_n = (self.movement * phase).sin().psd()
|
||||
coupling_n = self.coupling[0].abs()
|
||||
factor_d = (self.movement * phase).cos().psd()
|
||||
coupling_d = self.coupling[1].abs()
|
||||
|
||||
factor_n, coupling_n, factor_d, coupling_d = (
|
||||
self.interpolate(
|
||||
(factor_n, coupling_n, factor_d, coupling_d)
|
||||
)
|
||||
factor_n, coupling_n, factor_d, coupling_d = interpolate(
|
||||
(factor_n, coupling_n, factor_d, coupling_d)
|
||||
)
|
||||
|
||||
# no need to redefine it each time but simpler here
|
||||
frequencies = factor_n[0]
|
||||
frequencies = factor_n.x
|
||||
"""
|
||||
result += (
|
||||
sqrt(self.settings.scattering_factor[index])
|
||||
|
@ -313,7 +168,7 @@ class Analyzer:
|
|||
* (
|
||||
coupling_n[1] * factor_n[1]
|
||||
+ coupling_d[1] * factor_d[1]
|
||||
)
|
||||
).y
|
||||
)
|
||||
return array(
|
||||
[
|
||||
|
@ -321,42 +176,3 @@ class Analyzer:
|
|||
result,
|
||||
]
|
||||
)
|
||||
|
||||
def low_pass_filter(
|
||||
self, signal: NDArray, cutoff: float
|
||||
) -> NDArray:
|
||||
"""
|
||||
Cut higher frequencies than cutoff for a given temporal signal
|
||||
"""
|
||||
if self.settings.verbose:
|
||||
self.console.log("filtering with a low pass filter")
|
||||
sample_spacing = signal[0, 1] - signal[0, 0]
|
||||
freq_signal = array(fft(signal[1]))
|
||||
frequencies = fftfreq(signal[1].shape[0], sample_spacing)
|
||||
index_to_remove = where(abs(frequencies) > cutoff)
|
||||
freq_signal[index_to_remove] = 0
|
||||
return array(
|
||||
[
|
||||
real(
|
||||
array(
|
||||
ifft(
|
||||
array(
|
||||
[
|
||||
frequencies,
|
||||
conj(flip(frequencies)),
|
||||
]
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
real(
|
||||
array(
|
||||
ifft(
|
||||
array(
|
||||
[freq_signal, conj(flip(freq_signal))]
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
]
|
||||
)
|
||||
|
|
|
@ -1,10 +1,23 @@
|
|||
from pathlib import Path
|
||||
from rich.console import Console
|
||||
from rich.theme import Theme
|
||||
|
||||
from rich.traceback import install as traceback_install
|
||||
|
||||
|
||||
class Settings:
|
||||
def __init__(self, options: list, console: Console):
|
||||
self.version = "0.0.1"
|
||||
def __init__(self, options: list):
|
||||
self.theme = Theme(
|
||||
{
|
||||
"main": "white bold",
|
||||
"option": "grey50 italic",
|
||||
"argument": "red",
|
||||
"description": "green italic",
|
||||
}
|
||||
)
|
||||
self.console = Console(theme=self.theme)
|
||||
traceback_install(console=self.console, show_locals=True)
|
||||
self.version = "0.1.0"
|
||||
self.help = (
|
||||
"[main]display[/main] [option]\\[options][/option]"
|
||||
"\n [argument]-b --bench[/argument] [description]bench of the experiment[/description]"
|
||||
|
@ -89,10 +102,10 @@ class Settings:
|
|||
)
|
||||
if option[:2] == "--":
|
||||
if option == "--help":
|
||||
console.print(self.help)
|
||||
self.console.print(self.help)
|
||||
exit(0)
|
||||
if option == "--version":
|
||||
console.print(self.version)
|
||||
self.console.print(self.version)
|
||||
exit(0)
|
||||
if option == "--verbose":
|
||||
self.verbose = True
|
||||
|
@ -166,3 +179,7 @@ class Settings:
|
|||
|
||||
def coupling_name(self) -> str:
|
||||
return "{bench}coupling".format(bench=self.bench)
|
||||
|
||||
def log(self, message) -> None:
|
||||
if self.verbose:
|
||||
self.console.log(message)
|
||||
|
|
170
src/backscattering_analyzer/signal.py
Normal file
170
src/backscattering_analyzer/signal.py
Normal file
|
@ -0,0 +1,170 @@
|
|||
from __future__ import annotations
|
||||
from numpy.typing import NDArray
|
||||
from backscattering_analyzer.settings import Settings
|
||||
from backscattering_analyzer import interpolate
|
||||
|
||||
from scipy.signal import welch, detrend
|
||||
from scipy.fft import irfft, rfft, rfftfreq
|
||||
from numpy import where, sin, cos, array
|
||||
|
||||
|
||||
class Signal:
|
||||
"""
|
||||
A given signal, with its time/frequency coordinates
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, x: NDArray, value: NDArray, settings: Settings
|
||||
) -> None:
|
||||
if x.shape != value.shape:
|
||||
raise Exception("x and y does not have the same dimension")
|
||||
self.sampling = x[1] - x[0]
|
||||
self.rate = 1 / self.sampling
|
||||
self.x = x
|
||||
self.frequencies = self.x # alias
|
||||
self.time = self.x # alias
|
||||
self.y = value
|
||||
self.value = self.y # alias
|
||||
self.settings = settings
|
||||
self.length = value.shape[0]
|
||||
|
||||
def psd(self, fft_length: int = 10) -> Signal:
|
||||
"""
|
||||
Compute psd of a given signal
|
||||
"""
|
||||
self.settings.log("computing psd of the signal")
|
||||
|
||||
return Signal(
|
||||
*welch(
|
||||
self.value,
|
||||
self.rate,
|
||||
nperseg=int(fft_length * self.rate),
|
||||
detrend="linear",
|
||||
),
|
||||
self.settings,
|
||||
)
|
||||
|
||||
def detrend(self, type: str = "linear") -> Signal:
|
||||
"""
|
||||
Short alias for scipy.detrend with default value
|
||||
"""
|
||||
return Signal(
|
||||
self.x,
|
||||
detrend(self.y, type=type),
|
||||
self.settings,
|
||||
)
|
||||
|
||||
def low_pass_filter(self, cutoff):
|
||||
"""
|
||||
Cut higher frequencies than cutoff for this signal
|
||||
"""
|
||||
self.settings.log("filtering with a low pass filter")
|
||||
|
||||
freq_x = rfftfreq(self.length, self.sampling)
|
||||
freq_y = rfft(self.y)
|
||||
index_to_remove = where(abs(freq_x) > cutoff)
|
||||
freq_y[index_to_remove] = 0
|
||||
|
||||
return Signal(
|
||||
self.x,
|
||||
irfft(freq_y),
|
||||
self.settings,
|
||||
)
|
||||
|
||||
def abs(self) -> Signal:
|
||||
"""
|
||||
Abs of the signal (hacky way)
|
||||
"""
|
||||
return Signal(
|
||||
self.x,
|
||||
abs(self.y),
|
||||
self.settings,
|
||||
)
|
||||
|
||||
def cos(self) -> Signal:
|
||||
"""
|
||||
Cosinus of the signal (hacky way)
|
||||
"""
|
||||
return Signal(
|
||||
self.x,
|
||||
cos(self.y),
|
||||
self.settings,
|
||||
)
|
||||
|
||||
def sin(self) -> Signal:
|
||||
"""
|
||||
Sinus of the signal (hacky way)
|
||||
"""
|
||||
return Signal(
|
||||
self.x,
|
||||
sin(self.y),
|
||||
self.settings,
|
||||
)
|
||||
|
||||
def __sub__(self, other: float | Signal) -> Signal:
|
||||
"""
|
||||
Substract float or a signal to another
|
||||
"""
|
||||
if isinstance(other, Signal):
|
||||
signal_1, signal_2 = interpolate((self, other))
|
||||
|
||||
return Signal(
|
||||
signal_1.frequencies,
|
||||
signal_2.value - signal_1.value,
|
||||
self.settings,
|
||||
)
|
||||
else:
|
||||
return Signal(
|
||||
self.x,
|
||||
self.y - other,
|
||||
self.settings,
|
||||
)
|
||||
|
||||
def __add__(self, other: float | Signal) -> Signal:
|
||||
"""
|
||||
Add a float or a signal to another
|
||||
"""
|
||||
if isinstance(other, Signal):
|
||||
signal_1, signal_2 = interpolate((self, other))
|
||||
|
||||
return Signal(
|
||||
signal_1.x,
|
||||
signal_1 + signal_2,
|
||||
self.settings,
|
||||
)
|
||||
else:
|
||||
return Signal(
|
||||
self.x,
|
||||
self.y + other,
|
||||
self.settings,
|
||||
)
|
||||
|
||||
def __mul__(self, other: float | Signal) -> Signal:
|
||||
"""
|
||||
Multiply a signal by a value or another signal
|
||||
"""
|
||||
if isinstance(other, Signal):
|
||||
signal_1, signal_2 = interpolate((self, other))
|
||||
|
||||
return Signal(
|
||||
signal_1.frequencies,
|
||||
signal_1.value * signal_2.value,
|
||||
self.settings,
|
||||
)
|
||||
else:
|
||||
return Signal(
|
||||
self.x,
|
||||
other * self.value,
|
||||
self.settings,
|
||||
)
|
||||
|
||||
def __array__(self) -> NDArray:
|
||||
"""
|
||||
Convert to a numpy array
|
||||
"""
|
||||
return array(
|
||||
[
|
||||
self.x,
|
||||
self.y,
|
||||
]
|
||||
)
|
Loading…
Reference in a new issue