Compare commits

..

10 commits

Author SHA1 Message Date
5acd52c1ae
Remove old visualization 2023-08-30 13:50:21 +02:00
93d9a94c67
Finish indexes finding of peaks 2023-08-30 13:49:43 +02:00
b85c2d69d6
Update how to find start and stop position of a peak 2023-08-30 01:49:51 +02:00
08e3b850e5
Update how to find start and stop position of a peak 2023-08-30 00:17:13 +02:00
2a072790ea
Update plate compression and add isolation 2023-08-28 21:53:21 +02:00
9c4ec55617
Update minimum size of compressed image 2023-08-28 10:57:16 +02:00
8d09799726
Add fit functions 2023-08-25 17:26:58 +02:00
e75bdddab3
Update find_point
- Update default theshold
- Update how it works
2023-08-25 17:25:33 +02:00
8b604d8ae2
Update to support i18n & logging
- Add --log-file option
- Add internationalization support with gettext
- Add logging featur
- Remove verbosity flag
- Update string formatting
- Fix Plate::set_border method
2023-08-25 17:24:12 +02:00
d51bd3ec9c
Add options & Update to support i18n 2023-08-25 17:22:09 +02:00
6 changed files with 558 additions and 148 deletions

View file

@ -0,0 +1,10 @@
from classes.science.border import Border
class CalibrationSpectrum:
"""
Define spectrum border
"""
def __init__( self , up = Border() , down = Border ):
self.up = up
self.down = down

View file

