import numpy as np import matplotlib.pyplot as plt import utils import sys import pathlib import shelve from scipy.signal import convolve as sp_convolve from scipy.signal import find_peaks from scipy.ndimage import rotate cache , filename , output , calibration = '' , None , None , None if len( sys.argv ) < 2: raise Exception( 'ETA.py: type \'ETA.py -h\' for more information' ) argv , i = sys.argv[ 1 : ] , 0 while i < len( argv ): arg = argv[ i ] if arg[0] == '-': if len( arg ) < 2: raise Exception( 'ETA.py: unknown argument, type \'ETA.py -h\' for more information' ) if arg[1] != '-': if arg == '-h': arg = '--help' elif arg == '-v': arg = '--version' elif arg == '-n': arg == '--no-cache' elif arg == '-c': if i == len( sys.argv ) - 1: raise Exception( 'ETA.py: cache have to take a value' ) argv[ i + 1 ] = '--cache=' + argv[ i + 1 ] i += 1 continue elif arg == '-o': if i == len( sys.argv ) - 1: raise Exception( 'ETA.py: output have to take a value' ) argv[ i + 1 ] = '--output=' + argv[ i + 1 ] i += 1 continue elif arg == '-a': if i == len( sys.argv ) - 1: raise Exception( 'ETA.py: calibration have to take a value' ) argv[ i + 1 ] = '--calibration=' + argv[ i + 1 ] i += 1 continue else: raise Exception( 'ETA.py: unknown argument "' + arg + '", type \'ETA.py -h\' for more information' ) if arg[1] == '-': # not elif because arg can change after last if if arg == '--help': print( 'ETA.py [options...] filename\ \n -a --calibration calibration file, default to no calibration.\ \n No calibration means no wavelength interpolation\ \n -c --cache use given cache\ \n -h --help show this help and quit\ \n -n --no-cache do not use cache and rewrite it\ \n -o --output output file, default to standard output\ \n -v --version show version number and quit\ \n\ \nParse a naroo ETA fits' ) exit() elif arg == '--version': print( '0.2' ) exit() elif arg == '--no-cache': cache = '' # hack, empty string mean this dir, so not a file elif len( arg ) > 8 and arg[ : 8 ] == '--cache=': cache = arg[ 8 : ] elif len( arg ) > 9 and arg[ : 9 ] == '--output=': output = arg[ 9 : ] elif len( arg ) > 14 and arg[ : 14 ] == '--calibration=': calibration = arg[ 14 : ] else: raise Exception( 'ETA.py: unknown argument "' + arg + '", type \'ETA.py -h\' for more information' ) else: raise Exception( 'ETA.py: this exception should never be raised' ) else: filename = arg i += 1 if filename == None: raise Exception( 'ETA.py: filename should be given' ) # TODO: check in advance file to check if exists or writeable data = utils.load( filename ) cache_file = pathlib.Path( cache ) if cache_file.is_file(): with shelve.open( str( cache_file ) ) as cache: for key in [ 'rotated_data' , 'border' , 'calibrations' ]: if key not in cache: raise Exception( 'ETA.py: missing data in cache_file' ) data = cache[ 'rotated_data' ] border = cache[ 'border' ] calibrations = cache[ 'calibrations' ] stripes = cache[ 'stripes' ] else: """ find fill points """ points = [] points += utils.find_point( data[ : , 0 ] , 0 ) # x_min points += utils.find_point( data[ : , data.shape[1] - 1 ], data.shape[1] - 1 , ) # x_max index_min = 0 while data.shape[0] - 1 > index_min: index_min += 1 if len( utils.find_point( data[ index_min , : ] , index_min , 'y' ) ) == 3: break points.append( utils.find_point( data[ index_min , : ] , index_min , 'y' )[1] ) # y_min index_max = data.shape[0] - 1 while index_min < index_max: index_max -= 1 if len( utils.find_point( data[ index_max , : ] , index_max , 'y' ) ) == 3: break points.append( utils.find_point( data[ index_max , : ] , index_max , 'y' )[1] ) # y_max small_data = utils.compress( data , 5 ) points = utils.big_to_small( points , 5 ) # size - 1 points[ 1 ][ 1 ] -= 1 points[ 3 ][ 0 ] -= 1 # little shift to be inside the light points[ 2 ][ 1 ] += 3 points[ 3 ][ 1 ] += 3 """ fill data """ extremum = [] for point in points: if point[0] < points[2][0]: point[0] = points[2][0] if point[1] < points[0][1]: point[1] = points[0][1] taken_points = utils.small_to_big( np.array( [ points[2][0], points[0][1], ] ) + utils.fill( small_data[ points[2][0] : points[3][0] + 1, points[0][1] : points[1][1] + 1 ], [ point[0] - points[2][0], point[1] - points[0][1], ], 1000 ), 5 ) extremum.append( [ np.min( taken_points[ : , 1 ] ), np.max( taken_points[ : , 1 ] ), np.min( taken_points[ : , 0 ] ), np.max( taken_points[ : , 0 ] ), ] ) border = { 'x': { 'min': points[0][1] + extremum[0][1] + 1, 'max': points[0][1] + extremum[1][0] , }, 'y': { 'min': points[2][0] + extremum[2][3] + 1, 'max': points[2][0] + extremum[3][2] , }, } """ label deletion """ mean_data = np.convolve( np.gradient( np.mean( data[ border[ 'y' ][ 'min' ] : border[ 'y' ][ 'max' ], border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ] ], axis = 0 ) ), np.ones( int( 0.01 * ( border[ 'x' ][ 'max' ] - border[ 'x' ][ 'min' ] ) ) ), 'same' ) mean_data -= np.min( mean_data ) mean_data /= np.max( mean_data ) top = utils.consecutive( np.where( mean_data > 0.75 )[0] ) down = utils.consecutive( np.where( mean_data < 0.25 )[0] ) size_top = [ len( list_ ) for list_ in top ] size_down = [ len( list_ ) for list_ in down ] label_x = { 'min': border[ 'x' ][ 'min' ] + top[ np.argmax( size_top ) ][0] , 'max': border[ 'x' ][ 'min' ] + down[ np.argmax( size_down ) ][-1] } if label_x[ 'min' ] < data.shape[1] // 2: if label_x[ 'max' ] < data.shape[1] // 2: border[ 'x' ][ 'min' ] = label_x[ 'max' ] else: raise Exception( 'the label seems to be in the middle of the picture' ) elif label_x[ 'max' ] > data.shape[1] // 2: border[ 'x' ][ 'max' ] = label_x[ 'min' ] else: raise Exception( 'for an unkown reason, label_x[ \'min\' ] > label_x[ \'max\' ]' ) """ Rotation """ index = border[ 'x' ][ 'min' ] gradient = np.gradient( data[ border[ 'y' ][ 'min' ] : border[ 'y' ][ 'min' ] + ( border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] ) // 2, index ] ) while np.max( gradient ) - np.min( gradient ) > 5500: index += 1 gradient = np.gradient( data[ border[ 'y' ][ 'min' ] : border[ 'y' ][ 'min' ] + ( border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] ) // 2, index ] ) positions = np.argmax( sp_convolve( np.gradient( data[ border[ 'y' ][ 'min' ] : border[ 'y' ][ 'min' ] + ( border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] ) // 2 , border[ 'x' ][ 'min' ] : index ] , axis = 0 ) , np.ones( ( 100 , 1 ) ), 'valid' ) , axis = 0 ) list_ = np.arange( 0 , index - border[ 'x' ][ 'min' ] , 1 ) polyval = np.polyfit( list_ , positions , 1 ) angle = np.arctan( polyval[0] ) data = rotate( data , angle * ( 180 / np.pi ) ) # utils.rotate does not keep intensity absolute value ? TODO diff_y = int( np.tan( angle ) * ( border[ 'x' ][ 'max' ] - border[ 'x' ][ 'min' ] ) ) border[ 'y' ][ 'min' ] -= diff_y border[ 'y' ][ 'max' ] -= diff_y """ Calibration """ tot_avg = np.mean( data[ border[ 'y' ][ 'min' ] : border[ 'y' ][ 'max' ], border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ] ] ) def indicator( list_ ): if np.mean( list_ ) > 0.75 * tot_avg: return 0 if np.mean( list_ ) < 0.25 * tot_avg: return 1 list_ -= np.min( list_ ) list_ /= np.max( list_ ) positions = np.where( list_ > 0.5 )[0] if len( positions ) < 100: return 2 if len( positions ) > 400: return 3 distance = np.mean( positions[ 1 : ] - positions[ : -1 ] ) if distance < 10: return 4 return 10 indicators = np.array( [ indicator( data[ i , border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ] ].copy() ) for i in range( border[ 'y' ][ 'min' ] , border[ 'y' ][ 'max' ] , 1 ) ] ) calibration_areas = utils.consecutive( np.where( indicators == 10 )[0] ) calibration_areas = [ [ calibration_area for calibration_area in calibration_areas if calibration_area[-1] < ( border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] ) / 2 ], [ calibration_area for calibration_area in calibration_areas if calibration_area[ 0] > ( border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] ) / 2 ], ] calibration_sizes = [ [ len( calibration_area ) for calibration_area in calibration_areas[0] ], [ len( calibration_area ) for calibration_area in calibration_areas[1] ], ] y_calibrations = [ calibration_areas[0][ np.argmax( calibration_sizes[0] ) ], calibration_areas[1][ np.argmax( calibration_sizes[1] ) ], ] calibrations = { 'top': { 'x': { 'min': border['x']['min'], 'max': border['x']['max'], }, 'y': { 'min': border['y']['min'] + y_calibrations[0][ 0], 'max': border['y']['min'] + y_calibrations[0][-1], }, }, 'down': { 'x': { 'min': border['x']['min'], 'max': border['x']['max'], }, 'y': { 'min': border['y']['min'] + y_calibrations[1][ 0], 'max': border['y']['min'] + y_calibrations[1][-1], }, }, } """ stripes curves detection """ list_ = data[ calibrations[ 'top' ][ 'y' ][ 'max' ] : calibrations[ 'down' ][ 'y' ][ 'min' ], border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ] ].copy() list_min = np.min( list_ , axis = 0 ) list_ -= list_min list_max = np.max( list_ , axis = 0 ) list_ /= list_max size = list_.shape[1] y_stripe = np.argmax( list_ , axis = 0 ) good_x = np.where( y_stripe < 2 * np.mean( y_stripe ) )[0] x_stripe = np.arange( 0 , size , 1 ).astype( int )[ good_x ] y_stripe = y_stripe[ good_x ] stripes = [ # list of polyval result for each stripe np.polyfit( x_stripe , y_stripe , 3 ) ] if not cache_file.exists(): with shelve.open( str( cache_file ) ) as cache: cache[ 'rotated_data' ] = data cache[ 'border' ] = border cache[ 'calibrations'] = calibrations cache[ 'stripes' ] = stripes # Calibration if calibration != None: calib_peaks = np.loadtxt( calibration ) mean_calib_up = np.mean( data[ calibrations[ 'top' ][ 'y' ][ 'min' ] : calibrations[ 'top' ][ 'y' ][ 'max' ], calibrations[ 'top' ][ 'x' ][ 'min' ] : calibrations[ 'top' ][ 'x' ][ 'max' ] ] , axis = 0 ) mean_calib_up -= np.min( mean_calib_up ) mean_calib_up /= np.max( mean_calib_up ) peaks_up = find_peaks( mean_calib_up , height = 0.5 )[0] + calibrations[ 'top' ][ 'x' ][ 'min' ] diff = np.array( [ np.inf ] ) polyval = np.polyfit( peaks_up[ : len( calib_peaks ) ] , calib_peaks[ : len( peaks_up ) ] , 1 ) while np.max( diff ) > 1000: # we do not have the exact same number of peak (and the same peaks) in calibration model diff = abs( np.polyval( polyval , peaks_up[ : len( calib_peaks ) ] ) - calib_peaks[ : len( peaks_up ) ] ) point_to_delete = np.argmax( diff ) # get the "worst" point peaks_up_new = np.delete( peaks_up , [ point_to_delete ] ) # remove from data value ( other peak detected ) calib_peaks_new = np.delete( calib_peaks , [ point_to_delete ] ) # remove from model ( peak not detected ) polyfull_up = np.polyfit( peaks_up_new[ : len( calib_peaks ) ] , calib_peaks[ : len( peaks_up_new ) ] , 1 , full = True ) polyfull_calib = np.polyfit( peaks_up[ : len( calib_peaks_new ) ] , calib_peaks_new[ : len( peaks_up ) ] , 1 , full = True ) if polyfull_up[ 1 ][ 0 ] < polyfull_calib[ 1 ][ 0 ]: # which one is a better fit ? peaks_up = peaks_up_new polyval = polyfull_up[ 0 ] else: calib_peaks = calib_peaks_new polyval = polyfull_calib[ 0 ] x = np.arange( calibrations[ 'top' ][ 'x' ][ 'min' ] , calibrations[ 'top' ][ 'x' ][ 'max' ] , 1 ) x = np.polyval( polyval , x ) plt.margins( x = 0 ) plt.plot( x , mean_calib_up ) plt.show() exit() # First deformation list_ = data[ calibrations[ 'top' ][ 'y' ][ 'max' ] : calibrations[ 'down' ][ 'y' ][ 'min' ], border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ] ] y_diff = ( np.polyval( stripes[0] , np.arange( 0 , size , 1 ) ) ).astype( int ) y_diff[ np.where( y_diff < 0 ) ] = 0 results = np.zeros( ( list_.shape[0] + np.max( y_diff ) , list_.shape[1] ) ) for i in range( list_.shape[1] ): results[ : , i ] = np.concatenate( ( np.zeros( np.max( y_diff ) - y_diff[ i ] ) , list_[ : , i ] , np.zeros( y_diff[i] ) ) ) list_results = np.convolve( np.gradient( np.mean( results , axis = 1 ), ) , np.ones( 50 ), 'same' , ) fall = utils.consecutive( np.where( list_results < 0.03 * np.min( list_results ) )[0] ) fall = np.array( [ np.argmax( list_results ) ] + [ consecutive[0] + np.argmin( list_results[ consecutive[0] : consecutive[-1] ] ) for consecutive in fall ] ).astype( int ) stairs = np.zeros_like( results ) for x in range( size ): stairs[ : , x ] = results[ : , x ] # can be modified, but no used anymore so it's fine stairs[ : fall[0] , x ] = 0 # TODO: put the mean for i in range( len( fall ) - 1 ): stairs[ fall[ i ] : fall[ i + 1 ] , x ] = np.mean( stairs[ fall[ i ] : fall[ i + 1 ] ] ) stairs[ fall[ -1 ] : , x ] = 0 if output == None: print( stairs ) else: np.savetxt( output , stairs )