Fix movement data with low pass filter
This commit is contained in:
parent
d1e649d61e
commit
8aaebbf6b7
2 changed files with 94 additions and 24 deletions
|
@ -8,9 +8,23 @@ from sys import argv as options
|
|||
from backscattering_analyzer.settings import Settings
|
||||
from numpy import loadtxt, array
|
||||
from scipy.io.matlab import loadmat
|
||||
from numpy.typing import NDArray
|
||||
|
||||
# maths
|
||||
from numpy import mean, zeros, pi, sin, cos, arange, sqrt, where, real
|
||||
from numpy import (
|
||||
mean,
|
||||
zeros,
|
||||
pi,
|
||||
sin,
|
||||
cos,
|
||||
arange,
|
||||
sqrt,
|
||||
where,
|
||||
real,
|
||||
conj,
|
||||
append,
|
||||
flip,
|
||||
)
|
||||
from scipy.signal import welch as psd
|
||||
from scipy.interpolate import CubicSpline
|
||||
from scipy.fft import fft, ifft, fftfreq
|
||||
|
@ -21,7 +35,7 @@ class Analyzer:
|
|||
Utility class to study backscattering light in VIRGO
|
||||
"""
|
||||
|
||||
def __init__(self, arguments=[]):
|
||||
def __init__(self, arguments: list = []) -> None:
|
||||
self.theme = Theme(
|
||||
{
|
||||
"main": "white bold",
|
||||
|
@ -129,9 +143,15 @@ class Analyzer:
|
|||
]
|
||||
)
|
||||
self.movement[1] -= mean(self.movement[1])
|
||||
self.movement = self.low_pass_filter(
|
||||
self.movement,
|
||||
5 * self.settings.vibration_frequency,
|
||||
)
|
||||
self.movement = self.movement[:, :-500000]
|
||||
|
||||
def psd_signal(self, signal, fft_length=10):
|
||||
def psd_signal(
|
||||
self, signal: NDArray, fft_length: int = 10
|
||||
) -> NDArray:
|
||||
"""
|
||||
Compute psd of a given signal
|
||||
"""
|
||||
|
@ -166,7 +186,9 @@ class Analyzer:
|
|||
]
|
||||
)
|
||||
|
||||
def interpolate_abciss(self, abcisses):
|
||||
def interpolate_abciss(
|
||||
self, abcisses: tuple[NDArray, ...] | list[NDArray]
|
||||
) -> NDArray:
|
||||
"""
|
||||
Return the axis that would be used by the interpolate method
|
||||
"""
|
||||
|
@ -178,7 +200,9 @@ class Analyzer:
|
|||
|
||||
return arange(start, end, 1 / max(rates))
|
||||
|
||||
def interpolate(self, signals):
|
||||
def interpolate(
|
||||
self, signals: tuple[NDArray, ...] | list[NDArray]
|
||||
) -> list[NDArray]:
|
||||
"""
|
||||
Interpolate multiple signals with a single abciss list, which
|
||||
has the smallest interval and the biggest rate
|
||||
|
@ -207,14 +231,20 @@ class Analyzer:
|
|||
) # minimum to keep same length than coupling, but not for
|
||||
# the same frequency range, so more
|
||||
|
||||
psd_args = {
|
||||
"fs": freq_sample,
|
||||
"nperseg": nperseg,
|
||||
"noverlap": nperseg / 2,
|
||||
"nfft": nperseg,
|
||||
}
|
||||
|
||||
result = zeros(
|
||||
len(
|
||||
self.interpolate_abciss(
|
||||
[
|
||||
psd(
|
||||
self.movement[1],
|
||||
fs=freq_sample,
|
||||
nperseg=nperseg,
|
||||
**psd_args,
|
||||
)[0],
|
||||
self.modelisation["freq"][0],
|
||||
]
|
||||
|
@ -222,6 +252,7 @@ class Analyzer:
|
|||
)
|
||||
)
|
||||
|
||||
# frequencies depends of psd result, which we do not have yet
|
||||
frequencies = 0
|
||||
|
||||
for index in range(len(coupling)):
|
||||
|
@ -231,8 +262,7 @@ class Analyzer:
|
|||
[
|
||||
*psd(
|
||||
sin(phase * self.movement[1]),
|
||||
fs=freq_sample,
|
||||
nperseg=nperseg,
|
||||
**psd_args,
|
||||
)
|
||||
]
|
||||
)
|
||||
|
@ -246,8 +276,7 @@ class Analyzer:
|
|||
[
|
||||
*psd(
|
||||
cos(phase * self.movement[1]),
|
||||
fs=freq_sample,
|
||||
nperseg=nperseg,
|
||||
**psd_args,
|
||||
)
|
||||
]
|
||||
)
|
||||
|
@ -264,15 +293,26 @@ class Analyzer:
|
|||
)
|
||||
)
|
||||
|
||||
# no need to redefine it each time but simpler here
|
||||
frequencies = factor_n[0]
|
||||
"""
|
||||
result += (
|
||||
sqrt(self.settings.scattering_factor[index])
|
||||
* self.settings.wavelength / ( 4 * pi )
|
||||
* sqrt(
|
||||
coupling_n[1]**2 * factor_n[1]
|
||||
+ coupling_d[1]**2 * factor_d[1]
|
||||
)**2
|
||||
)
|
||||
"""
|
||||
|
||||
result += (
|
||||
sqrt(self.settings.scattering_factor[index])
|
||||
* self.settings.power_in
|
||||
/ self.settings.power_out
|
||||
* (
|
||||
sqrt(coupling_n[1]) * sqrt(factor_n[1])
|
||||
+ sqrt(coupling_d[1]) * sqrt(factor_d[1])
|
||||
coupling_n[1] * factor_n[1]
|
||||
+ coupling_d[1] * factor_d[1]
|
||||
)
|
||||
)
|
||||
return array(
|
||||
|
@ -282,13 +322,41 @@ class Analyzer:
|
|||
]
|
||||
)
|
||||
|
||||
def low_pass_filter(self, signal, cutoff):
|
||||
def low_pass_filter(
|
||||
self, signal: NDArray, cutoff: float
|
||||
) -> NDArray:
|
||||
"""
|
||||
Cut higher frequencies than cutoff for a given temporal signal
|
||||
"""
|
||||
if self.settings.verbose:
|
||||
self.console.log("filtering with a low pass filter")
|
||||
sample_spacing = signal[0, 1] - signal[0, 0]
|
||||
freq_signal = array(fft(signal[1]))
|
||||
frequencies = fftfreq(signal[1].shape[0], sample_spacing)
|
||||
index_to_remove = where(abs(frequencies) > cutoff)
|
||||
freq_signal[index_to_remove] = 0
|
||||
return array([signal[0], real(array(ifft(freq_signal)))])
|
||||
return array(
|
||||
[
|
||||
real(
|
||||
array(
|
||||
ifft(
|
||||
array(
|
||||
[
|
||||
frequencies,
|
||||
conj(flip(frequencies)),
|
||||
]
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
real(
|
||||
array(
|
||||
ifft(
|
||||
array(
|
||||
[freq_signal, conj(flip(freq_signal))]
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
]
|
||||
)
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
from pathlib import Path
|
||||
from rich.console import Console
|
||||
|
||||
|
||||
class Settings:
|
||||
def __init__(self, options, console):
|
||||
def __init__(self, options: list, console: Console):
|
||||
self.version = "0.0.1"
|
||||
self.help = (
|
||||
"[main]display[/main] [option]\\[options][/option]"
|
||||
|
@ -13,14 +14,15 @@ class Settings:
|
|||
"\n [argument]-V --version[/argument] [description]print version number and exit[/description]"
|
||||
)
|
||||
self.verbose = False
|
||||
self.bench = "SWEB"
|
||||
self.bench = "SDB1"
|
||||
self.date = "2023_03_24"
|
||||
self.folder = Path("/home/demagny/data")
|
||||
self.wavelength = 1.064e-6 # m
|
||||
|
||||
self.calib_bench = 1.15
|
||||
self.calib_mirror = 1.15
|
||||
self.scattering_factor = [1e-17, 0] # parameter to change
|
||||
self.scattering_factor = [1e-3, 0] # parameter to change
|
||||
self.vibration_frequency = 0.2
|
||||
|
||||
index = 0
|
||||
while index < len(options):
|
||||
|
@ -127,7 +129,7 @@ class Settings:
|
|||
self.power_in = 23 # W
|
||||
self.power_out = 8e-3 # W
|
||||
|
||||
def bench_file(self):
|
||||
def bench_file(self) -> Path:
|
||||
return self.folder / (
|
||||
"{bench}_{date}_ben.csv".format(
|
||||
bench=self.bench,
|
||||
|
@ -135,7 +137,7 @@ class Settings:
|
|||
)
|
||||
)
|
||||
|
||||
def mirror_file(self):
|
||||
def mirror_file(self) -> Path:
|
||||
return self.folder / (
|
||||
"{bench}_{date}_mir.csv".format(
|
||||
bench=self.bench,
|
||||
|
@ -143,7 +145,7 @@ class Settings:
|
|||
)
|
||||
)
|
||||
|
||||
def data_file(self):
|
||||
def data_file(self) -> Path:
|
||||
return self.folder / (
|
||||
"{bench}_{date}_dat.csv".format(
|
||||
bench=self.bench,
|
||||
|
@ -151,7 +153,7 @@ class Settings:
|
|||
)
|
||||
)
|
||||
|
||||
def reference_file(self):
|
||||
def reference_file(self) -> Path:
|
||||
return self.folder / (
|
||||
"{bench}_{date}_ref.csv".format(
|
||||
bench=self.bench,
|
||||
|
@ -159,8 +161,8 @@ class Settings:
|
|||
)
|
||||
)
|
||||
|
||||
def modelisation_file(self):
|
||||
def modelisation_file(self) -> Path:
|
||||
return self.folder / (self.modelisation)
|
||||
|
||||
def coupling_name(self):
|
||||
def coupling_name(self) -> str:
|
||||
return "{bench}coupling".format(bench=self.bench)
|
||||
|
|
Loading…
Reference in a new issue