@ -1,7 +1,14 @@
from numpy import ndarray, argmax, max, quantile, arange, where, convolve, ones from numpy import ndarray, ones, argmax, arange, arctan, tan, pi, mean, max, min
from scipy.optimize import curve_fit from scipy.optimize import curve_fit
from scipy.signal import convolve, find_peaks
from scipy.ndimage import rotate
from classes.science.border import Border from classes.science.border import Border
from function.utils import find_point, fill from classes.science.calibration_spectrum import CalibrationSpectrum
from function.utils import find_point, fill, find_peak_low_high
from function.fit import linear
from logging import getLogger
from gettext import gettext as _
class Plate: class Plate:
""" """
@ -10,62 +17,43 @@ class Plate:
def __init__( self , data ): def __init__( self , data ):
if not isinstance( data , ndarray ): if not isinstance( data , ndarray ):
raise TypeError( 'data must be a ndarray' ) raise TypeError( _( 'data must be a ndarray' ) )
if len( data.shape ) != 2:
raise ValueError( _( 'data must be a 2d matrix' ) )
self.data = data self.data = data
self.set_border() self.set_border()\
.rotate()\
.set_spectrum()\
.set_calibration_spectrum()
def set_border( self , factor = 10 ): def compress( self ):
""" """
Set current border (without area outside the plate) Compress the plate data to fit the biggest dimension in a 2000
pixels axis and the smallest in a 200 pixels axis at minimum.
Return the compressed data and the compression factor used.
""" """
compressed = self.compress( factor ) min_factor = max( self.data.shape ) // 2000 # min factor to have a side
# with a maximum of 1000 pixels
points = self.get_points( compressed ) max_factor = min( self.data.shape ) // 200 # max factor to have
self.border = Border() # a side with a minimum of 100 pixel
self.border.x.min = 0 if min_factor < max_factor:
self.border.x.max = compressed.shape[1] - 1 factor = int( mean( ( max_factor , min_factor ) ) )
self.border.y.min = 0 else: # the smallest side will be less than 100 pixels with the
self.border.y.max = compressed.shape[0] - 1 # minimum compression factor
logger = getLogger( 'naroo reader' )
extremum = [] logger.warning(
_( (
x_half = compressed.shape[1] // 2 'slow compression: ratio between height and width'
y_half = compressed.shape[0] // 2 ' is greater than 10 ({ratio:.2f})'
for index in range( len( points ) ): ) ).format(
point = points[ index ] ratio = max( self.size() ) / min( self.size() )
point[0] -= int( compressed.shape[0] == point[0] ) # keep in )
point[1] -= int( compressed.shape[1] == point[1] ) # range
taken_points = fill(
compressed,
point ,
1000 , # intensity threshold
) )
factor = max_factor
x = [ taken_point[1] for taken_point in taken_points ] return self.data[
y = [ taken_point[0] for taken_point in taken_points ] : : factor,
: : factor,
if max( x ) < x_half: ] , factor
if self.border.x.min < max( x ):
self.border.x.min = max( x ) # biggest min
elif min( x ) > x_half: # elif to only accept one side
if self.border.x.max > min( x ):
self.border.x.max = min( x ) # smallest max
elif max( y ) < y_half:
if self.border.y.min < max( y ):
self.border.y.min = max( y ) # same
elif min( y ) > y_half:
if self.border.y.max > min( y ):
self.border.y.max = min( y ) # same
offset = 3
self.border.x.min += offset
self.border.y.min += offset
self.border.x.max -= offset
self.border.y.min -= offset
self.border.scale( factor )
def get_points( self , compressed ): def get_points( self , compressed ):
first_column = find_point( first_column = find_point(
@ -116,36 +104,194 @@ class Plate:
return first_column + last_column + first_line + last_line return first_column + last_column + first_line + last_line
def compress( self , factor ):
return self.data[
: : factor,
: : factor,
]
def rotate( self ): def rotate( self ):
""" """
Auto-rotate to be vertically and horizontally aligned Auto-rotate to be vertically and horizontally aligned
""" """
maxes = max( indexes_max = argmax(
self.data[ self.border.slice() ], convolve(
axis = 0 , self.data[
) 1 * self.border.y.size() // 4:
indexes = where( 3 * self.border.y.size() // 4,
maxes > quantile( maxes , 0.5 ) 1 * self.border.x.size() // 4:
3 * self.border.x.size() // 4
] ,
ones( ( 500 , 1 ) ),
'valid' ,
) ,
axis = 0,
) )
abciss = arange( abciss = arange(
self.border.x.min, 1 * self.border.x.size() // 4,
self.border.x.max 3 * self.border.x.size() // 4
)[ indexes ] )
indexes_max = argmax( fit_result = curve_fit(
self.data[ self.border.slice() ], linear ,
axis = 0 , abciss ,
)[ indexes ] indexes_max,
indexes_max = convolve( )[0]
indexes_max ,
ones( 100 ) , angle = arctan( fit_result[0] ) * pi / 180 # rad
'same' , diff = int( # adjust height border
) / 100 tan( angle ) * ( self.border.x.size() )
)
self.data = rotate(
self.data,
angle ,
)
self.border.y.min -= diff
self.border.y.max -= diff
return self
def set_border( self ):
"""
Set current border (without area outside the plate)
"""
compressed , factor = self.compress()
points = self.get_points( compressed )
self.border = Border()
self.border.x.min = 0
self.border.x.max = compressed.shape[1] - 1
self.border.y.min = 0
self.border.y.max = compressed.shape[0] - 1
extremum = []
x_half = compressed.shape[1] // 2
y_half = compressed.shape[0] // 2
for index in range( len( points ) ):
point = points[ index ]
point[0] -= int( compressed.shape[0] == point[0] ) # keep in
point[1] -= int( compressed.shape[1] == point[1] ) # range
taken_points = fill(
compressed,
point ,
2000 , # intensity threshold
)
x = [ taken_point[1] for taken_point in taken_points ]
y = [ taken_point[0] for taken_point in taken_points ]
if len( x ) > 5 and len( y ) > 5:
if max( x ) < x_half:
if self.border.x.min < max( x ):
self.border.x.min = max( x ) # biggest min
elif min( x ) > x_half:
# elif to only accept one side
if self.border.x.max > min( x ):
self.border.x.max = min( x ) # smallest max
elif max( y ) < y_half:
if self.border.y.min < max( y ):
self.border.y.min = max( y ) # same
elif min( y ) > y_half:
if self.border.y.max > min( y ):
self.border.y.max = min( y ) # same
offset = 3
self.border.x.min += offset
self.border.y.min += offset
self.border.x.max -= offset
self.border.y.min -= offset
self.border.scale( factor )
return self
def set_calibration_spectrum( self ):
"""
Set calibration sprectrum area
"""
self.calibration_spectrum = CalibrationSpectrum()
def indicator( list_ , matrix ):
"""
Define an indicator which define if the horizontal slice has
a chance to be a part of a calibration
"""
avg = mean( matrix )
if mean( list_ ) > 0.75 * avg:
return 0
if mean( list_ ) < 0.25 * avg:
return 1
positions = where( list_ > mean( list_ ) )[0]
if len( positions ) < 100:
return 2
if len( positions ) > 400:
return 3
distance = mean( positions[ 1 : ] - positions[ : - 1 ] )
if distance < 10:
return 4
return 10
list_ = [
indicator(
self.data[
i ,
self.border.slice()[1],
] ,
self.data[ self.border.slice() ],
) for i in range(
self.border.y.min,
self.border.y.max,
)
]
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
plt.plot( abciss , indexes_max ) plt.plot( list_ )
plt.show() plt.show()
return self
def set_spectrum( self ):
"""
Set spectrum area
"""
self.spectrum = Border()
list_ = convolve(
mean(
self.data[ self.border.slice() ],
axis = 1 ,
) ,
ones( 200 ),
'valid' ,
)
indexes = find_peak_low_high(
list_ ,
( max( list_ ) + mean( list_ ) ) / 2,
)[0]
self.spectrum.y.min = indexes[0] + self.border.y.min + 100
self.spectrum.y.max = indexes[1] + self.border.y.min + 100
import matplotlib.pyplot as plt
plt.imshow( self.data[ self.border.slice() ] , aspect = 'auto' )
plt.show()
list_ = convolve(
mean(
self.data[ self.border.slice() ],
axis = 0 ,
) ,
ones( 200 ),
'valid' ,
)
indexes = find_peak_low_high(
list_ ,
mean( list_ ) + max( list_ ) / 2,
)[0]
self.spectrum.x.min = indexes[0] + self.border.x.min + 100
self.spectrum.x.max = indexes[1] + self.border.x.min + 100
return self
def size( self ):
"""
get plate size
"""
return self.data.shape

View file

@ -1,6 +1,14 @@
from warnings import warn from sys import stdout, argv, executable
from gettext import gettext as _
from logging import getLogger, StreamHandler, FileHandler, Formatter
from pathlib import Path from pathlib import Path
_formatter = Formatter(
fmt = '${levelname}:${name}:${message}',
style = '$' ,
)
_program_cmd = executable + ' ' + argv[0]
class Settings: class Settings:
""" """
Settings manager Settings manager
@ -17,12 +25,16 @@ class Settings:
self.set_no_cache() self.set_no_cache()
self.set_output() self.set_output()
self.set_verbose() self.set_verbose()
self.set_log_file()
self.set_wave_cal() self.set_wave_cal()
# configuration change # configuration change
if len( arguments ) < 1: if len( arguments ) < 1:
raise Exception( raise Exception(
'type \'' + __file__ + ' -h\' for more information' _( (
'no argument given, type \'{program} -g\' for more'
' information'
) ).format( program = _program_cmd )
) )
index = 0 index = 0
@ -31,22 +43,25 @@ class Settings:
if argument[0] == '-': if argument[0] == '-':
if len( argument ) < 2: if len( argument ) < 2:
raise Exception( raise Exception(
'unknown argument, type \' + __file__ + \' -h' + _( (
' for more information' 'unknown argument {argument}, type '
'\'{program} -h\' for more information'
) ).format(
program = _program_cmd,
argument = argument ,
)
) )
if argument[1] != '-': if argument[1] != '-':
if argument == '-h': if argument == '-h':
argument = '--help' argument = '--help'
elif argument == '-V': elif argument == '-V':
argument = '--version' argument = '--version'
elif argument == '-v':
argument = '--verbose'
elif argument == '-n': elif argument == '-n':
argument = '--no-cache' argument = '--no-cache'
elif argument == '-c': elif argument == '-c':
if index == len( arguments ) - 1: if index == len( arguments ) - 1:
raise Exception( raise Exception(
'cache have to take a value' _( 'cache have to take a value' )
) )
arguments[ index + 1 ] = '--cache=' + \ arguments[ index + 1 ] = '--cache=' + \
arguments[ index + 1 ] arguments[ index + 1 ]
@ -55,7 +70,7 @@ class Settings:
elif argument == '-o': elif argument == '-o':
if index == len( arguments ) - 1: if index == len( arguments ) - 1:
raise Exception( raise Exception(
'output have to take a value' _( 'output have to take a value' )
) )
arguments[ index + 1 ] = '--output=' + \ arguments[ index + 1 ] = '--output=' + \
arguments[ index + 1 ] arguments[ index + 1 ]
@ -64,7 +79,10 @@ class Settings:
elif argument == '-w': elif argument == '-w':
if index == len( arguments ) - 1: if index == len( arguments ) - 1:
raise Exception( raise Exception(
'wavelength calibration have to take a value' _( (
'wavelength calibration have to take'
' a value'
) )
) )
arguments[ index + 1 ] = '--wavelength=' + \ arguments[ index + 1 ] = '--wavelength=' + \
arguments[ index + 1 ] arguments[ index + 1 ]
@ -73,17 +91,44 @@ class Settings:
elif argument == '-i': elif argument == '-i':
if index == len( arguments ) - 1: if index == len( arguments ) - 1:
raise Exception( raise Exception(
'intensity calibration have to take a value' _( (
'intensity calibration have to take'
' a value'
) )
) )
arguments[ index + 1 ] = '--intensity=' + \ arguments[ index + 1 ] = '--intensity=' + \
arguments[ index + 1 ] arguments[ index + 1 ]
index += 1 index += 1
continue continue
elif argument == '-l':
if index == len( arguments ) - 1:
raise Exception(
_( (
'log file have to take a value'
) )
)
arguments[ index + 1 ] = '--log-file=' + \
arguments[ index + 1 ]
index += 1
continue
elif argument == '-v':
if index == len( arguments ) - 1:
raise Exception(
_( 'verbosity level have to take a value' )
)
arguments[ index + 1 ] = '--verbose=' + \
arguments[ index + 1 ]
index += 1
continue
else: else:
raise Exception( raise Exception(
'unknown argument "' + argument + \ _( (
'", type \'' + __file__ + \ 'unknown argument {argument}, type'
' -h\' for more information' '\'{program} -h\' for more information'
) ).format(
program = _program_cmd,
argument = argument ,
)
) )
if argument[1] == '-': # not elif because argument if argument[1] == '-': # not elif because argument
# can change in the last if # can change in the last if
@ -93,10 +138,13 @@ class Settings:
elif argument == '--version': elif argument == '--version':
print( self.version() ) print( self.version() )
exit() exit()
elif argument == '--verbose':
self.set_verbose( True )
elif argument == '--no-cache': elif argument == '--no-cache':
self.set_no_cache( True ) self.set_no_cache( True )
elif (
len( argument ) > 10 and
argument[ : 10 ] == '--verbose='
):
self.set_verbose( argument[ 10 : ] )
elif ( elif (
len( argument ) > 8 and len( argument ) > 8 and
argument[ : 8 ] == '--cache=' argument[ : 8 ] == '--cache='
@ -117,18 +165,34 @@ class Settings:
argument[ : 12 ] == '--intensity=' argument[ : 12 ] == '--intensity='
): ):
self.set_inte_cal( argument[ 12 : ] ) self.set_inte_cal( argument[ 12 : ] )
elif (
len( argument ) > 11 and
argument[ : 11 ] == '--log-file='
):
self.set_log_file( argument[ 11 : ] )
else: else:
raise Exception( raise Exception(
'unknown argument "' + argument + \ _( (
'", type \'' + __file__ + ' -h\' ' + \ 'unknown argument {argument}, type'
'for more information' '\'{program} -h\' for more information'
) )
) )
else: else:
self.set_input( argument ) self.set_input( argument )
index += 1 index += 1
logger = getLogger( 'naroo reader' )
if not logger.hasHandlers():
handler = StreamHandler( stdout )
handler.setFormatter(
_formatter,
)
logger.addHandler(
handler,
)
if self.input == None: if self.input == None:
raise Exception( 'input should be given' ) raise Exception( _( 'input should be given' ) )
def set_cache( self , cache = None ): def set_cache( self , cache = None ):
""" """
@ -142,7 +206,7 @@ class Settings:
self.cache = cache self.cache = cache
else: else:
raise TypeError( raise TypeError(
'cache should be a path' _( 'cache should be a path' )
) )
def set_input( self , input = None ): def set_input( self , input = None ):
""" """
@ -156,11 +220,13 @@ class Settings:
self.input = input self.input = input
else: else:
raise TypeError( raise TypeError(
'input should be a path' _( 'input should be a path' )
) )
if self.input != None and not self.input.is_file(): if self.input != None and not self.input.is_file():
raise IOError( raise IOError(
'could not open ' + str( self.input ) _( 'could not open {input}' ).format(
input = self.input,
)
) )
def set_inte_cal( self , intensity_calibration = None ): def set_inte_cal( self , intensity_calibration = None ):
@ -175,11 +241,41 @@ class Settings:
self.inte_cal = intensity_calibration self.inte_cal = intensity_calibration
else: else:
raise TypeError( raise TypeError(
'intensity calibration should be a path' _( 'intensity calibration should be a path' )
) )
if self.inte_cal != None and not self.inte_cal.is_file(): if self.inte_cal != None and not self.inte_cal.is_file():
raise IOError( raise IOError(
'could not open ' + str( self.inte_cal ) _( 'could not open {intensity_file}' ).format(
intensity_file = self.inte_cal,
)
)
def set_log_file( self , log_file = None ):
"""
Setter for log file (None or path)
"""
if isinstance( log_file , str ):
self.log_file = Path( log_file )
elif isinstance( log_file , Path ):
self.log_file = log_file
elif log_file == None:
self.log_file = log_file
else:
raise TypeError(
_( 'log file should be a path' )
)
if self.log_file != None:
logger = getLogger( 'naroo reader' )
handler = FileHandler(
filename = str( log_file ),
mode = 'a' ,
)
handler.setFormatter(
_formatter,
)
logger.addHandler(
handler,
) )
def set_no_cache( self , no_cache = False ): def set_no_cache( self , no_cache = False ):
@ -190,7 +286,7 @@ class Settings:
self.no_cache = no_cache self.no_cache = no_cache
else: else:
raise TypeError( raise TypeError(
'no_cache option should be a boolean' _( 'no_cache option should be a boolean' )
) )
def set_output( self , output = 'data.fits' ): def set_output( self , output = 'data.fits' ):
@ -203,25 +299,42 @@ class Settings:
self.output = output self.output = output
else: else:
raise TypeError( raise TypeError(
'output should be a path' _( 'output should be a path' )
) )
if self.output.is_file(): if self.output.is_file():
warn( logger = getLogger( 'naroo reader' )
str( self.output ) + ' already exists, it will be ' + logger.warning(
'overwritten', _(
UserWarning , '{output} already exists, it will be overwritten'
).format( output = self.output )
) )
def set_verbose( self , verbose = False ): def set_verbose( self , verbose = 'WARNING' ):
""" """
Setter for verbosity flag (bool) Setter for verbose level (string)
""" """
if isinstance( verbose , bool ): if isinstance( verbose , str ):
self.verbose = verbose self.verbose = verbose.upper()
if verbose not in [
'CRITICAL',
'ERROR' ,
'WARNING' ,
'INFO' ,
'DEBUG' ,
'NOTSET' ,
]:
raise ValueError(
_( (
'verbose level must be one of the listed ones'
'in python documentation'
) )
)
else: else:
raise TypeError( raise TypeError(
'verbose should be a boolean' _( 'verbose level should be a string' )
) )
logger = getLogger( 'naroo reader' )
logger.setLevel( self.verbose )
def set_wave_cal( self , wavelength_calibration = None ): def set_wave_cal( self , wavelength_calibration = None ):
""" """
@ -235,40 +348,40 @@ class Settings:
self.wave_cal = wavelength_calibration self.wave_cal = wavelength_calibration
else: else:
raise TypeError( raise TypeError(
'wavelength calibration should be a path' _( 'wavelength calibration should be a path' )
) )
if self.wave_cal != None and not self.wave_cal.is_file(): if self.wave_cal != None and not self.wave_cal.is_file():
raise IOError( raise IOError(
'could not open ' + str( self.wave_cal ) _( 'could not open {wavelength_file}' ).format(
wavelength_file = self.wave_cal
)
) )
def help( self ): def help( self ):
return '\ return 'naroo_reader [options...] input\
naroo_reader [options...] input\
\n -w wavelength wavelength calibration file, default to None.\
\n None means no wavelength interpolation\
\n -i intensity intensity calibration file, default to None.\
\n None means no intensity interpolation\
\n -c --cache use given cache file to store new temporary\ \n -c --cache use given cache file to store new temporary\
\n data or use old ones.\ \n data or use old ones.\
\n -h --help show this help and quit\ \n -h --help show this help and quit.\
\n -i --intensity intensity calibration file, default to None.\
\n None means no intensity interpolation\
\n -l --log-file file where to log event.\
\n -n --no-cache do not use already existing cache file.\ \n -n --no-cache do not use already existing cache file.\
\n If cache is defined, the cache file will be\ \n If cache is defined, the cache file will be\
\n overwrited.\ \n overwrited.\
\n If cache is None, it does nothing\ \n If cache is None, it does nothing.\
\n -o --output Output file. Default to data.fits\ \n -o --output Output file. Default to data.fits.\
\n -V --version show version number and quit\ \n -V --version show version number and quit.\
\n -v --verbose show more information to help debugging\ \n -v --verbose set logging level help debugging.\
' \n -w wavelength wavelength calibration file, default to None.\
\n None means no wavelength interpolation.'
def __str__( self ): def __str__( self ):
return '\ return 'current settings:\
current settings:\
\n cache: ' + str( self.cache ) + '\ \n cache: ' + str( self.cache ) + '\
\n input: ' + str( self.input ) + '\ \n input: ' + str( self.input ) + '\
\n intensity calibration: ' + str( self.inte_cal ) + '\ \n intensity calibration: ' + str( self.inte_cal ) + '\
\n no cache flag: ' + str( self.no_cache ) + '\ \n no cache flag: ' + str( self.no_cache ) + '\
\n output: ' + str( self.output ) + '\ \n output: ' + str( self.output ) + '\
\n verbose flag: ' + str( self.verbose ) + '\ \n logging level: ' + str( self.verbose ) + '\
\n wavelength calibration: ' + str( self.wave_cal ) + '\ \n log-file: ' + str( self.log_file ) + '\
' \n wavelength calibration: ' + str( self.wave_cal )

2
function/fit.py Normal file
View file

@ -0,0 +1,2 @@
def linear( x , a , b ):
return a * x + b

View file

@ -1,4 +1,3 @@
import cv2
import numpy as np import numpy as np
def check_side( data , point , tolerance ): def check_side( data , point , tolerance ):
@ -65,7 +64,7 @@ def point( index_1 , index_2 , axis = 'x' ):
return [ index_2 , index_1 ] return [ index_2 , index_1 ]
return [ index_1 , index_2 ] return [ index_1 , index_2 ]
def find_point( list_ , index , axis = 'x' , threshold = 0.5 ): def find_point( list_ , index , axis = 'x' , threshold = 0.95 ):
""" """
find the index where to fill in a side find the index where to fill in a side
""" """
@ -80,22 +79,21 @@ def find_point( list_ , index , axis = 'x' , threshold = 0.5 ):
if not isinstance( threshold , float ): if not isinstance( threshold , float ):
raise ValueError( 'threshold must be a float, ' + type( threshold ) + ' given' ) raise ValueError( 'threshold must be a float, ' + type( threshold ) + ' given' )
mean = np.mean( list_ )
ampl = np.max( list_ ) - np.min( list_ ) ampl = np.max( list_ ) - np.min( list_ )
if ampl < mean / 2: if ampl < np.mean( list_ ) / 2:
return [ point( index , 0 , axis ) ] return [ point( index , 0 , axis ) ]
else: else:
points = [] points = []
list_ = np.convolve( list_ , np.ones( 100 ) , 'same' ) list_ = list_.copy()
list_ -= np.min( list_ ) list_ -= np.min( list_ )
list_ /= np.max( list_ ) list_ /= np.max( list_ )
i , inside , size = 0 , False , 0 i , inside , size = 0 , False , 0
while i < len( list_ ): while i < len( list_ ):
if list_[ i ] > threshold and not inside: if list_[ i ] > threshold and not inside:
points.append( point( index , i , axis ) ) points.append( point( index , i, axis ) )
inside = True inside = True
size = 0 size = 0
elif list_[ i ] < threshold and inside: elif list_[ i ] < threshold and inside:
@ -154,14 +152,6 @@ def last_same_value( list_ ):
raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' ) raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' )
value = list_[0] value = list_[0]
return np.argwhere( list_ == value ).max() return np.argwhere( list_ == value ).max()
def rotate( image , angle ):
"""
rotate the following image by the given angle
"""
height , width = image.shape[ : 2 ]
cX , cY = ( width // 2 , height // 2 )
matrix = cv2.getRotationMatrix2D( ( cX , cY ) , angle , 1 )
return cv2.warpAffine( image , matrix , ( width , height ) , flags = cv2.INTER_NEAREST )
def retrieve_peaks( data , window_size = 5 , error_coef = 1.05 , max_window_size = 30 , min_successive = 2 ): def retrieve_peaks( data , window_size = 5 , error_coef = 1.05 , max_window_size = 30 , min_successive = 2 ):
""" """
get peak position from a 1D data get peak position from a 1D data
@ -217,3 +207,154 @@ def near_value( list_ , value ):
) # interpolation ) # interpolation
index = np.append( index , np.where( list_ == value ) ) index = np.append( index , np.where( list_ == value ) )
return np.round( np.sort( index ) ).astype( int ) # triage return np.round( np.sort( index ) ).astype( int ) # triage
def find_peak_low_high( list_ , value ):
"""
Return index of start and end of rise and descent of peaks crossing
a given value in a list
"""
indexes = near_value(
list_,
value,
)
old_list_ = list_
list_ = np.gradient( list_ )
if list_[ indexes[0] ] < 0:
indexes.insert( 0 , 0 )
if list_[ indexes[0] ] < 0:
indexes.insert( 0 , 0 ) # start with a descent
if list_[ indexes[-1] ] > 0:
indexes.append( len( list_ ) - 1 )
if list_[ indexes[-1] ] > 0:
indexes.append( len( list_ ) - 1 ) # end with a rise
if len( indexes ) % 2 == 1:
raise Exception(
'number of peaks doesn\'t match what it should be'
)
rises = [
indexes[ i ] for i in range( 0 , len( indexes ) , 2 )
]
descents = [
indexes[ i ] for i in range( 1 , len( indexes ) , 2 )
]
i = 0
start_rise = np.where( list_[ : rises[i] ] < 0 )
end_rise = rises[i] + np.where(
list_[ rises[i] : descents[i] ] < 0
)
start_descent = rises[i] + np.where(
list_[ rises[i] : descents[i] ] > 0
)
if len( rises ) == 1:
end_descent = descents[i] + np.where(
list_[ descents[i] : ] > 0
)
else:
end_descent = descents[i] + np.where(
list_[ descents[i] : rises[i + 1] ] > 0
)
if len( start_rise[0] ) == 0:
rise_starts = [ 0 ]
else:
rise_starts = [ start_rise[0][-1] ]
if len( end_rise[0] ) == 0:
rise_ends = [ rise_starts[0] ] # if first is a descent
else:
rise_ends = [ end_rise[0][0] ]
if len( start_descent[0] ) == 0:
descent_starts = [ rise_ends[0] ] # same
else:
descent_starts = [ start_descent[0][-1] ]
if len( end_descent[0] ) == 0: # edge case:
descent_ends = [ descent_starts[0] ] # one pixel decrease
else:
descent_ends = [ end_descent[0][0] ]
while i < len( rises ) - 2: # last is i == len( rises ) - 2, works
# if len( rises ) = 1 or 2
i += 1
start_rise = descents[i - 1 ] + np.where(
list_[ descents[i - 1] : rises[i] ] < 0
)
end_rise = rises[i] + np.where(
list_[ rises[i] : descents[i] ] < 0
)
start_descent = rises[i] + np.where(
list_[ rises[i] : descents[i] ] > 0
)
end_descent = descents[i] + np.where(
list_[ descents[i] : rises[i + 1] ] > 0
)
if len( start_rise[0] ) == 0:
rise_starts.append( descent_ends[-1] )
else:
rise_starts.append(
start_rise[0][-1] # last pixel that decrease
)
if len( end_rise[0] ) == 0:
rise_ends.append( rise_starts[-1] )
else:
rise_ends.append(
end_rise[0][0] # first pixel that decrease
)
if len( start_descent[0] ) == 0:
descent_starts.append( rises_ends[-1] )
else:
descent_starts.append(
start_descent[0][-1] # last pixel that increase
)
if len( end_descent[0] ) == 0:
descent_ends.append( descent_starts[-1] )
else:
descent_ends.append(
end_descent[0][0] # first pixel that increase
)
if i != 0 or len( rises ) != 1:
i += 1
start_rise = descents[i - 1] + np.where(
list_[ descents[i - 1] : rises[i] ] < 0
)
end_rise = rises[i] + np.where(
list_[ rises[i] : descents[i] ] < 0
)
start_descent = rises[i] + np.where(
list_[ rises[i] : descents[i] ] > 0
)
end_descent = descents[i] + np.where(
list_[ descents[i] : ] > 0
)
if len( start_rise[0] ) == 0:
rise_starts.append( descent_ends[-1] )
else:
rise_starts.append(
start_rise[0][-1] # last pixel that decrease
)
if len( end_rise[0] ) == 0:
rise_ends.append( rise_starts[-1] )
else:
rise_ends.append(
end_rise[0][0] # first pixel that decrease
)
if len( start_descent[0] ) == 0:
descent_starts.append( rises_ends[-1] )
else:
descent_starts.append(
start_descent[0][-1] # last pixel that increase
)
if len( end_descent[0] ) == 0:
descent_ends.append( descent_starts[-1] )
else:
descent_ends.append(
end_descent[0][0] # first pixel that increase
)
return [
rise_starts ,
rise_ends ,
descent_starts,
descent_ends ,
]

View file

@ -9,6 +9,4 @@ settings = Settings( arguments[ 1 : ] ) # remove the "main.py" part
hdul = open( settings.input ) hdul = open( settings.input )
plate = Plate( hdul[0].data ) plate = Plate( hdul[0].data )
plate.rotate()
hdul.close() hdul.close()