Compare commits

..

No commits in common. "5acd52c1ae49544ed7e8eb50806a3753dbd32f6a" and "292ea9906999f2c8c895207391ee815729057c67" have entirely different histories.

6 changed files with 148 additions and 558 deletions

View file

@ -1,10 +0,0 @@
from classes.science.border import Border
class CalibrationSpectrum:
"""
Define spectrum border
"""
def __init__( self , up = Border() , down = Border ):
self.up = up
self.down = down

View file

@ -1,14 +1,7 @@
from numpy import ndarray, ones, argmax, arange, arctan, tan, pi, mean, max, min from numpy import ndarray, argmax, max, quantile, arange, where, convolve, ones
from scipy.optimize import curve_fit from scipy.optimize import curve_fit
from scipy.signal import convolve, find_peaks
from scipy.ndimage import rotate
from classes.science.border import Border from classes.science.border import Border
from classes.science.calibration_spectrum import CalibrationSpectrum from function.utils import find_point, fill
from function.utils import find_point, fill, find_peak_low_high
from function.fit import linear
from logging import getLogger
from gettext import gettext as _
class Plate: class Plate:
""" """
@ -17,43 +10,62 @@ class Plate:
def __init__( self , data ): def __init__( self , data ):
if not isinstance( data , ndarray ): if not isinstance( data , ndarray ):
raise TypeError( _( 'data must be a ndarray' ) ) raise TypeError( 'data must be a ndarray' )
if len( data.shape ) != 2:
raise ValueError( _( 'data must be a 2d matrix' ) )
self.data = data self.data = data
self.set_border()\ self.set_border()
.rotate()\
.set_spectrum()\
.set_calibration_spectrum()
def compress( self ): def set_border( self , factor = 10 ):
""" """
Compress the plate data to fit the biggest dimension in a 2000 Set current border (without area outside the plate)
pixels axis and the smallest in a 200 pixels axis at minimum.
Return the compressed data and the compression factor used.
""" """
min_factor = max( self.data.shape ) // 2000 # min factor to have a side compressed = self.compress( factor )
# with a maximum of 1000 pixels
max_factor = min( self.data.shape ) // 200 # max factor to have points = self.get_points( compressed )
# a side with a minimum of 100 pixel self.border = Border()
if min_factor < max_factor: self.border.x.min = 0
factor = int( mean( ( max_factor , min_factor ) ) ) self.border.x.max = compressed.shape[1] - 1
else: # the smallest side will be less than 100 pixels with the self.border.y.min = 0
# minimum compression factor self.border.y.max = compressed.shape[0] - 1
logger = getLogger( 'naroo reader' )
logger.warning( extremum = []
_( (
'slow compression: ratio between height and width' x_half = compressed.shape[1] // 2
' is greater than 10 ({ratio:.2f})' y_half = compressed.shape[0] // 2
) ).format( for index in range( len( points ) ):
ratio = max( self.size() ) / min( self.size() ) point = points[ index ]
point[0] -= int( compressed.shape[0] == point[0] ) # keep in
point[1] -= int( compressed.shape[1] == point[1] ) # range
taken_points = fill(
compressed,
point ,
1000 , # intensity threshold
) )
)
factor = max_factor x = [ taken_point[1] for taken_point in taken_points ]
return self.data[ y = [ taken_point[0] for taken_point in taken_points ]
: : factor,
: : factor, if max( x ) < x_half:
] , factor if self.border.x.min < max( x ):
self.border.x.min = max( x ) # biggest min
elif min( x ) > x_half: # elif to only accept one side
if self.border.x.max > min( x ):
self.border.x.max = min( x ) # smallest max
elif max( y ) < y_half:
if self.border.y.min < max( y ):
self.border.y.min = max( y ) # same
elif min( y ) > y_half:
if self.border.y.max > min( y ):
self.border.y.max = min( y ) # same
offset = 3
self.border.x.min += offset
self.border.y.min += offset
self.border.x.max -= offset
self.border.y.min -= offset
self.border.scale( factor )
def get_points( self , compressed ): def get_points( self , compressed ):
first_column = find_point( first_column = find_point(
@ -104,194 +116,36 @@ class Plate:
return first_column + last_column + first_line + last_line return first_column + last_column + first_line + last_line
def compress( self , factor ):
return self.data[
: : factor,
: : factor,
]
def rotate( self ): def rotate( self ):
""" """
Auto-rotate to be vertically and horizontally aligned Auto-rotate to be vertically and horizontally aligned
""" """
indexes_max = argmax( maxes = max(
convolve(
self.data[
1 * self.border.y.size() // 4:
3 * self.border.y.size() // 4,
1 * self.border.x.size() // 4:
3 * self.border.x.size() // 4
] ,
ones( ( 500 , 1 ) ),
'valid' ,
) ,
axis = 0,
)
abciss = arange(
1 * self.border.x.size() // 4,
3 * self.border.x.size() // 4
)
fit_result = curve_fit(
linear ,
abciss ,
indexes_max,
)[0]
angle = arctan( fit_result[0] ) * pi / 180 # rad
diff = int( # adjust height border
tan( angle ) * ( self.border.x.size() )
)
self.data = rotate(
self.data,
angle ,
)
self.border.y.min -= diff
self.border.y.max -= diff
return self
def set_border( self ):
"""
Set current border (without area outside the plate)
"""
compressed , factor = self.compress()
points = self.get_points( compressed )
self.border = Border()
self.border.x.min = 0
self.border.x.max = compressed.shape[1] - 1
self.border.y.min = 0
self.border.y.max = compressed.shape[0] - 1
extremum = []
x_half = compressed.shape[1] // 2
y_half = compressed.shape[0] // 2
for index in range( len( points ) ):
point = points[ index ]
point[0] -= int( compressed.shape[0] == point[0] ) # keep in
point[1] -= int( compressed.shape[1] == point[1] ) # range
taken_points = fill(
compressed,
point ,
2000 , # intensity threshold
)
x = [ taken_point[1] for taken_point in taken_points ]
y = [ taken_point[0] for taken_point in taken_points ]
if len( x ) > 5 and len( y ) > 5:
if max( x ) < x_half:
if self.border.x.min < max( x ):
self.border.x.min = max( x ) # biggest min
elif min( x ) > x_half:
# elif to only accept one side
if self.border.x.max > min( x ):
self.border.x.max = min( x ) # smallest max
elif max( y ) < y_half:
if self.border.y.min < max( y ):
self.border.y.min = max( y ) # same
elif min( y ) > y_half:
if self.border.y.max > min( y ):
self.border.y.max = min( y ) # same
offset = 3
self.border.x.min += offset
self.border.y.min += offset
self.border.x.max -= offset
self.border.y.min -= offset
self.border.scale( factor )
return self
def set_calibration_spectrum( self ):
"""
Set calibration sprectrum area
"""
self.calibration_spectrum = CalibrationSpectrum()
def indicator( list_ , matrix ):
"""
Define an indicator which define if the horizontal slice has
a chance to be a part of a calibration
"""
avg = mean( matrix )
if mean( list_ ) > 0.75 * avg:
return 0
if mean( list_ ) < 0.25 * avg:
return 1
positions = where( list_ > mean( list_ ) )[0]
if len( positions ) < 100:
return 2
if len( positions ) > 400:
return 3
distance = mean( positions[ 1 : ] - positions[ : - 1 ] )
if distance < 10:
return 4
return 10
list_ = [
indicator(
self.data[
i ,
self.border.slice()[1],
] ,
self.data[ self.border.slice() ],
) for i in range(
self.border.y.min,
self.border.y.max,
)
]
import matplotlib.pyplot as plt
plt.plot( list_ )
plt.show()
return self
def set_spectrum( self ):
"""
Set spectrum area
"""
self.spectrum = Border()
list_ = convolve(
mean(
self.data[ self.border.slice() ],
axis = 1 ,
) ,
ones( 200 ),
'valid' ,
)
indexes = find_peak_low_high(
list_ ,
( max( list_ ) + mean( list_ ) ) / 2,
)[0]
self.spectrum.y.min = indexes[0] + self.border.y.min + 100
self.spectrum.y.max = indexes[1] + self.border.y.min + 100
import matplotlib.pyplot as plt
plt.imshow( self.data[ self.border.slice() ] , aspect = 'auto' )
plt.show()
list_ = convolve(
mean(
self.data[ self.border.slice() ], self.data[ self.border.slice() ],
axis = 0 , axis = 0 ,
) ,
ones( 200 ),
'valid' ,
) )
indexes = find_peak_low_high( indexes = where(
list_ , maxes > quantile( maxes , 0.5 )
mean( list_ ) + max( list_ ) / 2, )
)[0] abciss = arange(
self.border.x.min,
self.spectrum.x.min = indexes[0] + self.border.x.min + 100 self.border.x.max
self.spectrum.x.max = indexes[1] + self.border.x.min + 100 )[ indexes ]
indexes_max = argmax(
return self self.data[ self.border.slice() ],
axis = 0 ,
def size( self ): )[ indexes ]
""" indexes_max = convolve(
get plate size indexes_max ,
""" ones( 100 ) ,
return self.data.shape 'same' ,
) / 100
import matplotlib.pyplot as plt
plt.plot( abciss , indexes_max )
plt.show()

View file

@ -1,14 +1,6 @@
from sys import stdout, argv, executable from warnings import warn
from gettext import gettext as _
from logging import getLogger, StreamHandler, FileHandler, Formatter
from pathlib import Path from pathlib import Path
_formatter = Formatter(
fmt = '${levelname}:${name}:${message}',
style = '$' ,
)
_program_cmd = executable + ' ' + argv[0]
class Settings: class Settings:
""" """
Settings manager Settings manager
@ -25,16 +17,12 @@ class Settings:
self.set_no_cache() self.set_no_cache()
self.set_output() self.set_output()
self.set_verbose() self.set_verbose()
self.set_log_file()
self.set_wave_cal() self.set_wave_cal()
# configuration change # configuration change
if len( arguments ) < 1: if len( arguments ) < 1:
raise Exception( raise Exception(
_( ( 'type \'' + __file__ + ' -h\' for more information'
'no argument given, type \'{program} -g\' for more'
' information'
) ).format( program = _program_cmd )
) )
index = 0 index = 0
@ -43,25 +31,22 @@ class Settings:
if argument[0] == '-': if argument[0] == '-':
if len( argument ) < 2: if len( argument ) < 2:
raise Exception( raise Exception(
_( ( 'unknown argument, type \' + __file__ + \' -h' +
'unknown argument {argument}, type ' ' for more information'
'\'{program} -h\' for more information'
) ).format(
program = _program_cmd,
argument = argument ,
)
) )
if argument[1] != '-': if argument[1] != '-':
if argument == '-h': if argument == '-h':
argument = '--help' argument = '--help'
elif argument == '-V': elif argument == '-V':
argument = '--version' argument = '--version'
elif argument == '-v':
argument = '--verbose'
elif argument == '-n': elif argument == '-n':
argument = '--no-cache' argument = '--no-cache'
elif argument == '-c': elif argument == '-c':
if index == len( arguments ) - 1: if index == len( arguments ) - 1:
raise Exception( raise Exception(
_( 'cache have to take a value' ) 'cache have to take a value'
) )
arguments[ index + 1 ] = '--cache=' + \ arguments[ index + 1 ] = '--cache=' + \
arguments[ index + 1 ] arguments[ index + 1 ]
@ -70,7 +55,7 @@ class Settings:
elif argument == '-o': elif argument == '-o':
if index == len( arguments ) - 1: if index == len( arguments ) - 1:
raise Exception( raise Exception(
_( 'output have to take a value' ) 'output have to take a value'
) )
arguments[ index + 1 ] = '--output=' + \ arguments[ index + 1 ] = '--output=' + \
arguments[ index + 1 ] arguments[ index + 1 ]
@ -79,10 +64,7 @@ class Settings:
elif argument == '-w': elif argument == '-w':
if index == len( arguments ) - 1: if index == len( arguments ) - 1:
raise Exception( raise Exception(
_( ( 'wavelength calibration have to take a value'
'wavelength calibration have to take'
' a value'
) )
) )
arguments[ index + 1 ] = '--wavelength=' + \ arguments[ index + 1 ] = '--wavelength=' + \
arguments[ index + 1 ] arguments[ index + 1 ]
@ -91,44 +73,17 @@ class Settings:
elif argument == '-i': elif argument == '-i':
if index == len( arguments ) - 1: if index == len( arguments ) - 1:
raise Exception( raise Exception(
_( ( 'intensity calibration have to take a value'
'intensity calibration have to take'
' a value'
) )
) )
arguments[ index + 1 ] = '--intensity=' + \ arguments[ index + 1 ] = '--intensity=' + \
arguments[ index + 1 ] arguments[ index + 1 ]
index += 1 index += 1
continue continue
elif argument == '-l':
if index == len( arguments ) - 1:
raise Exception(
_( (
'log file have to take a value'
) )
)
arguments[ index + 1 ] = '--log-file=' + \
arguments[ index + 1 ]
index += 1
continue
elif argument == '-v':
if index == len( arguments ) - 1:
raise Exception(
_( 'verbosity level have to take a value' )
)
arguments[ index + 1 ] = '--verbose=' + \
arguments[ index + 1 ]
index += 1
continue
else: else:
raise Exception( raise Exception(
_( ( 'unknown argument "' + argument + \
'unknown argument {argument}, type' '", type \'' + __file__ + \
'\'{program} -h\' for more information' ' -h\' for more information'
) ).format(
program = _program_cmd,
argument = argument ,
)
) )
if argument[1] == '-': # not elif because argument if argument[1] == '-': # not elif because argument
# can change in the last if # can change in the last if
@ -138,13 +93,10 @@ class Settings:
elif argument == '--version': elif argument == '--version':
print( self.version() ) print( self.version() )
exit() exit()
elif argument == '--verbose':
self.set_verbose( True )
elif argument == '--no-cache': elif argument == '--no-cache':
self.set_no_cache( True ) self.set_no_cache( True )
elif (
len( argument ) > 10 and
argument[ : 10 ] == '--verbose='
):
self.set_verbose( argument[ 10 : ] )
elif ( elif (
len( argument ) > 8 and len( argument ) > 8 and
argument[ : 8 ] == '--cache=' argument[ : 8 ] == '--cache='
@ -165,34 +117,18 @@ class Settings:
argument[ : 12 ] == '--intensity=' argument[ : 12 ] == '--intensity='
): ):
self.set_inte_cal( argument[ 12 : ] ) self.set_inte_cal( argument[ 12 : ] )
elif (
len( argument ) > 11 and
argument[ : 11 ] == '--log-file='
):
self.set_log_file( argument[ 11 : ] )
else: else:
raise Exception( raise Exception(
_( ( 'unknown argument "' + argument + \
'unknown argument {argument}, type' '", type \'' + __file__ + ' -h\' ' + \
'\'{program} -h\' for more information' 'for more information'
) )
) )
else: else:
self.set_input( argument ) self.set_input( argument )
index += 1 index += 1
logger = getLogger( 'naroo reader' )
if not logger.hasHandlers():
handler = StreamHandler( stdout )
handler.setFormatter(
_formatter,
)
logger.addHandler(
handler,
)
if self.input == None: if self.input == None:
raise Exception( _( 'input should be given' ) ) raise Exception( 'input should be given' )
def set_cache( self , cache = None ): def set_cache( self , cache = None ):
""" """
@ -206,7 +142,7 @@ class Settings:
self.cache = cache self.cache = cache
else: else:
raise TypeError( raise TypeError(
_( 'cache should be a path' ) 'cache should be a path'
) )
def set_input( self , input = None ): def set_input( self , input = None ):
""" """
@ -220,13 +156,11 @@ class Settings:
self.input = input self.input = input
else: else:
raise TypeError( raise TypeError(
_( 'input should be a path' ) 'input should be a path'
) )
if self.input != None and not self.input.is_file(): if self.input != None and not self.input.is_file():
raise IOError( raise IOError(
_( 'could not open {input}' ).format( 'could not open ' + str( self.input )
input = self.input,
)
) )
def set_inte_cal( self , intensity_calibration = None ): def set_inte_cal( self , intensity_calibration = None ):
@ -241,41 +175,11 @@ class Settings:
self.inte_cal = intensity_calibration self.inte_cal = intensity_calibration
else: else:
raise TypeError( raise TypeError(
_( 'intensity calibration should be a path' ) 'intensity calibration should be a path'
) )
if self.inte_cal != None and not self.inte_cal.is_file(): if self.inte_cal != None and not self.inte_cal.is_file():
raise IOError( raise IOError(
_( 'could not open {intensity_file}' ).format( 'could not open ' + str( self.inte_cal )
intensity_file = self.inte_cal,
)
)
def set_log_file( self , log_file = None ):
"""
Setter for log file (None or path)
"""
if isinstance( log_file , str ):
self.log_file = Path( log_file )
elif isinstance( log_file , Path ):
self.log_file = log_file
elif log_file == None:
self.log_file = log_file
else:
raise TypeError(
_( 'log file should be a path' )
)
if self.log_file != None:
logger = getLogger( 'naroo reader' )
handler = FileHandler(
filename = str( log_file ),
mode = 'a' ,
)
handler.setFormatter(
_formatter,
)
logger.addHandler(
handler,
) )
def set_no_cache( self , no_cache = False ): def set_no_cache( self , no_cache = False ):
@ -286,7 +190,7 @@ class Settings:
self.no_cache = no_cache self.no_cache = no_cache
else: else:
raise TypeError( raise TypeError(
_( 'no_cache option should be a boolean' ) 'no_cache option should be a boolean'
) )
def set_output( self , output = 'data.fits' ): def set_output( self , output = 'data.fits' ):
@ -299,42 +203,25 @@ class Settings:
self.output = output self.output = output
else: else:
raise TypeError( raise TypeError(
_( 'output should be a path' ) 'output should be a path'
) )
if self.output.is_file(): if self.output.is_file():
logger = getLogger( 'naroo reader' ) warn(
logger.warning( str( self.output ) + ' already exists, it will be ' +
_( 'overwritten',
'{output} already exists, it will be overwritten' UserWarning ,
).format( output = self.output )
) )
def set_verbose( self , verbose = 'WARNING' ): def set_verbose( self , verbose = False ):
""" """
Setter for verbose level (string) Setter for verbosity flag (bool)
""" """
if isinstance( verbose , str ): if isinstance( verbose , bool ):
self.verbose = verbose.upper() self.verbose = verbose
if verbose not in [
'CRITICAL',
'ERROR' ,
'WARNING' ,
'INFO' ,
'DEBUG' ,
'NOTSET' ,
]:
raise ValueError(
_( (
'verbose level must be one of the listed ones'
'in python documentation'
) )
)
else: else:
raise TypeError( raise TypeError(
_( 'verbose level should be a string' ) 'verbose should be a boolean'
) )
logger = getLogger( 'naroo reader' )
logger.setLevel( self.verbose )
def set_wave_cal( self , wavelength_calibration = None ): def set_wave_cal( self , wavelength_calibration = None ):
""" """
@ -348,40 +235,40 @@ class Settings:
self.wave_cal = wavelength_calibration self.wave_cal = wavelength_calibration
else: else:
raise TypeError( raise TypeError(
_( 'wavelength calibration should be a path' ) 'wavelength calibration should be a path'
) )
if self.wave_cal != None and not self.wave_cal.is_file(): if self.wave_cal != None and not self.wave_cal.is_file():
raise IOError( raise IOError(
_( 'could not open {wavelength_file}' ).format( 'could not open ' + str( self.wave_cal )
wavelength_file = self.wave_cal
)
) )
def help( self ): def help( self ):
return 'naroo_reader [options...] input\ return '\
naroo_reader [options...] input\
\n -w wavelength wavelength calibration file, default to None.\
\n None means no wavelength interpolation\
\n -i intensity intensity calibration file, default to None.\
\n None means no intensity interpolation\
\n -c --cache use given cache file to store new temporary\ \n -c --cache use given cache file to store new temporary\
\n data or use old ones.\ \n data or use old ones.\
\n -h --help show this help and quit.\ \n -h --help show this help and quit\
\n -i --intensity intensity calibration file, default to None.\
\n None means no intensity interpolation\
\n -l --log-file file where to log event.\
\n -n --no-cache do not use already existing cache file.\ \n -n --no-cache do not use already existing cache file.\
\n If cache is defined, the cache file will be\ \n If cache is defined, the cache file will be\
\n overwrited.\ \n overwrited.\
\n If cache is None, it does nothing.\ \n If cache is None, it does nothing\
\n -o --output Output file. Default to data.fits.\ \n -o --output Output file. Default to data.fits\
\n -V --version show version number and quit.\ \n -V --version show version number and quit\
\n -v --verbose set logging level help debugging.\ \n -v --verbose show more information to help debugging\
\n -w wavelength wavelength calibration file, default to None.\ '
\n None means no wavelength interpolation.'
def __str__( self ): def __str__( self ):
return 'current settings:\ return '\
current settings:\
\n cache: ' + str( self.cache ) + '\ \n cache: ' + str( self.cache ) + '\
\n input: ' + str( self.input ) + '\ \n input: ' + str( self.input ) + '\
\n intensity calibration: ' + str( self.inte_cal ) + '\ \n intensity calibration: ' + str( self.inte_cal ) + '\
\n no cache flag: ' + str( self.no_cache ) + '\ \n no cache flag: ' + str( self.no_cache ) + '\
\n output: ' + str( self.output ) + '\ \n output: ' + str( self.output ) + '\
\n logging level: ' + str( self.verbose ) + '\ \n verbose flag: ' + str( self.verbose ) + '\
\n log-file: ' + str( self.log_file ) + '\ \n wavelength calibration: ' + str( self.wave_cal ) + '\
\n wavelength calibration: ' + str( self.wave_cal ) '

View file

@ -1,2 +0,0 @@
def linear( x , a , b ):
return a * x + b

View file

@ -1,3 +1,4 @@
import cv2
import numpy as np import numpy as np
def check_side( data , point , tolerance ): def check_side( data , point , tolerance ):
@ -64,7 +65,7 @@ def point( index_1 , index_2 , axis = 'x' ):
return [ index_2 , index_1 ] return [ index_2 , index_1 ]
return [ index_1 , index_2 ] return [ index_1 , index_2 ]
def find_point( list_ , index , axis = 'x' , threshold = 0.95 ): def find_point( list_ , index , axis = 'x' , threshold = 0.5 ):
""" """
find the index where to fill in a side find the index where to fill in a side
""" """
@ -79,21 +80,22 @@ def find_point( list_ , index , axis = 'x' , threshold = 0.95 ):
if not isinstance( threshold , float ): if not isinstance( threshold , float ):
raise ValueError( 'threshold must be a float, ' + type( threshold ) + ' given' ) raise ValueError( 'threshold must be a float, ' + type( threshold ) + ' given' )
mean = np.mean( list_ )
ampl = np.max( list_ ) - np.min( list_ ) ampl = np.max( list_ ) - np.min( list_ )
if ampl < np.mean( list_ ) / 2: if ampl < mean / 2:
return [ point( index , 0 , axis ) ] return [ point( index , 0 , axis ) ]
else: else:
points = [] points = []
list_ = list_.copy() list_ = np.convolve( list_ , np.ones( 100 ) , 'same' )
list_ -= np.min( list_ ) list_ -= np.min( list_ )
list_ /= np.max( list_ ) list_ /= np.max( list_ )
i , inside , size = 0 , False , 0 i , inside , size = 0 , False , 0
while i < len( list_ ): while i < len( list_ ):
if list_[ i ] > threshold and not inside: if list_[ i ] > threshold and not inside:
points.append( point( index , i, axis ) ) points.append( point( index , i , axis ) )
inside = True inside = True
size = 0 size = 0
elif list_[ i ] < threshold and inside: elif list_[ i ] < threshold and inside:
@ -152,6 +154,14 @@ def last_same_value( list_ ):
raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' ) raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' )
value = list_[0] value = list_[0]
return np.argwhere( list_ == value ).max() return np.argwhere( list_ == value ).max()
def rotate( image , angle ):
"""
rotate the following image by the given angle
"""
height , width = image.shape[ : 2 ]
cX , cY = ( width // 2 , height // 2 )
matrix = cv2.getRotationMatrix2D( ( cX , cY ) , angle , 1 )
return cv2.warpAffine( image , matrix , ( width , height ) , flags = cv2.INTER_NEAREST )
def retrieve_peaks( data , window_size = 5 , error_coef = 1.05 , max_window_size = 30 , min_successive = 2 ): def retrieve_peaks( data , window_size = 5 , error_coef = 1.05 , max_window_size = 30 , min_successive = 2 ):
""" """
get peak position from a 1D data get peak position from a 1D data
@ -207,154 +217,3 @@ def near_value( list_ , value ):
) # interpolation ) # interpolation
index = np.append( index , np.where( list_ == value ) ) index = np.append( index , np.where( list_ == value ) )
return np.round( np.sort( index ) ).astype( int ) # triage return np.round( np.sort( index ) ).astype( int ) # triage
def find_peak_low_high( list_ , value ):
"""
Return index of start and end of rise and descent of peaks crossing
a given value in a list
"""
indexes = near_value(
list_,
value,
)
old_list_ = list_
list_ = np.gradient( list_ )
if list_[ indexes[0] ] < 0:
indexes.insert( 0 , 0 )
if list_[ indexes[0] ] < 0:
indexes.insert( 0 , 0 ) # start with a descent
if list_[ indexes[-1] ] > 0:
indexes.append( len( list_ ) - 1 )
if list_[ indexes[-1] ] > 0:
indexes.append( len( list_ ) - 1 ) # end with a rise
if len( indexes ) % 2 == 1:
raise Exception(
'number of peaks doesn\'t match what it should be'
)
rises = [
indexes[ i ] for i in range( 0 , len( indexes ) , 2 )
]
descents = [
indexes[ i ] for i in range( 1 , len( indexes ) , 2 )
]
i = 0
start_rise = np.where( list_[ : rises[i] ] < 0 )
end_rise = rises[i] + np.where(
list_[ rises[i] : descents[i] ] < 0
)
start_descent = rises[i] + np.where(
list_[ rises[i] : descents[i] ] > 0
)
if len( rises ) == 1:
end_descent = descents[i] + np.where(
list_[ descents[i] : ] > 0
)
else:
end_descent = descents[i] + np.where(
list_[ descents[i] : rises[i + 1] ] > 0
)
if len( start_rise[0] ) == 0:
rise_starts = [ 0 ]
else:
rise_starts = [ start_rise[0][-1] ]
if len( end_rise[0] ) == 0:
rise_ends = [ rise_starts[0] ] # if first is a descent
else:
rise_ends = [ end_rise[0][0] ]
if len( start_descent[0] ) == 0:
descent_starts = [ rise_ends[0] ] # same
else:
descent_starts = [ start_descent[0][-1] ]
if len( end_descent[0] ) == 0: # edge case:
descent_ends = [ descent_starts[0] ] # one pixel decrease
else:
descent_ends = [ end_descent[0][0] ]
while i < len( rises ) - 2: # last is i == len( rises ) - 2, works
# if len( rises ) = 1 or 2
i += 1
start_rise = descents[i - 1 ] + np.where(
list_[ descents[i - 1] : rises[i] ] < 0
)
end_rise = rises[i] + np.where(
list_[ rises[i] : descents[i] ] < 0
)
start_descent = rises[i] + np.where(
list_[ rises[i] : descents[i] ] > 0
)
end_descent = descents[i] + np.where(
list_[ descents[i] : rises[i + 1] ] > 0
)
if len( start_rise[0] ) == 0:
rise_starts.append( descent_ends[-1] )
else:
rise_starts.append(
start_rise[0][-1] # last pixel that decrease
)
if len( end_rise[0] ) == 0:
rise_ends.append( rise_starts[-1] )
else:
rise_ends.append(
end_rise[0][0] # first pixel that decrease
)
if len( start_descent[0] ) == 0:
descent_starts.append( rises_ends[-1] )
else:
descent_starts.append(
start_descent[0][-1] # last pixel that increase
)
if len( end_descent[0] ) == 0:
descent_ends.append( descent_starts[-1] )
else:
descent_ends.append(
end_descent[0][0] # first pixel that increase
)
if i != 0 or len( rises ) != 1:
i += 1
start_rise = descents[i - 1] + np.where(
list_[ descents[i - 1] : rises[i] ] < 0
)
end_rise = rises[i] + np.where(
list_[ rises[i] : descents[i] ] < 0
)
start_descent = rises[i] + np.where(
list_[ rises[i] : descents[i] ] > 0
)
end_descent = descents[i] + np.where(
list_[ descents[i] : ] > 0
)
if len( start_rise[0] ) == 0:
rise_starts.append( descent_ends[-1] )
else:
rise_starts.append(
start_rise[0][-1] # last pixel that decrease
)
if len( end_rise[0] ) == 0:
rise_ends.append( rise_starts[-1] )
else:
rise_ends.append(
end_rise[0][0] # first pixel that decrease
)
if len( start_descent[0] ) == 0:
descent_starts.append( rises_ends[-1] )
else:
descent_starts.append(
start_descent[0][-1] # last pixel that increase
)
if len( end_descent[0] ) == 0:
descent_ends.append( descent_starts[-1] )
else:
descent_ends.append(
end_descent[0][0] # first pixel that increase
)
return [
rise_starts ,
rise_ends ,
descent_starts,
descent_ends ,
]

View file

@ -9,4 +9,6 @@ settings = Settings( arguments[ 1 : ] ) # remove the "main.py" part
hdul = open( settings.input ) hdul = open( settings.input )
plate = Plate( hdul[0].data ) plate = Plate( hdul[0].data )
plate.rotate()
hdul.close() hdul.close()