import numpy as np
import matplotlib.pyplot as plt
import utils
import sys
import pathlib
import shelve
from scipy.signal import convolve as sp_convolve
from scipy.signal import find_peaks
from scipy.ndimage import rotate

cache , filename , output , calibration , verbose , no_cache = '' , None , None , None , False , False

if len( sys.argv ) < 2:
    raise Exception( 'ETA.py: type \'ETA.py -h\' for more information' )

argv , i = sys.argv[ 1 : ] , 0
while i < len( argv ):
    arg = argv[ i ]
    if arg[0] == '-':
        if len( arg ) < 2:
            raise Exception( 'ETA.py: unknown argument, type \'ETA.py -h\' for more information' )
        if arg[1] != '-':
            if arg == '-h':
                arg = '--help'
            elif arg == '-v':
                arg = '--version'
            elif arg == '-l':
                arg = '--verbose'
            elif arg == '-n':
                arg = '--no-cache'
            elif arg == '-c':
                if i == len( argv ) - 1:
                    raise Exception( 'ETA.py: cache has to take a value' )
                argv[ i + 1 ] = '--cache=' + argv[ i + 1 ]
                i += 1
                continue
            elif arg == '-o':
                if i == len( argv ) - 1:
                    raise Exception( 'ETA.py: output has to take a value' )
                argv[ i + 1 ] = '--output=' + argv[ i + 1 ]
                i += 1
                continue
            elif arg == '-a':
                if i == len( argv ) - 1:
                    raise Exception( 'ETA.py: calibration has to take a value' )
                argv[ i + 1 ] = '--calibration=' + argv[ i + 1 ]
                i += 1
                continue
            else:
                raise Exception( 'ETA.py: unknown argument "' + arg + '", type \'ETA.py -h\' for more information' )
        if arg[1] == '-': # not elif because arg can change after last if
            if arg == '--help':
                print( 'ETA.py [options...] filename\
\n    -a --calibration  calibration file, default to no calibration.\
\n                      No calibration means no wavelength interpolation\
\n    -c --cache        use given cache\
\n    -h --help         show this help and quit\
\n    -n --no-cache     do not use cache and rewrite it\
\n    -o --output       output file, default to standard output\
\n    -v --version      show version number and quit\
\n    -l --verbose      show more information to help debugging\
\n\
\nParse a naroo ETA fits' )
                exit()
            elif arg == '--version':
                print( '0.2' )
                exit()
            elif arg == '--verbose':
                verbose = True
            elif arg == '--no-cache':
                no_cache = True
            elif len( arg ) > 8 and arg[ : 8 ] == '--cache=':
                cache = arg[ 8 : ]
            elif len( arg ) > 9 and arg[ : 9 ] == '--output=':
                output = arg[ 9 : ]
            elif len( arg ) > 14 and arg[ : 14 ] == '--calibration=':
                calibration = arg[ 14 : ]
            else:
                raise Exception( 'ETA.py: unknown argument "' + arg + '", type \'ETA.py -h\' for more information' )
        else:
            raise Exception( 'ETA.py: this exception should never be raised' )
    else:
        filename = arg
    i += 1

if filename is None:
    raise Exception( 'ETA.py: filename should be given' )

if verbose:
    print( f'ETA.py: launching now with parameters:\
\n    --filename: {filename}\
\n    --cache: {cache} ( default: \'\' )\
\n    --calibration: {calibration} ( default to None )\
\n    --output: {output} ( default to None )\
\n    --verbose: True ( default to False )\
\n\
\n===========================================' )

# TODO: check in advance that the file exists / is writable

data = utils.load( filename )
if verbose:
    print( 'data loaded' )

cache_file = pathlib.Path( cache )
if cache_file.is_file() and not no_cache:
    if verbose:
        print( 'using cache' )
    with shelve.open( str( cache_file ) ) as cache:
        for key in [ 'data' , 'border' , 'calibrations' ]:
            if key not in cache:
                raise Exception( 'ETA.py: missing data in cache_file' )
        data = cache[ 'data' ]
        border = cache[ 'border' ]
        calibrations = cache[ 'calibrations' ]
else:
    if verbose:
        print( 'not using cache' )
        print( 'starting first zoom' )

    """
    find fill points
    """
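    # Editor's note (assumptions about the project-local utils module, inferred
    # from its usage below): find_point() is expected to return candidate
    # [row, column] marker points along one image edge, compress() /
    # big_to_small() / small_to_big() to convert between the full-resolution
    # image and a 5x-reduced copy, and fill() to flood-fill the bright plate
    # area from a seed point, so that the extrema of the filled regions give
    # the usable borders.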
    points = []
    points += utils.find_point( data[ : , 0 ] , 0 ) # x_min
    points += utils.find_point(
        data[ : , data.shape[1] - 1 ],
        data.shape[1] - 1 ,
    ) # x_max

    index_min = 0
    while data.shape[0] - 1 > index_min:
        index_min += 1
        if len( utils.find_point( data[ index_min , : ] , index_min , 'y' ) ) == 3:
            break
    points.append( utils.find_point( data[ index_min , : ] , index_min , 'y' )[1] ) # y_min

    index_max = data.shape[0] - 1
    while index_min < index_max:
        index_max -= 1
        if len( utils.find_point( data[ index_max , : ] , index_max , 'y' ) ) == 3:
            break
    points.append( utils.find_point( data[ index_max , : ] , index_max , 'y' )[1] ) # y_max

    small_data = utils.compress( data , 5 )
    points = utils.big_to_small( points , 5 )

    # size - 1
    points[ 1 ][ 1 ] -= 1
    points[ 3 ][ 0 ] -= 1

    # little shift to be inside the light
    points[ 2 ][ 1 ] += 3
    points[ 3 ][ 1 ] += 3

    """
    fill data
    """
    extremum = []
    for point in points:
        if point[0] < points[2][0]:
            point[0] = points[2][0]
        if point[1] < points[0][1]:
            point[1] = points[0][1]
        taken_points = utils.small_to_big(
            np.array( [
                points[2][0],
                points[0][1],
            ] ) + utils.fill(
                small_data[
                    points[2][0] : points[3][0] + 1,
                    points[0][1] : points[1][1] + 1
                ],
                [
                    point[0] - points[2][0],
                    point[1] - points[0][1],
                ],
                1000
            ),
            5
        )
        extremum.append( [
            np.min( taken_points[ : , 1 ] ),
            np.max( taken_points[ : , 1 ] ),
            np.min( taken_points[ : , 0 ] ),
            np.max( taken_points[ : , 0 ] ),
        ] )

    border = {
        'x': {
            'min': points[0][1] + extremum[0][1] + 1,
            'max': points[0][1] + extremum[1][0]    ,
        },
        'y': {
            'min': points[2][0] + extremum[2][3] + 1,
            'max': points[2][0] + extremum[3][2]    ,
        },
    }

    if verbose:
        print( 'first zoom finished' )
        print( 'starting label deletion' )

    """
    label deletion
    """
    mean_data = np.convolve(
        np.gradient(
            np.mean(
                data[
                    border[ 'y' ][ 'min' ] : border[ 'y' ][ 'max' ],
                    border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
                ],
                axis = 0
            )
        ),
        np.ones( int( 0.01 * ( border[ 'x' ][ 'max' ] - border[ 'x' ][ 'min' ] ) ) ),
        'same'
    )
    mean_data -= np.min( mean_data )
    mean_data /= np.max( mean_data )

    top  = utils.consecutive( np.where( mean_data > 0.75 )[0] )
    down = utils.consecutive( np.where( mean_data < 0.25 )[0] )

    size_top  = [ len( list_ ) for list_ in top  ]
    size_down = [ len( list_ ) for list_ in down ]

    label_x = {
        'min': border[ 'x' ][ 'min' ] + top[  np.argmax( size_top  ) ][ 0] ,
        'max': border[ 'x' ][ 'min' ] + down[ np.argmax( size_down ) ][-1] ,
    }

    if label_x[ 'min' ] < data.shape[1] // 2:
        if label_x[ 'max' ] < data.shape[1] // 2:
            border[ 'x' ][ 'min' ] = label_x[ 'max' ]
        else:
            raise Exception( 'the label seems to be in the middle of the picture' )
    elif label_x[ 'max' ] > data.shape[1] // 2:
        border[ 'x' ][ 'max' ] = label_x[ 'min' ]
    else:
        raise Exception( 'for an unknown reason, label_x[ \'min\' ] > label_x[ \'max\' ]' )

    if verbose:
        print( 'label section deleted' )
        print( 'starting rotation' )

    """
    Rotation
    """
    index = border[ 'x' ][ 'min' ]
    gradient = np.gradient(
        data[
            border[ 'y' ][ 'min' ] : border[ 'y' ][ 'min' ] + ( border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] ) // 2,
            index
        ]
    )
    while np.max( gradient ) - np.min( gradient ) > 5500:
        index += 1
        gradient = np.gradient(
            data[
                border[ 'y' ][ 'min' ] : border[ 'y' ][ 'min' ] + ( border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] ) // 2,
                index
            ]
        )

    positions = np.argmax(
        sp_convolve(
            np.gradient(
                data[
                    border[ 'y' ][ 'min' ] : border[ 'y' ][ 'min' ] + ( border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] ) // 2 ,
                    border[ 'x' ][ 'min' ] : index
                ] ,
                axis = 0
            ) ,
            np.ones( ( 100 , 1 ) ),
            'valid'
        ) ,
        axis = 0
    )
    list_ = np.arange( 0 , index - border[ 'x' ][ 'min' ] , 1 )
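    # The edge position detected in each column ( positions ) is fitted with a
    # straight line; the plate tilt is angle = arctan( slope ) in radians,
    # which scipy.ndimage.rotate expects in degrees, hence the 180 / pi factor
    # below.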
    polyval = np.polyfit( list_ , positions , 1 )
    angle = np.arctan( polyval[0] )

    data = rotate( data , angle * ( 180 / np.pi ) ) # utils.rotate does not keep intensity absolute value ? TODO

    diff_y = int( np.tan( angle ) * ( border[ 'x' ][ 'max' ] - border[ 'x' ][ 'min' ] ) )
    border[ 'y' ][ 'min' ] -= diff_y
    border[ 'y' ][ 'max' ] -= diff_y

    if verbose:
        print( 'rotation finished' )
        print( 'starting isolating calibration' )

    """
    Calibration
    """
    tot_avg = np.mean(
        data[
            border[ 'y' ][ 'min' ] : border[ 'y' ][ 'max' ],
            border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
        ]
    )

    def indicator( list_ ):
        if np.mean( list_ ) > 0.75 * tot_avg:
            return 0
        if np.mean( list_ ) < 0.25 * tot_avg:
            return 1
        list_ -= np.min( list_ )
        list_ /= np.max( list_ )
        positions = np.where( list_ > 0.5 )[0]
        if len( positions ) < 100:
            return 2
        if len( positions ) > 400:
            return 3
        distance = np.mean( positions[ 1 : ] - positions[ : -1 ] )
        if distance < 10:
            return 4
        return 10

    indicators = np.array( [
        indicator( data[ i , border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ] ].copy() )
        for i in range( border[ 'y' ][ 'min' ] , border[ 'y' ][ 'max' ] , 1 )
    ] )

    calibration_areas = utils.consecutive( np.where( indicators == 10 )[0] )
    calibration_areas = [
        [
            calibration_area for calibration_area in calibration_areas
            if calibration_area[-1] < ( border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] ) / 2
        ],
        [
            calibration_area for calibration_area in calibration_areas
            if calibration_area[ 0] > ( border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] ) / 2
        ],
    ]
    calibration_sizes = [
        [ len( calibration_area ) for calibration_area in calibration_areas[0] ],
        [ len( calibration_area ) for calibration_area in calibration_areas[1] ],
    ]
    y_calibrations = [
        calibration_areas[0][ np.argmax( calibration_sizes[0] ) ],
        calibration_areas[1][ np.argmax( calibration_sizes[1] ) ],
    ]

    calibrations = {
        'top': {
            'x': {
                'min': border['x']['min'],
                'max': border['x']['max'],
            },
            'y': {
                'min': border['y']['min'] + y_calibrations[0][ 0],
                'max': border['y']['min'] + y_calibrations[0][-1],
            },
        },
        'down': {
            'x': {
                'min': border['x']['min'],
                'max': border['x']['max'],
            },
            'y': {
                'min': border['y']['min'] + y_calibrations[1][ 0],
                'max': border['y']['min'] + y_calibrations[1][-1],
            },
        },
    }

    if verbose:
        print( 'calibration lines isolated' )
        print( 'starting curving main ETA' )

    """
    stripes curves detection
    """
    list_ = data[
        calibrations[ 'top'  ][ 'y' ][ 'max' ] : calibrations[ 'down' ][ 'y' ][ 'min' ],
        border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
    ].copy()

    list_min = np.min( list_ , axis = 0 )
    list_ -= list_min
    list_max = np.max( list_ , axis = 0 )
    list_ /= list_max

    size = list_.shape[1]

    y_stripe = np.argmax( list_ , axis = 0 )
    good_x = np.where( y_stripe < 2 * np.mean( y_stripe ) )[0]
    x_stripe = np.arange( 0 , size , 1 ).astype( int )[ good_x ]
    y_stripe = y_stripe[ good_x ]

    stripe = np.polyfit( x_stripe , y_stripe , 3 )

    # First deformation
    list_ = data[
        calibrations[ 'top'  ][ 'y' ][ 'max' ] : calibrations[ 'down' ][ 'y' ][ 'min' ],
        border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
    ]
    y_diff = ( np.polyval(
        stripe,
        np.arange( 0 , border[ 'x' ][ 'max' ] - border[ 'x' ][ 'min' ] , 1 )
    ) ).astype( int )
    y_diff[ np.where( y_diff < 0 ) ] = 0

    results = np.zeros( ( list_.shape[0] + np.max( y_diff ) , list_.shape[1] ) )
    for i in range( list_.shape[1] ):
        results[ : , i ] = np.concatenate( (
            np.zeros( np.max( y_diff ) - y_diff[ i ] ) ,
            list_[ : , i ] ,
            np.zeros( y_diff[i] ),
        ) )

    data[
        calibrations[ 'top'  ][ 'y' ][ 'max' ] : calibrations[ 'down' ][ 'y' ][ 'min' ],
        border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
    ] = results[ : results.shape[0] - np.max( y_diff ), : ]
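    # Each column i has been shifted down by max( y_diff ) - y_diff[ i ] pixels
    # and zero-padded at both ends, so the stripe described by the fitted cubic
    # `stripe` is straightened before the rows are written back into `data`.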
    if verbose:
        print( 'main ETA curved' )

if not cache_file.exists():
    if verbose:
        print( 'writing result in cache' )
    with shelve.open( str( cache_file ) ) as cache:
        cache[ 'data'         ] = data
        cache[ 'border'       ] = border
        cache[ 'calibrations' ] = calibrations
    if verbose:
        print( 'cache saved' )

size_x = border[ 'x' ][ 'max' ] - border[ 'x' ][ 'min' ]
size_y = calibrations[ 'down' ][ 'y' ][ 'min' ] - calibrations[ 'top' ][ 'y' ][ 'max' ]

# Calibration
if calibration is not None:
    if verbose:
        print( 'starting calibration' )

    import wavelength_calibration as wave_calib

    peaks_calib = np.loadtxt( calibration ) # sorted list
    peaks_calib = np.sort( peaks_calib )

    mean_up = np.mean(
        data[
            calibrations[ 'top' ][ 'y' ][ 'min' ] : calibrations[ 'top' ][ 'y' ][ 'max' ],
            calibrations[ 'top' ][ 'x' ][ 'min' ] : calibrations[ 'top' ][ 'x' ][ 'max' ]
        ] ,
        axis = 0
    )
    mean_down = np.mean(
        data[
            calibrations[ 'down' ][ 'y' ][ 'min' ] : calibrations[ 'down' ][ 'y' ][ 'max' ],
            calibrations[ 'down' ][ 'x' ][ 'min' ] : calibrations[ 'down' ][ 'x' ][ 'max' ]
        ] ,
        axis = 0
    )

    peaks_up = np.array( utils.retrieve_peaks(
        mean_up ,
        window_size     = 1 ,
        max_window_size = 1 ,
    ) ).astype( int )
    peaks_down = np.array( utils.retrieve_peaks(
        mean_down ,
        window_size     = 1 ,
        max_window_size = 1 ,
    ) ).astype( int )

    peakless_up   = wave_calib.remove_peaks( mean_up   , peaks_up   )
    peakless_down = wave_calib.remove_peaks( mean_down , peaks_down )

    calib = { # hard-coded for now
        'first': 3 ,
        'last' : 20,
    }
    first , last = wave_calib.get_extremities( peakless_up , peaks_up )
    up = {
        'first': first,
        'last' : last ,
    }
    first , last = wave_calib.get_extremities( peakless_down , peaks_down )
    down = {
        'first': first,
        'last' : last ,
    }

    peaks_up    = peaks_up[    up[    'first' ] : up[    'last' ] + 1 ]
    peaks_down  = peaks_down[  down[  'first' ] : down[  'last' ] + 1 ]
    peaks_calib = peaks_calib[ calib[ 'first' ] : calib[ 'last' ] + 1 ]

    peaks_up   = wave_calib.only_keep_calib( peaks_up   , peaks_calib )
    peaks_down = wave_calib.only_keep_calib( peaks_down , peaks_calib )

    diff = peaks_up - peaks_down
    polyval_vert = np.polyfit( # give diff by horizontal pixel
        peaks_up,
        diff    ,
        3       ,
    )

    if verbose:
        print( 'x pixel to wavelength calibration finished' )
        print( 'starting y pixel to x pixel calibration' )

    def diff_calc( x , list_y ):
        """
        give the "good" x list for a given x and y
        x = 0 starts from border[ 'x' ][ 'min' ]
        y = 0 starts from calibrations[ 'top' ][ 'y' ][ 'max' ]
        """
        y_top = 0
        y_bot = size_y
        x_top = x
        x_bot = x + np.polyval( polyval_vert , x )
        a = ( x_top - x_bot ) / ( y_top - y_bot )
        b = ( 1 / 2 ) * ( x_top + x_bot - a * ( y_top + y_bot ) )
        return ( a * list_y + b ).astype( int )

    padding = diff_calc( size_x , size_y ) - size_x
    new_data = np.zeros( ( size_y , size_x - padding ) )
    list_y = np.arange( 0 , size_y , 1 )
    for x in range( size_x - padding ):
        diff = diff_calc( x , list_y )
        cons_diff , i = utils.same_value( diff ) , 0
        for list_same in cons_diff:
            new_data[ i : i + len( list_same ) , x ] = data[
                calibrations[ 'top' ][ 'y' ][ 'max' ] + i : calibrations[ 'top' ][ 'y' ][ 'max' ] + i + len( list_same ),
                calibrations[ 'top' ][ 'x' ][ 'min' ] + list_same[0]
            ]
            i += len( list_same )

    calibrations[ 'top'  ][ 'x' ][ 'max' ] -= padding
    calibrations[ 'down' ][ 'x' ][ 'max' ] -= padding
    border[ 'x' ][ 'max' ] -= padding

    if verbose:
        print( 'y to x pixel calibrated' )

    polyval_wavelength = np.polyfit( # x = 0 begins at the start of
        peaks_up ,                   # calibrations[ 'top' ][ 'x' ][ 'min' ]
        peaks_calib,
        1 ,
    )
    wavelength = np.polyval(
        polyval_wavelength,
        np.arange(
            0 ,
            len( mean_up ),
            1 ,
        )
    )
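    # polyval_wavelength is a degree-1 fit mapping the detected peak positions
    # (in pixels, x = 0 at calibrations[ 'top' ][ 'x' ][ 'min' ]) to the
    # reference wavelengths read from the calibration file, so wavelength[ x ]
    # gives the wavelength associated with column x of the calibration strip.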
    if verbose:
        print( 'end of calibration' )
else:
    if verbose:
        print( 'no calibration' )
    padding = 0
    wavelength = np.arange( size_y )

if verbose:
    print( 'starting to flatten the ETA' )

list_ = np.convolve(
    np.gradient(
        np.mean(
            data[
                calibrations[ 'top'  ][ 'y' ][ 'max' ] : calibrations[ 'down' ][ 'y' ][ 'min' ],
                border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
            ] ,
            axis = 1,
        ),
    ) ,
    np.ones( 50 ),
    'same' ,
)

fall = utils.consecutive( np.where( list_ < 0.03 * np.min( list_ ) )[0] )
fall = np.array(
    [ np.argmax( list_ ) ] + [
        consecutive[0] + np.argmin( list_[ consecutive[0] : consecutive[-1] ] )
        for consecutive in fall
        if len( consecutive ) > 20 # the value might change
    ]
).astype( int )

stairs = np.zeros( ( len( fall ) - 1 , size_x ) )
for x in range( size_x ):
    stairs[ : fall[0] , x ] = 0 # TODO: put the mean
    for i in range( len( fall ) - 1 ):
        stairs[ i , x ] = np.mean(
            data[
                calibrations[ 'top' ][ 'y' ][ 'max' ] + fall[ i ] : calibrations[ 'top' ][ 'y' ][ 'max' ] + fall[ i + 1 ],
                border[ 'x' ][ 'min' ] + x
            ]
        )

if verbose:
    print( 'ETA flattened' )
    print( 'outputting data' )

if output is None:
    print( stairs , wavelength )
else:
    output_file = pathlib.Path( output )
    with shelve.open( str( output_file ) ) as output:
        output[ 'data'       ] = data
        output[ 'wavelength' ] = wavelength

if verbose:
    print( 'output sent' )
    print( '===========================================\
\nend of ETA.py' )
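# Example (editor's sketch, not executed by the script): when --output is given,
# the result is a shelve database that can be read back as shown below; the
# path 'my_output' is a placeholder.
#
#     import shelve
#     with shelve.open( 'my_output' ) as result:
#         data       = result[ 'data' ]
#         wavelength = result[ 'wavelength' ]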