680 lines
22 KiB
Python
Executable file
680 lines
22 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import utils
|
|
import sys
|
|
import pathlib
|
|
import shelve
|
|
from scipy.signal import convolve as sp_convolve
|
|
from scipy.signal import find_peaks
|
|
from scipy.ndimage import rotate
|
|
|
|
cache , filename , output , calibration , verbose , no_cache = '' , None , None , None , False , False
|
|
if len( sys.argv ) < 2:
|
|
raise Exception( 'ETA.py: type \'ETA.py -h\' for more information' )
|
|
|
|
argv , i = sys.argv[ 1 : ] , 0
|
|
while i < len( argv ):
|
|
arg = argv[ i ]
|
|
if arg[0] == '-':
|
|
if len( arg ) < 2:
|
|
raise Exception( 'ETA.py: unknown argument, type \'ETA.py -h\' for more information' )
|
|
if arg[1] != '-':
|
|
if arg == '-h':
|
|
arg = '--help'
|
|
elif arg == '-V':
|
|
arg = '--version'
|
|
elif arg == '-v':
|
|
arg = '--verbose'
|
|
elif arg == '-n':
|
|
arg == '--no-cache'
|
|
elif arg == '-c':
|
|
if i == len( sys.argv ) - 1:
|
|
raise Exception( 'ETA.py: cache have to take a value' )
|
|
argv[ i + 1 ] = '--cache=' + argv[ i + 1 ]
|
|
i += 1
|
|
continue
|
|
elif arg == '-o':
|
|
if i == len( sys.argv ) - 1:
|
|
raise Exception( 'ETA.py: output have to take a value' )
|
|
argv[ i + 1 ] = '--output=' + argv[ i + 1 ]
|
|
i += 1
|
|
continue
|
|
elif arg == '-a':
|
|
if i == len( sys.argv ) - 1:
|
|
raise Exception( 'ETA.py: calibration have to take a value' )
|
|
argv[ i + 1 ] = '--calibration=' + argv[ i + 1 ]
|
|
i += 1
|
|
continue
|
|
else:
|
|
raise Exception( 'ETA.py: unknown argument "' + arg + '", type \'ETA.py -h\' for more information' )
|
|
if arg[1] == '-': # not elif because arg can change after last if
|
|
if arg == '--help':
|
|
print( 'ETA.py [options...] filename\
|
|
\n -a --calibration calibration file, default to no calibration.\
|
|
\n No calibration means no wavelength interpolation\
|
|
\n -c --cache use given cache\
|
|
\n -h --help show this help and quit\
|
|
\n -n --no-cache do not use cache and rewrite it\
|
|
\n -o --output output file, default to standard output\
|
|
\n -V --version show version number and quit\
|
|
\n -v --verbose show more information to help debugging\
|
|
\n\
|
|
\nParse a naroo ETA fits' )
|
|
exit()
|
|
elif arg == '--version':
|
|
print( '0.3' )
|
|
exit()
|
|
elif arg == '--verbose':
|
|
verbose = True
|
|
elif arg == '--no-cache':
|
|
no_cache = True
|
|
elif len( arg ) > 8 and arg[ : 8 ] == '--cache=':
|
|
cache = arg[ 8 : ]
|
|
elif len( arg ) > 9 and arg[ : 9 ] == '--output=':
|
|
output = arg[ 9 : ]
|
|
elif len( arg ) > 14 and arg[ : 14 ] == '--calibration=':
|
|
calibration = arg[ 14 : ]
|
|
else:
|
|
raise Exception( 'ETA.py: unknown argument "' + arg + '", type \'ETA.py -h\' for more information' )
|
|
else:
|
|
raise Exception( 'ETA.py: this exception should never be raised' )
|
|
else:
|
|
filename = arg
|
|
i += 1
|
|
if filename == None:
|
|
raise Exception( 'ETA.py: filename should be given' )
|
|
|
|
if verbose:
|
|
cache, filename, output, calibration, verbose
|
|
print( f'ETA.py: launching now with parameters:\
|
|
\n --filename: {filename}\
|
|
\n --cache: {cache} ( default: \'\' )\
|
|
\n --calibration: {calibration} ( default to None )\
|
|
\n --output: {output} ( default to None )\
|
|
\n --verbose: True ( default to False)\
|
|
\n\
|
|
\n===========================================' )
|
|
# TODO: check in advance file to check if exists or writeable
|
|
|
|
data = utils.load( filename )
|
|
if verbose:
|
|
print( 'data loaded' )
|
|
|
|
cache_file = pathlib.Path( cache )
|
|
|
|
if cache_file.is_file() and not no_cache:
|
|
if verbose:
|
|
print( 'using cache' )
|
|
with shelve.open( str( cache_file ) ) as cache:
|
|
for key in [ 'data' , 'border' , 'calibrations' ]:
|
|
if key not in cache:
|
|
raise Exception( 'ETA.py: missing data in cache_file' )
|
|
data = cache[ 'data' ]
|
|
border = cache[ 'border' ]
|
|
calibrations = cache[ 'calibrations' ]
|
|
else:
|
|
if verbose:
|
|
print( 'not using cache' )
|
|
print( 'starting first zoom' )
|
|
"""
|
|
find fill points
|
|
"""
|
|
points = []
|
|
|
|
points += utils.find_point( data[ : , 0 ] , 0 ) # x_min
|
|
points += utils.find_point(
|
|
data[ : , data.shape[1] - 1 ],
|
|
data.shape[1] - 1 ,
|
|
) # x_max
|
|
|
|
index_min = 0
|
|
while data.shape[0] - 1 > index_min:
|
|
index_min += 1
|
|
if len( utils.find_point( data[ index_min , : ] , index_min , 'y' ) ) == 3:
|
|
break
|
|
|
|
points.append(
|
|
utils.find_point( data[ index_min , : ] , index_min , 'y' )[1]
|
|
) # y_min
|
|
|
|
index_max = data.shape[0] - 1
|
|
while index_min < index_max:
|
|
index_max -= 1
|
|
if len( utils.find_point( data[ index_max , : ] , index_max , 'y' ) ) == 3:
|
|
break
|
|
|
|
|
|
points.append(
|
|
utils.find_point( data[ index_max , : ] , index_max , 'y' )[1]
|
|
) # y_max
|
|
|
|
small_data = utils.compress( data , 5 )
|
|
|
|
points = utils.big_to_small( points , 5 )
|
|
|
|
# size - 1
|
|
points[ 1 ][ 1 ] -= 1
|
|
points[ 3 ][ 0 ] -= 1
|
|
|
|
# little shift to be inside the light
|
|
points[ 2 ][ 1 ] += 3
|
|
points[ 3 ][ 1 ] += 3
|
|
|
|
"""
|
|
fill data
|
|
"""
|
|
|
|
extremum = []
|
|
for point in points:
|
|
if point[0] < points[2][0]:
|
|
point[0] = points[2][0]
|
|
if point[1] < points[0][1]:
|
|
point[1] = points[0][1]
|
|
taken_points = utils.small_to_big(
|
|
np.array( [
|
|
points[2][0],
|
|
points[0][1],
|
|
] ) + utils.fill(
|
|
small_data[
|
|
points[2][0] : points[3][0] + 1,
|
|
points[0][1] : points[1][1] + 1
|
|
],
|
|
[
|
|
point[0] - points[2][0],
|
|
point[1] - points[0][1],
|
|
],
|
|
1000
|
|
),
|
|
5
|
|
)
|
|
extremum.append( [
|
|
np.min( taken_points[ : , 1 ] ),
|
|
np.max( taken_points[ : , 1 ] ),
|
|
np.min( taken_points[ : , 0 ] ),
|
|
np.max( taken_points[ : , 0 ] ),
|
|
] )
|
|
|
|
border = {
|
|
'x': {
|
|
'min': points[0][1] + extremum[0][1] + 1,
|
|
'max': points[0][1] + extremum[1][0] ,
|
|
},
|
|
'y': {
|
|
'min': points[2][0] + extremum[2][3] + 1,
|
|
'max': points[2][0] + extremum[3][2] ,
|
|
},
|
|
}
|
|
|
|
if verbose:
|
|
print( 'first zoom finished' )
|
|
print( 'starting label deletion' )
|
|
|
|
"""
|
|
label deletion
|
|
"""
|
|
|
|
mean_data = np.convolve(
|
|
np.gradient(
|
|
np.mean(
|
|
data[
|
|
border[ 'y' ][ 'min' ] : border[ 'y' ][ 'max' ],
|
|
border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
|
|
],
|
|
axis = 0
|
|
)
|
|
),
|
|
np.ones(
|
|
int( 0.01 * (
|
|
border[ 'x' ][ 'max' ] - border[ 'x' ][ 'min' ]
|
|
) )
|
|
),
|
|
'same'
|
|
)
|
|
|
|
mean_data -= np.min( mean_data )
|
|
mean_data /= np.max( mean_data )
|
|
|
|
top = utils.consecutive( np.where( mean_data > 0.75 )[0] )
|
|
down = utils.consecutive( np.where( mean_data < 0.25 )[0] )
|
|
|
|
size_top = [ len( list_ ) for list_ in top ]
|
|
size_down = [ len( list_ ) for list_ in down ]
|
|
|
|
label_x = {
|
|
'min': border[ 'x' ][ 'min' ] + top[ np.argmax( size_top ) ][0] ,
|
|
'max': border[ 'x' ][ 'min' ] + down[ np.argmax( size_down ) ][-1]
|
|
}
|
|
|
|
if label_x[ 'min' ] < data.shape[1] // 2:
|
|
if label_x[ 'max' ] < data.shape[1] // 2:
|
|
border[ 'x' ][ 'min' ] = label_x[ 'max' ]
|
|
else:
|
|
raise Exception( 'the label seems to be in the middle of the picture' )
|
|
elif label_x[ 'max' ] > data.shape[1] // 2:
|
|
border[ 'x' ][ 'max' ] = label_x[ 'min' ]
|
|
else:
|
|
raise Exception( 'for an unkown reason, label_x[ \'min\' ] > label_x[ \'max\' ]' )
|
|
|
|
if verbose:
|
|
print( 'label section deleted' )
|
|
print( 'starting rotation' )
|
|
|
|
"""
|
|
Rotation
|
|
"""
|
|
|
|
index = border[ 'x' ][ 'min' ]
|
|
gradient = np.gradient(
|
|
data[
|
|
border[ 'y' ][ 'min' ] : border[ 'y' ][ 'min' ] + (
|
|
border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ]
|
|
) // 2,
|
|
index
|
|
]
|
|
)
|
|
while np.max( gradient ) - np.min( gradient ) > 5500:
|
|
index += 1
|
|
gradient = np.gradient(
|
|
data[
|
|
border[ 'y' ][ 'min' ] : border[ 'y' ][ 'min' ] + (
|
|
border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ]
|
|
) // 2,
|
|
index
|
|
]
|
|
)
|
|
|
|
positions = np.argmax(
|
|
sp_convolve(
|
|
np.gradient(
|
|
data[
|
|
border[ 'y' ][ 'min' ] : border[ 'y' ][ 'min' ] + (
|
|
border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ]
|
|
) // 2 ,
|
|
border[ 'x' ][ 'min' ] : index
|
|
] ,
|
|
axis = 0
|
|
) ,
|
|
np.ones( ( 100 , 1 ) ),
|
|
'valid'
|
|
) ,
|
|
axis = 0
|
|
)
|
|
|
|
list_ = np.arange( 0 , index - border[ 'x' ][ 'min' ] , 1 )
|
|
polyval = np.polyfit( list_ , positions , 1 )
|
|
|
|
angle = np.arctan( polyval[0] )
|
|
|
|
data = rotate( data , angle * ( 180 / np.pi ) ) # utils.rotate does not keep intensity absolute value ? TODO
|
|
|
|
diff_y = int( np.tan( angle ) * ( border[ 'x' ][ 'max' ] - border[ 'x' ][ 'min' ] ) )
|
|
|
|
border[ 'y' ][ 'min' ] -= diff_y
|
|
border[ 'y' ][ 'max' ] -= diff_y
|
|
|
|
if verbose:
|
|
print( 'rotation finished' )
|
|
print( 'starting isolating calibration' )
|
|
|
|
"""
|
|
Calibration
|
|
"""
|
|
|
|
tot_avg = np.mean(
|
|
data[
|
|
border[ 'y' ][ 'min' ] : border[ 'y' ][ 'max' ],
|
|
border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
|
|
]
|
|
)
|
|
|
|
def indicator( list_ ):
|
|
if np.mean( list_ ) > 0.75 * tot_avg:
|
|
return 0
|
|
if np.mean( list_ ) < 0.25 * tot_avg:
|
|
return 1
|
|
list_ -= np.min( list_ )
|
|
list_ /= np.max( list_ )
|
|
positions = np.where( list_ > 0.5 )[0]
|
|
if len( positions ) < 100:
|
|
return 2
|
|
if len( positions ) > 400:
|
|
return 3
|
|
distance = np.mean( positions[ 1 : ] - positions[ : -1 ] )
|
|
if distance < 10:
|
|
return 4
|
|
return 10
|
|
|
|
indicators = np.array( [ indicator( data[ i , border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ] ].copy() ) for i in range( border[ 'y' ][ 'min' ] , border[ 'y' ][ 'max' ] , 1 ) ] )
|
|
|
|
calibration_areas = utils.consecutive( np.where( indicators == 10 )[0] )
|
|
|
|
calibration_areas = [
|
|
[ calibration_area for calibration_area in calibration_areas if calibration_area[-1] < ( border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] ) / 2 ],
|
|
[ calibration_area for calibration_area in calibration_areas if calibration_area[ 0] > ( border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] ) / 2 ],
|
|
]
|
|
|
|
calibration_sizes = [
|
|
[ len( calibration_area ) for calibration_area in calibration_areas[0] ],
|
|
[ len( calibration_area ) for calibration_area in calibration_areas[1] ],
|
|
]
|
|
|
|
y_calibrations = [
|
|
calibration_areas[0][ np.argmax( calibration_sizes[0] ) ],
|
|
calibration_areas[1][ np.argmax( calibration_sizes[1] ) ],
|
|
]
|
|
|
|
calibrations = {
|
|
'top': {
|
|
'x': {
|
|
'min': border['x']['min'],
|
|
'max': border['x']['max'],
|
|
},
|
|
'y': {
|
|
'min': border['y']['min'] + y_calibrations[0][ 0],
|
|
'max': border['y']['min'] + y_calibrations[0][-1],
|
|
},
|
|
},
|
|
'down': {
|
|
'x': {
|
|
'min': border['x']['min'],
|
|
'max': border['x']['max'],
|
|
},
|
|
'y': {
|
|
'min': border['y']['min'] + y_calibrations[1][ 0],
|
|
'max': border['y']['min'] + y_calibrations[1][-1],
|
|
},
|
|
},
|
|
}
|
|
|
|
if verbose:
|
|
print( 'calibration lines isolated' )
|
|
print( 'starting curving main ETA' )
|
|
|
|
"""
|
|
stripes curves detection
|
|
"""
|
|
list_ = data[
|
|
calibrations[ 'top' ][ 'y' ][ 'max' ] : calibrations[ 'down' ][ 'y' ][ 'min' ],
|
|
border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
|
|
].copy()
|
|
list_min = np.min( list_ , axis = 0 )
|
|
list_ -= list_min
|
|
list_max = np.max( list_ , axis = 0 )
|
|
list_ /= list_max
|
|
size = list_.shape[1]
|
|
|
|
y_stripe = np.argmax( list_ , axis = 0 )
|
|
good_x = np.where( y_stripe < 2 * np.mean( y_stripe ) )[0]
|
|
x_stripe = np.arange( 0 , size , 1 ).astype( int )[ good_x ]
|
|
y_stripe = y_stripe[ good_x ]
|
|
|
|
stripe = np.polyfit( x_stripe , y_stripe , 3 )
|
|
|
|
# First deformation
|
|
|
|
list_ = data[
|
|
calibrations[ 'top' ][ 'y' ][ 'max' ] : calibrations[ 'down' ][ 'y' ][ 'min' ],
|
|
border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
|
|
]
|
|
y_diff = ( np.polyval(
|
|
stripe,
|
|
np.arange( 0 , border[ 'x' ][ 'max' ] - border[ 'x' ][ 'min' ] , 1 )
|
|
) ).astype( int )
|
|
y_diff[ np.where( y_diff < 0 ) ] = 0
|
|
results = np.zeros( ( list_.shape[0] + np.max( y_diff ) , list_.shape[1] ) )
|
|
for i in range( list_.shape[1] ):
|
|
results[ : , i ] = np.concatenate( (
|
|
np.zeros(
|
|
np.max( y_diff ) - y_diff[ i ]
|
|
) ,
|
|
list_[ : , i ] ,
|
|
np.zeros( y_diff[i] ),
|
|
) )
|
|
|
|
data[
|
|
calibrations[ 'top' ][ 'y' ][ 'max' ] : calibrations[ 'down' ][ 'y' ][ 'min' ],
|
|
border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
|
|
] = results[
|
|
: results.shape[0] - np.max( y_diff ),
|
|
:
|
|
]
|
|
|
|
if verbose:
|
|
print( 'main ETA curved' )
|
|
|
|
if not cache_file.exists() and not no_cache:
|
|
if verbose:
|
|
print( 'writing result in cache' )
|
|
with shelve.open( str( cache_file ) ) as cache:
|
|
cache[ 'data' ] = data
|
|
cache[ 'border' ] = border
|
|
cache[ 'calibrations'] = calibrations
|
|
if verbose:
|
|
print( 'cache saved' )
|
|
|
|
size_x = border[ 'x' ][ 'max' ] - border[ 'x' ][ 'min' ]
|
|
size_y = calibrations[ 'down' ][ 'y' ][ 'min' ] - calibrations[ 'top' ][ 'y' ][ 'max' ]
|
|
|
|
# Calibration
|
|
|
|
if calibration != None:
|
|
if verbose:
|
|
print( 'starting calibration' )
|
|
import wavelength_calibration as wave_calib
|
|
peaks_calib = np.loadtxt( calibration ) # sorted list
|
|
peaks_calib = np.sort( peaks_calib )
|
|
mean_up = np.mean( data[
|
|
calibrations[ 'top' ][ 'y' ][ 'min' ] : calibrations[ 'top' ][ 'y' ][ 'max' ],
|
|
calibrations[ 'top' ][ 'x' ][ 'min' ] : calibrations[ 'top' ][ 'x' ][ 'max' ]
|
|
] , axis = 0 )
|
|
mean_down = np.mean( data[
|
|
calibrations[ 'down' ][ 'y' ][ 'min' ] : calibrations[ 'down' ][ 'y' ][ 'max' ],
|
|
calibrations[ 'down' ][ 'x' ][ 'min' ] : calibrations[ 'down' ][ 'x' ][ 'max' ]
|
|
] , axis = 0 )
|
|
peaks_up = np.array(
|
|
utils.retrieve_peaks(
|
|
mean_up ,
|
|
window_size = 1 ,
|
|
max_window_size = 1,
|
|
)
|
|
).astype( int )
|
|
peaks_down = np.array(
|
|
utils.retrieve_peaks(
|
|
mean_down ,
|
|
window_size = 1 ,
|
|
max_window_size = 1,
|
|
)
|
|
).astype( int )
|
|
|
|
peakless_up = wave_calib.remove_peaks( mean_up , peaks_up )
|
|
peakless_down = wave_calib.remove_peaks( mean_down , peaks_down )
|
|
calib = { # hard-coded for now
|
|
'first': 3 ,
|
|
'last' : 20,
|
|
}
|
|
first , last = wave_calib.get_extremities( peakless_up , peaks_up )
|
|
up = {
|
|
'first': first,
|
|
'last' : last ,
|
|
}
|
|
first , last = wave_calib.get_extremities( peakless_down , peaks_down )
|
|
down = {
|
|
'first': first,
|
|
'last' : last ,
|
|
}
|
|
peaks_up = peaks_up[ up[ 'first' ] : up[ 'last' ] + 1 ]
|
|
peaks_down = peaks_down[ down[ 'first' ] : down[ 'last' ] + 1 ]
|
|
peaks_calib = peaks_calib[ calib[ 'first' ] : calib[ 'last' ] + 1 ]
|
|
|
|
peaks_up = wave_calib.only_keep_calib( peaks_up , peaks_calib )
|
|
peaks_down = wave_calib.only_keep_calib( peaks_down , peaks_calib )
|
|
|
|
diff = peaks_up - peaks_down
|
|
polyval_vert = np.polyfit( # give diff by horizontal pixel
|
|
peaks_up,
|
|
diff ,
|
|
3 ,
|
|
)
|
|
|
|
if verbose:
|
|
print( 'x pixel to wavelenth calibration finished' )
|
|
print( 'starting y pixel to x pixel calibration' )
|
|
|
|
def diff_calc( x , list_y ):
|
|
"""
|
|
give "good" x list for a given x and y
|
|
x = 0 start from border[ 'x' ][ 'min' ]
|
|
y = 0 start from calibrations[ 'top' ][ 'y' ][ 'max' ]
|
|
"""
|
|
y_top = 0
|
|
y_bot = size_y
|
|
|
|
x_top = x
|
|
x_bot = x + np.polyval( polyval_vert , x )
|
|
|
|
a = ( x_top - x_bot ) / ( y_top - y_bot )
|
|
b = ( 1 / 2 ) * ( x_top + x_bot - a * ( y_top + y_bot ) )
|
|
|
|
return ( a * list_y + b ).astype( int )
|
|
|
|
padding = diff_calc( size_x , size_y ) - size_x
|
|
new_data = np.zeros( ( size_y , size_x - padding ) )
|
|
list_y = np.arange( 0 , size_y , 1 )
|
|
for x in range( size_x - padding ):
|
|
diff = diff_calc( x , list_y )
|
|
cons_diff , i = utils.same_value( diff ) , 0
|
|
for list_same in cons_diff:
|
|
new_data[ i : i + len( list_same ) , x ] = data[
|
|
calibrations[ 'top' ][ 'y' ][ 'max' ] + i : calibrations[ 'top' ][ 'y' ][ 'max' ] + i + len( list_same ),
|
|
calibrations[ 'top' ][ 'x' ][ 'min' ] + list_same[0]
|
|
]
|
|
i += len( list_same )
|
|
calibrations[ 'top' ][ 'x' ][ 'max' ] -= padding
|
|
calibrations[ 'down' ][ 'x' ][ 'max' ] -= padding
|
|
border[ 'x' ][ 'max' ] -= padding
|
|
|
|
if verbose:
|
|
print( 'y to x pixel calibrated' )
|
|
|
|
polyval_wavelength = np.polyfit( # x = 0 begins at the start of
|
|
peaks_up , # calibrations[ 'top' ][ 'x' ][ 'min' ]
|
|
peaks_calib,
|
|
1 ,
|
|
)
|
|
wavelength = np.polyval(
|
|
polyval_wavelength,
|
|
np.arange(
|
|
0 ,
|
|
len( mean_up ),
|
|
1 ,
|
|
)
|
|
)
|
|
|
|
if verbose:
|
|
print( 'end of calibration' )
|
|
|
|
else:
|
|
if verbose:
|
|
print( 'no calibration' )
|
|
padding = 0
|
|
wavelength = np.arange( size_y )
|
|
|
|
if verbose:
|
|
print( 'starting bias substraction' )
|
|
|
|
bias = {
|
|
'top': np.mean(
|
|
data[
|
|
calibrations[ 'top' ][ 'y' ][ 'min' ] - 100 :
|
|
calibrations[ 'top' ][ 'y' ][ 'min' ] ,
|
|
calibrations[ 'top' ][ 'x' ][ 'min' ] :
|
|
calibrations[ 'top' ][ 'x' ][ 'max' ]
|
|
] ,
|
|
axis = 0,
|
|
),
|
|
'down': np.mean(
|
|
data[
|
|
calibrations[ 'down' ][ 'y' ][ 'max' ] :
|
|
calibrations[ 'down' ][ 'y' ][ 'max' ] + 100,
|
|
calibrations[ 'down' ][ 'x' ][ 'min' ] :
|
|
calibrations[ 'down' ][ 'x' ][ 'max' ]
|
|
] ,
|
|
axis = 0,
|
|
),
|
|
}
|
|
|
|
mean_bias = np.mean( [ bias[ 'top' ] , bias[ 'down' ] ] , axis = 0 )
|
|
|
|
data[
|
|
: ,
|
|
border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
|
|
] = data[
|
|
: ,
|
|
border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
|
|
] - mean_bias
|
|
|
|
if verbose:
|
|
print( 'bias substraction finished' )
|
|
print( 'starting to flatten the ETA' )
|
|
|
|
list_ = np.convolve(
|
|
np.gradient(
|
|
np.mean(
|
|
data[
|
|
calibrations[ 'top' ][ 'y' ][ 'max' ] : calibrations[ 'down' ][ 'y' ][ 'min' ],
|
|
border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
|
|
] ,
|
|
axis = 1,
|
|
),
|
|
) ,
|
|
np.ones( 50 ),
|
|
'same' ,
|
|
)
|
|
|
|
fall = utils.consecutive( np.where( list_ < 0.03 * np.min( list_ ) )[0] )
|
|
fall = np.array( [
|
|
np.argmax( list_ )
|
|
] + [
|
|
consecutive[0] + np.argmin(
|
|
list_[ consecutive[0] : consecutive[-1] ]
|
|
) for consecutive in fall if len( consecutive ) > 20 # the value might change
|
|
] ).astype( int )
|
|
|
|
stairs = np.zeros( ( len( fall ) , size_x , 2 ) )
|
|
for x in range( size_x ):
|
|
stairs[ : fall[0] , x ] = 0 # TODO: put the mean
|
|
for i in range( len( fall ) - 1 ):
|
|
stairs[ i , x ] = [
|
|
np.mean( data[
|
|
calibrations[ 'top' ][ 'y' ][ 'max' ] + fall[ i ] : calibrations[ 'top' ][ 'y' ][ 'max' ] + fall[ i + 1 ],
|
|
border[ 'x' ][ 'min' ] + x
|
|
] ),
|
|
fall[i]
|
|
]
|
|
stairs[ -1 , x ] = [
|
|
np.mean( data[
|
|
calibrations[ 'top' ][ 'y' ][ 'max' ] + fall[ -1 ] : calibrations[ 'down' ][ 'y' ][ 'min' ],
|
|
border[ 'x' ][ 'min' ] + x
|
|
] ),
|
|
fall[ - 1 ]
|
|
]
|
|
|
|
if verbose:
|
|
print( 'ETA flattened' )
|
|
print( 'outputting data' )
|
|
|
|
if output == None:
|
|
print( stairs , wavelength )
|
|
else:
|
|
|
|
output_file = pathlib.Path( output )
|
|
with shelve.open( str( output_file ) ) as output:
|
|
output[ 'data' ] = stairs
|
|
output[ 'wavelength' ] = wavelength
|
|
|
|
if verbose:
|
|
print( 'output sent' )
|
|
print( '===========================================\
|
|
\nend of ETA.py' )
|