diff --git a/spectrum.py b/spectrum.py new file mode 100644 index 0000000..fd7abdb --- /dev/null +++ b/spectrum.py @@ -0,0 +1,437 @@ +import numpy as np +import matplotlib.pyplot as plt +import scipy.signal as signal +import utils +import sys +import shelve +import pathlib + +cache , filename , output , calibration , verbose , no_cache = '' , None , None , None , False, False +if len( sys.argv ) < 2: + raise Exception( 'spectrum.py: type \'spectrum.py -h\' for more information' ) + +argv , i = sys.argv[ 1 : ] , 0 +while i < len( argv ): + arg = argv[ i ] + if arg[0] == '-': + if len( arg ) < 2: + raise Exception( 'spectrum.py: unknown argument, type \'ETA.py -h\' for more information' ) + if arg[1] != '-': + if arg == '-h': + arg = '--help' + elif arg == '-v': + arg = '--version' + elif arg == '-l': + arg = '--verbose' + elif arg == '-n': + arg == '--no-cache' + elif arg == '-c': + if i == len( sys.argv ) - 1: + raise Exception( 'spectrum.py: cache have to take a value' ) + argv[ i + 1 ] = '--cache=' + argv[ i + 1 ] + i += 1 + continue + elif arg == '-o': + if i == len( sys.argv ) - 1: + raise Exception( 'spectrum.py: output have to take a value' ) + argv[ i + 1 ] = '--output=' + argv[ i + 1 ] + i += 1 + continue + elif arg == '-a': + if i == len( sys.argv ) - 1: + raise Exception( 'spectrum.py: calibration have to take a value' ) + argv[ i + 1 ] = '--calibration=' + argv[ i + 1 ] + i += 1 + continue + else: + raise Exception( 'spectrum.py: unknown argument "' + arg + '", type \'spectrum.py -h\' for more information' ) + if arg[1] == '-': # not elif because arg can change after last if + if arg == '--help': + print( 'spectrum.py [options...] filename\ + \n -a --calibration calibration file, default to no calibration.\ + \n No calibration means no wavelength interpolation\ + \n -c --cache use given cache\ + \n -h --help show this help and quit\ + \n -n --no-cache do not use cache and rewrite it\ + \n -o --output output file, default to standard output\ + \n -v --version show version number and quit\ + \n -l --verbose show more information to help debugging\ + \n\ + \nParse a naroo ETA fits' ) + exit() + elif arg == '--version': + print( '0.2' ) + exit() + elif arg == '--verbose': + verbose = True + elif arg == '--no-cache': + no_cache = True + elif len( arg ) > 8 and arg[ : 8 ] == '--cache=': + cache = arg[ 8 : ] + elif len( arg ) > 9 and arg[ : 9 ] == '--output=': + output = arg[ 9 : ] + elif len( arg ) > 14 and arg[ : 14 ] == '--calibration=': + calibration = arg[ 14 : ] + else: + raise Exception( 'spectrum.py: unknown argument "' + arg + '", type \'ETA.py -h\' for more information' ) + else: + raise Exception( 'spectrum.py: this exception should never be raised' ) + else: + filename = arg + i += 1 +if filename == None: + raise Exception( 'spectrum.py: filename should be given' ) + +if verbose: + cache, filename, output, calibration, verbose + print( f'spectrum.py: launching now with parameters:\ + \n --filename: {filename}\ + \n --cache: {cache} ( default: \'\' )\ + \n --calibration: {calibration} ( default to None )\ + \n --output: {output} ( default to None )\ + \n --verbose: True ( default to False)\ + \n\ + \n===========================================' ) +# TODO: check in advance file to check if exists or writeable + +data = utils.load( filename ) +if verbose: + print( 'data loaded' ) + +cache_file = pathlib.Path( cache ) + +if cache_file.is_file() and not no_cache: + if verbose: + print( 'using cache' ) + with shelve.optn( str( cache_file ) ) as cache: + for key in [ 'data' , 'border' , 'calibrations' ]: + if key not in cache: + raise Exception( 'spectrum.py: missing data in cache file' ) + data = cache[ 'data' ] + border = cache[ 'border'] + spectrum = cache[ 'spectrum' ] + calibrations = cache[ 'calibrations' ] +else: + if verbose: + print( 'not using cache' ) + print( 'starting first zoom' ) + """ + find fill point + """ + points = [] + + points += utils.find_point( data[ : , 0 ] , 0 ) # x_min + points += utils.find_point( + data[ : , data.shape[1] - 1 ], + data.shape[1] - 1 + ) # x_max + + index_min = 0 + while data.shape[0] - 1 > index_min: + index_min += 1 + if len( utils.find_point( + data[ index_min , : ], + index_min , + 'y' , + ) ) == 3: + break + points.append( + utils.find_point( + data[ index_min , : ], + index_min , + 'y' , + )[1] + ) # y_min + + index_max = data.shape[0] - 1 + while index_min < index_max: + index_max -= 1 + if len( utils.find_point( + data[ index_max , : ], + index_max , + 'y' , + ) ) == 3: + break + + points.append( + utils.find_point( + data[ index_max , : ], + index_max , + 'y' , + )[1] + ) + + small_data = utils.compress( data , 5 ) + points = utils.big_to_small( points , 5 ) + + # size - 1 + points[ 1 ][ 1 ] -= 1 + points[ 3 ][ 0 ] -= 1 + + # little shift to be inside the light + points[ 2 ][ 1 ] += 3 + points[ 3 ][ 1 ] += 3 + + """ + fill data + """ + + extremum = [] + for point in points: + if point[0] < points[2][0]: + point[0] = points[2][0] + if point[1] < points[0][1]: + point[1] = points[0][1] + taken_points = utils.small_to_big( + np.array( [ + points[2][0], + points[0][1], + ] ) + utils.fill( + small_data[ + points[2][0] : points[3][0] + 1, + points[0][1] : points[1][1] + 1, + ] , + [ + point[0] - points[2][0], + point[1] - points[0][1], + ] , + 1000, + ), + 5, + ) + extremum.append( [ + np.min( taken_points[ : , 1 ] ), + np.max( taken_points[ : , 1 ] ), + np.min( taken_points[ : , 0 ] ), + np.max( taken_points[ : , 0 ] ), + ] ) + + border = { + 'x': { + 'min': points[0][1] + extremum[0][1] + 1, + 'max': points[0][1] + extremum[1][0] , + }, + 'y': { + 'min': points[2][0] + extremum[2][3] + 1, + 'max': points[2][0] + extremum[3][2] , + }, + } + + if verbose: + print( 'first zoom finished' ) + print( 'starting spectrum isolation' ) + print( 'starting y border detection' ) + + """ + Spectrum y + """ + + list_ = np.mean( + data[ + border[ 'y' ][ 'min' ] : border[ 'y' ][ 'max' ], + border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ], + ] , + axis = 1, + ) + indexes = utils.near_value( + list_ , + np.mean( list_ ) + ( np.max( list_ ) - np.mean( list_ ) ) / 10, + ) + border[ 'y' ][ 'min' ] + + spectrum = { + 'y': { + 'min': indexes[0], + 'max': indexes[1], + }, + } + + if verbose: + print( 'y border detection finished' ) + print( 'starting x border detection' ) + + """ + Spectrum x + """ + + list_ = np.convolve( + np.mean( + data[ + spectrum[ 'y' ][ 'min' ] : spectrum[ 'y' ][ 'max' ], + border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ] , + ] , + axis = 0, + ) , + np.ones( 200 ), + 'valid' , + ) + abciss = np.arange( + border[ 'x' ][ 'min' ] + 100, + border[ 'x' ][ 'max' ] - 99 , + 1 , + ) + indexes = utils.near_value( + list_ , + np.min( list_ ) + ( np.mean( list_ ) - np.min( list_ ) ), + ) + factor = 1 + while len( indexes ) == 2: + factor += 1 + indexes = utils.near_value( + list_ , + np.min( list_ ) + ( np.mean( list_ ) - np.min( list_ ) ) / factor, + ) + factor -= 1 + indexes = utils.near_value( + list_ , + np.min( list_ ) + ( np.mean( list_ ) - np.min( list_ ) ) / factor, + ) + border[ 'x' ][ 'min' ] + 100 # valid convolution only + + spectrum[ 'x' ] = { + 'min': indexes[ 0 ], + 'max': indexes[ 1 ], + } + + if verbose: + print( 'x border detection finished' ) + print( 'spectrum isolation finished' ) + print( 'starting calibration isolation' ) + + """ + Calibration + """ + + def indicator( list_ ): + """ + define an indicator which define if the horizontal slice has a + chance to be a part of a calibration + """ + list_ = list_.copy() # do not change data + list_ -= np.min( list_ ) # min 0 + list_ /= np.max( list_ ) # max 1 + + amplitude = np.mean( list_ ) # the lower the better + peaks = signal.find_peaks( + list_ , + height = np.mean( list_ ) + ( 1 - np.mean( list_ ) ) / 2, + )[0] + number = 0.01 + abs( len( peaks ) - 90 ) # the lower the better + intensity = np.sum( list_[ peaks ] ) + return intensity / ( amplitude ** 2 * number ) + + indicators = np.convolve( + np.array( [ + indicator( + data[ + i , + spectrum[ 'x' ][ 'min' ] : spectrum[ 'x' ][ 'max' ], + ], + ) for i in range( + border[ 'y' ][ 'min' ], + border[ 'y' ][ 'max' ], + 1 , + ) + ] ) , + np.ones( 10 ), + 'valid' , + ) + indicators /= np.max( indicators ) + calibration_areas = utils.consecutive( np.where( indicators > 1 / 1000 )[0] ) + calibration_areas = [ + [ calibration_area for calibration_area in calibration_areas if ( + calibration_area[0] < ( + border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] + ) / 2 + ) ], + [ calibration_area for calibration_area in calibration_areas if ( + calibration_area[0] > ( + border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ] + ) / 2 + ) ], + ] + calibration_sizes = [ + [ len( calibration_area ) for calibration_area in calibration_areas[0] ], + [ len( calibration_area ) for calibration_area in calibration_areas[1] ], + ] + calibrations_y = [ + calibration_areas[0][ + np.argmax( calibration_sizes[0] ) + ], + calibration_areas[1][ + np.argmax( calibration_sizes[1] ) + ], + ] + + calibrations = { + 'top': { + 'x': { + 'min': spectrum[ 'x' ][ 'min' ], + 'max': spectrum[ 'x' ][ 'max' ], + }, + 'y': { + 'min': border[ 'y' ][ 'min' ] + calibrations_y[0][0], + 'max': border[ 'y' ][ 'min' ] + calibrations_y[0][-1], + }, + }, + 'down': { + 'x': { + 'min': spectrum[ 'x' ][ 'min' ], + 'max': spectrum[ 'x' ][ 'max' ], + }, + 'y': { + 'min': border[ 'y' ][ 'min' ] + calibrations_y[1][0], + 'max': border[ 'y' ][ 'min' ] + calibrations_y[1][-1], + }, + } + } + + if verbose: + print( 'calibration isolation finished' ) + + if not cache_file.exists() and not no_cache: + if verbose: + print( 'writing result in cache' ) + with shelve.open( str( cache_file ) ) as cache: + cache[ 'data' ] = data + cache[ 'border' ] = border + cache[ 'spectrum' ] = spectrum + cache[ 'calibrations' ] = calibrations + if verbose: + print( 'cache saved' ) + +""" +Calibration +""" + +if calibration != None: + if verbose: + print( 'starting calibration' ) + if verbose: + print( 'calibration finished' ) + +if verbose: + print( 'starting output writing' ) + +if output == None: + print( np.mean( + data[ + spectrum[ 'y' ][ 'min' ] : spectrum[ 'y' ][ 'max' ], + specturm[ 'x' ][ 'min' ] : spectrum[ 'x' ][ 'max' ], + ] , + axis = 0, + ) ) +else: + if verbose: + print( 'storing result in ' + output ) + output_file = pathlib.Path( output ) + with shelve.open( str( output_file ) ) as output: + output[ 'data' ] = np.mean( + data[ + spectrum[ 'y' ][ 'min' ] : spectrum[ 'y' ][ 'max' ], + spectrum[ 'x' ][ 'min' ] : spectrum[ 'x' ][ 'max' ], + ] , + axis = 0, + ) +if verbose: + print( 'output writing finished' ) + print( '===========================================\ + \nend of spectrum.py' )