Compare commits

...

10 commits

Author SHA1 Message Date
2324943653
Update types & Add more signal generation for unit testing 2024-06-18 17:56:09 +02:00
d9f51de422
Update env name 2024-06-09 16:34:06 +02:00
cc6f311433
Small changes 2024-06-05 17:48:12 +02:00
641b8962eb
Add logger 2024-05-31 15:35:30 +02:00
6dabd05d97
Update developement statu 2024-05-31 13:53:42 +02:00
c9b28804da
Add project url 2024-05-31 13:52:11 +02:00
cb3828d1e2
Fix dependencies 2024-05-31 13:49:53 +02:00
0db3e2cba7
Update dependencies 2024-05-31 13:49:03 +02:00
d20a903050
Remove detrend 2024-05-31 13:46:06 +02:00
e7f71a61e1
Fix import 2024-05-27 17:40:30 +02:00
5 changed files with 110 additions and 21 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
env
.env .env
__pycache__/ __pycache__/

View file

@ -9,7 +9,7 @@ readme = "README.md"
requires-python = ">=3.8" requires-python = ">=3.8"
classifiers = [ classifiers = [
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Development Status :: 2 - Pre-Alpha", "Development Status :: 4 - Beta",
"Intended Audience :: Science/Research", "Intended Audience :: Science/Research",
"Operating System :: OS Independent", "Operating System :: OS Independent",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
@ -17,8 +17,13 @@ classifiers = [
] ]
dependencies = [ dependencies = [
"scipy", "scipy",
"numpy",
] ]
[project.urls]
Homepage = "https://git.linarphy.net/linarphy/science_signal"
Issues = "https://git.linarphy.net/linarphy/science_signal/issues"
[build-system] [build-system]
requires = ["hatchling", "hatch-gettext"] requires = ["hatchling", "hatch-gettext"]
build-backend = "hatchling.build" build-backend = "hatchling.build"
@ -31,6 +36,7 @@ show-report = true
[tool.ruff] [tool.ruff]
line-length = 72 line-length = 72
builtins = ["_"]
[tool.basedpyright] [tool.basedpyright]
venvPath = ".env" venvPath = ".env"

View file

@ -2,7 +2,11 @@ from numpy.typing import NDArray
from numpy import arange, float64 from numpy import arange, float64
from gettext import install from gettext import install
install("science_signal") from logging import getLogger
install(__name__)
logger = getLogger(__name__)
def interpolate_abciss(*signals: "Signal") -> NDArray[float64]: def interpolate_abciss(*signals: "Signal") -> NDArray[float64]:
@ -21,6 +25,7 @@ def interpolate(*signals: "Signal") -> tuple["Signal", ...]:
Return each signal with a common frequency/time range (smallest Return each signal with a common frequency/time range (smallest
range and highest rate) range and highest rate)
""" """
logger.debug(_("interpolate {n} signals").format(n=len(signals)))
splines = [signal.spline() for signal in signals] splines = [signal.spline() for signal in signals]
x_range = interpolate_abciss(*signals) x_range = interpolate_abciss(*signals)

View file

@ -1,4 +1,4 @@
from numpy import arange, pi, sin as np_sin from numpy import arange, pi, sign, sin as np_sin
from numpy.random import default_rng from numpy.random import default_rng
from science_signal.signal import Signal from science_signal.signal import Signal
@ -8,6 +8,7 @@ def sin(
rate: float = 10, rate: float = 10,
frequency: float = 1, frequency: float = 1,
amplitude: float = 1, amplitude: float = 1,
offset: float = 0,
phase: float = 0, phase: float = 0,
) -> Signal: ) -> Signal:
""" """
@ -16,10 +17,11 @@ def sin(
duration: duration of the signal in second duration: duration of the signal in second
rate: rate of the signal in Hz (s-1) rate: rate of the signal in Hz (s-1)
frequency: frequency of the wanted sinus frequency: frequency of the wanted sinus
offset: mean of the wanted sinus
phase: between 0 and 2 pi (if more, congruate) phase: between 0 and 2 pi (if more, congruate)
""" """
x = arange(0, duration, 1 / rate) x = arange(0, duration, 1 / rate)
y = amplitude * np_sin(2 * pi * frequency * x + phase) y = offset + amplitude * np_sin(2 * pi * frequency * x + phase)
return Signal( return Signal(
x, x,
@ -32,6 +34,7 @@ def cos(
rate: float = 10, rate: float = 10,
frequency: float = 1, frequency: float = 1,
amplitude: float = 1, amplitude: float = 1,
offset: float = 0,
phase: float = 0, phase: float = 0,
) -> Signal: ) -> Signal:
""" """
@ -42,7 +45,36 @@ def cos(
frequency: frequency of the wanted sinus frequency: frequency of the wanted sinus
phase: between 0 and 2 pi (if more, congruate) phase: between 0 and 2 pi (if more, congruate)
""" """
return sin(duration, rate, frequency, amplitude, phase + pi / 2) return sin(
duration, rate, frequency, amplitude, offset, phase + pi / 2
)
def square(
duration: float = 10,
rate: float = 10,
frequency: float = 1,
amplitude: float = 1,
offset: float = 0,
phase: float = 0,
) -> Signal:
"""
Generate square signal
duration: duration of the signal in second
rate: rate of the signal in Hz (s-1)
frequency: frequency of the wanted square signal
phase: between 0 and 2 pi (if more, congruate)
"""
x = arange(0, duration, 1 / rate)
y = offset + amplitude * sign(
np_sin(2 * pi * frequency * x + phase)
)
return Signal(
x,
y,
)
def gaussian_noise( def gaussian_noise(
@ -56,12 +88,36 @@ def gaussian_noise(
Generate a gaussian noise signal Generate a gaussian noise signal
duration: duration of the signal in second duration: duration of the signal in second
rate: rate of the signal in hz (s-1) rate: rate of the signal in Hz (s-1)
sigma: standard deviation of the wanted distribution sigma: standard deviation of the wanted distribution
mu: mean of the distribution mu: mean of the distribution
seed: optional, allowd to specify a seed for testing purpose
""" """
x = arange(0, duration, 1 / rate) x = arange(0, duration, 1 / rate)
return Signal( return Signal(
x, x,
default_rng(seed).normal(mu, sigma, len(x)), default_rng(seed).normal(mu, sigma, len(x)),
) )
def white_noise(
duration: float = 10,
rate: float = 10,
minimum: float = -1,
maximum: float = 1,
seed: None | int = None,
) -> Signal:
"""
Generate a white noise signal
duration: duration of the signal in seconds
rate: rate of the signal in hz (s-1)
minimum: minimum value of the noise
maximum: maximum value of the noise
seed: optional, allowd to specify a seed for testing purpose
"""
x = arange(0, duration, 1 / rate)
return Signal(
x,
default_rng(seed).uniform(minimum, maximum, len(x)),
)

View file

@ -1,4 +1,4 @@
from scipy.interpolate import CubicSpline # pyright: ignore[reportMissingTypeStubs] from scipy.interpolate import CubicSpline # type: ignore[reportMissingTypeStubs]
from numpy.typing import ArrayLike, NDArray from numpy.typing import ArrayLike, NDArray
from numpy import ( from numpy import (
append, append,
@ -6,14 +6,14 @@ from numpy import (
float64, float64,
linspace, linspace,
array, array,
logical_or, logical_and,
sin, sin,
where, where,
arange, arange,
zeros, zeros,
) )
from scipy.signal import detrend, welch from scipy.signal import detrend, welch # type: ignore[reportMissingTypeStubs]
from scipy.fft import rfftfreq, rfft, irfft from scipy.fft import rfftfreq, rfft, irfft # type: ignore[reportUnknownVariableType]
from science_signal import interpolate from science_signal import interpolate
@ -66,7 +66,9 @@ class Signal:
cos(self.y), cos(self.y),
) )
def cut(self, start: None | float = None, end: None | float = None) -> "Signal": def cut(
self, start: None | float = None, end: None | float = None
) -> "Signal":
""" """
Cut signal from a start x to an end x Cut signal from a start x to an end x
""" """
@ -74,7 +76,7 @@ class Signal:
start = min(self.x) start = min(self.x)
if end is None: if end is None:
end = max(self.x) end = max(self.x)
indexes = where(logical_and(self.x >= start, self.x <= end)) indexes = where(logical_and(self.x >= start, self.x <= end)) # type: ignore[reportOperatorIssue]
return Signal( return Signal(
self.x[indexes], self.x[indexes],
self.y[indexes], self.y[indexes],
@ -111,33 +113,52 @@ class Signal:
def filter( def filter(
self, start: None | float = None, end: None | float = None self, start: None | float = None, end: None | float = None
) -> "Signal": ) -> "Signal":
freq_x = rfftfreq(len(self), self.sampling) freq_x = rfftfreq(len(self), self.sampling) # type: ignore[reportUnknownVariableType]
freq_y = rfft(self.y) freq_y = rfft(self.y)
rate = 1 / (freq_x[1] - freq_x[0]) # type: ignore[reportUnknownVariableType]
if start is None: if start is None:
start = min(abs(freq_x)) start = min(abs(freq_x))
if end is None: if end is None:
end = max(abs(freq_x)) end = max(abs(freq_x))
index_to_remove = where( index_start = where(abs(freq_x) < start) # type: ignore[reportOperatorIssue]
logical_or(abs(freq_x) <= start, abs(freq_x) >= end) index_end = where(abs(freq_x) > end) # type: ignore[reportOperatorIssue]
if len(index_start) != 0:
"""
damping_start = 10 ** -( # type: ignore[reportUnknownVariableType]
arange(1, len(index_start[0]) + 1, 1, dtype=float64)
/ rate
) )
freq_y[index_to_remove] = 0 freq_y[index_start] = freq_y[index_start] * damping_start # type: ignore[reportIndexIssue]
"""
freq_y[index_start] = 0
if len(index_end) != 0:
"""
damping_end = 10 ** -( # type: ignore[reportUnknownVariableType]
arange(1, len(index_end[0]) + 1, 1, dtype=float64)
/ rate
)
freq_y[index_end] = freq_y[index_end] * damping_end # type: ignore[reportAny]
"""
freq_y[index_end] = 0
y = irfft(freq_y) y = irfft(freq_y)
return Signal( return Signal(
self.x[: len(y)], self.x[: len(y)],
y[: len(self.x)], y[: len(self.x)], # type: ignore[reportArgumentType]
) )
def psd(self, fft_length: int = 10) -> "Signal": def psd(self, fft_length: int = 10) -> "Signal":
""" """
Compute psd of a given signal Compute psd of a given signal
""" """
freq, psd = welch( freq, psd = welch( # type: ignore[reportUnknownVariableType]
self.y, self.y,
self.rate, self.rate,
nperseg=fft_length * self.rate, nperseg=fft_length * self.rate,
detrend="linear",
) )
return Signal(freq, psd) return Signal(freq, psd)
@ -189,7 +210,7 @@ class Signal:
# suppose same range but different rates # suppose same range but different rates
return self.operator_signal( return self.operator_signal(
Signal( Signal(
linspace(self.x[0], self.x[-1], len(others)), linspace(self.x[0], self.x[-1], len(other)),
array(other), array(other),
), ),
operator, operator,