818 lines
24 KiB
Python
Executable file
818 lines
24 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
from scipy.signal import medfilt , find_peaks
|
|
from scipy.signal import convolve as sp_convolve
|
|
from scipy.optimize import curve_fit
|
|
import utils
|
|
import sys
|
|
import shelve
|
|
import pathlib
|
|
from scipy.ndimage import rotate
|
|
from astropy.io import fits
|
|
|
|
cache , filename , output , calibration , intensity_calibration
|
|
, verbose , no_cache = None , None , None , None , None , False, False
|
|
if len( sys.argv ) < 2:
|
|
raise Exception( 'spectrum.py: type \'spectrum.py -h\' for more information' )
|
|
|
|
argv , i = sys.argv[ 1 : ] , 0
|
|
while i < len( argv ):
|
|
arg = argv[ i ]
|
|
if arg[0] == '-':
|
|
if len( arg ) < 2:
|
|
raise Exception( 'spectrum.py: unknown argument, type \'ETA.py -h\' for more information' )
|
|
if arg[1] != '-':
|
|
if arg == '-h':
|
|
arg = '--help'
|
|
elif arg == '-V':
|
|
arg = '--version'
|
|
elif arg == '-v':
|
|
arg = '--verbose'
|
|
elif arg == '-n':
|
|
arg == '--no-cache'
|
|
elif arg == '-c':
|
|
if i == len( sys.argv ) - 1:
|
|
raise Exception( 'spectrum.py: cache have to take a value' )
|
|
argv[ i + 1 ] = '--cache=' + argv[ i + 1 ]
|
|
i += 1
|
|
continue
|
|
elif arg == '-o':
|
|
if i == len( sys.argv ) - 1:
|
|
raise Exception( 'spectrum.py: output have to take a value' )
|
|
argv[ i + 1 ] = '--output=' + argv[ i + 1 ]
|
|
i += 1
|
|
continue
|
|
elif arg == '-w':
|
|
if i == len( sys.argv ) - 1:
|
|
raise Exception( 'spectrum.py: wavelength have to take a value' )
|
|
argv[ i + 1 ] = '--wavelength=' + argv[ i + 1 ]
|
|
i += 1
|
|
continue
|
|
elif arg == '-i':
|
|
if i == len( sys.argv ) - 1:
|
|
raise Exception( 'spectrum.py: intensity have to take a value' )
|
|
argv[ i + 1 ] = '--intensity=' + argv[ i + 1 ]
|
|
i += 1
|
|
continue
|
|
else:
|
|
raise Exception( 'spectrum.py: unknown argument "' + arg + '", type \'spectrum.py -h\' for more information' )
|
|
if arg[1] == '-': # not elif because arg can change after last if
|
|
if arg == '--help':
|
|
print( 'spectrum.py [options...] filename\
|
|
\n -w --wavelength wavelength calibration file, default to no calibration.\
|
|
\n No calibration means no wavelength interpolation\
|
|
\n -i --intensity intensity calibration file, default to no calibration.\
|
|
\n No calibration means no intensity interpolation\
|
|
\n -c --cache use given cache\
|
|
\n -h --help show this help and quit\
|
|
\n -n --no-cache do not use cache and rewrite it\
|
|
\n -o --output output file, default to standard output\
|
|
\n -V --version show version number and quit\
|
|
\n -v --verbose show more information to help debugging\
|
|
\n\
|
|
\nParse a naroo spectrum fits' )
|
|
exit()
|
|
elif arg == '--version':
|
|
print( '0.3' )
|
|
exit()
|
|
elif arg == '--verbose':
|
|
verbose = True
|
|
elif arg == '--no-cache':
|
|
no_cache = True
|
|
elif len( arg ) > 8 and arg[ : 8 ] == '--cache=':
|
|
cache = arg[ 8 : ]
|
|
elif len( arg ) > 9 and arg[ : 9 ] == '--output=':
|
|
output = arg[ 9 : ]
|
|
elif len( arg ) > 13 and arg[ : 13 ] == '--wavelength=':
|
|
calibration = arg[ 13 : ]
|
|
elif len( arg ) > 12 and arg[ : 12 ] == '--intensity=':
|
|
intensity = arg[ 12 : ]
|
|
else:
|
|
raise Exception( 'spectrum.py: unknown argument "' + arg + '", type \'ETA.py -h\' for more information' )
|
|
else:
|
|
raise Exception( 'spectrum.py: this exception should never be raised' )
|
|
else:
|
|
filename = arg
|
|
i += 1
|
|
if filename == None:
|
|
raise Exception( 'spectrum.py: filename should be given' )
|
|
|
|
if verbose:
|
|
cache, filename, output, calibration, verbose
|
|
print( f'spectrum.py: launching now with parameters:\
|
|
\n --filename: {filename}\
|
|
\n --cache: {cache} ( default: \'\' )\
|
|
\n --wavelength: {calibration} ( default to None )\
|
|
\n --intensity: {intensity} ( default to None )\
|
|
\n --output: {output} ( default to None )\
|
|
\n --verbose: True ( default to False)\
|
|
\n\
|
|
\n===========================================' )
|
|
# TODO: check in advance file to check if exists or writeable
|
|
|
|
files = {}
|
|
if calibration != None:
|
|
files[ 'wavelength' ] = pathlib.Path( calibration )
|
|
if intensity != None:
|
|
files[ 'intensity' ] = pathlib.Path( intensity )
|
|
if cache != None:
|
|
files[ 'cache' ] = pathlib.Path( cache )
|
|
|
|
files[ 'filename' ] = pathlib.Path( filename )
|
|
|
|
for name , path in files.items():
|
|
if name in [
|
|
'wavelength',
|
|
'intensity' ,
|
|
'spectrum' ,
|
|
] and not path.is_file():
|
|
raise Exception(
|
|
'spectrum.py: could not open the ' + name + ' file'
|
|
)
|
|
|
|
hdul = fits.open( filename )
|
|
data = hdul[0].data
|
|
head = hdul[0].header
|
|
hdul.close()
|
|
if verbose:
|
|
print( 'data loaded' )
|
|
|
|
if 'cache' in files and files[ 'cache' ].is_file() and not no_cache:
|
|
if verbose:
|
|
print( 'using cache' )
|
|
with shelve.open( str( files[ 'cache' ] ) ) as cache:
|
|
for key in [ 'data' , 'border' , 'calibrations' ]:
|
|
if key not in cache:
|
|
raise Exception(
|
|
'spectrum.py: missing data in cache file'
|
|
)
|
|
data = cache[ 'data' ]
|
|
border = cache[ 'border']
|
|
spectrum = cache[ 'spectrum' ]
|
|
calibrations = cache[ 'calibrations' ]
|
|
else:
|
|
if verbose:
|
|
print( 'not using cache' )
|
|
print( 'starting first zoom' )
|
|
"""
|
|
find fill point
|
|
"""
|
|
points = []
|
|
|
|
points += utils.find_point( data[ : , 0 ] , 0 ) # x_min
|
|
points += utils.find_point(
|
|
data[ : , data.shape[1] - 1 ],
|
|
data.shape[1] - 1
|
|
) # x_max
|
|
|
|
index_min = 0
|
|
while data.shape[0] - 1 > index_min:
|
|
index_min += 1
|
|
if len( utils.find_point(
|
|
data[ index_min , : ],
|
|
index_min ,
|
|
'y' ,
|
|
) ) == 3:
|
|
break
|
|
points.append(
|
|
utils.find_point(
|
|
data[ index_min , : ],
|
|
index_min ,
|
|
'y' ,
|
|
)[1]
|
|
) # y_min
|
|
|
|
index_max = data.shape[0] - 1
|
|
while index_min < index_max:
|
|
index_max -= 1
|
|
if len( utils.find_point(
|
|
data[ index_max , : ],
|
|
index_max ,
|
|
'y' ,
|
|
) ) == 3:
|
|
break
|
|
|
|
points.append(
|
|
utils.find_point(
|
|
data[ index_max , : ],
|
|
index_max ,
|
|
'y' ,
|
|
)[1]
|
|
)
|
|
|
|
small_data = utils.compress( data , 5 )
|
|
points = utils.big_to_small( points , 5 )
|
|
|
|
# size - 1
|
|
points[ 1 ][ 1 ] -= 1
|
|
points[ 3 ][ 0 ] -= 1
|
|
|
|
# little shift to be inside the light
|
|
points[ 2 ][ 1 ] += 3
|
|
points[ 3 ][ 1 ] += 3
|
|
|
|
"""
|
|
fill data
|
|
"""
|
|
|
|
extremum = []
|
|
for point in points:
|
|
if point[0] < points[2][0]:
|
|
point[0] = points[2][0]
|
|
if point[1] < points[0][1]:
|
|
point[1] = points[0][1]
|
|
taken_points = utils.small_to_big(
|
|
np.array( [
|
|
points[2][0],
|
|
points[0][1],
|
|
] ) + utils.fill(
|
|
small_data[
|
|
points[2][0] : points[3][0] + 1,
|
|
points[0][1] : points[1][1] + 1,
|
|
] ,
|
|
[
|
|
point[0] - points[2][0],
|
|
point[1] - points[0][1],
|
|
] ,
|
|
1000,
|
|
),
|
|
5,
|
|
)
|
|
extremum.append( [
|
|
np.min( taken_points[ : , 1 ] ),
|
|
np.max( taken_points[ : , 1 ] ),
|
|
np.min( taken_points[ : , 0 ] ),
|
|
np.max( taken_points[ : , 0 ] ),
|
|
] )
|
|
|
|
border = {
|
|
'x': {
|
|
'min': points[0][1] + extremum[0][1] + 1,
|
|
'max': points[0][1] + extremum[1][0] ,
|
|
},
|
|
'y': {
|
|
'min': points[2][0] + extremum[2][3] + 1,
|
|
'max': points[2][0] + extremum[3][2] ,
|
|
},
|
|
}
|
|
|
|
if verbose:
|
|
print( 'first zoom finished' )
|
|
print( 'starting rotation' )
|
|
print( 'retrieving current angle' )
|
|
|
|
"""
|
|
Rotation
|
|
"""
|
|
|
|
gauss = lambda x , sigma , mu , a , b : a * (
|
|
1 / ( sigma * np.sqrt( 2 * np.pi ) )
|
|
) * np.exp(
|
|
- ( x - mu ) ** 2 / ( 2 * sigma ** 2 )
|
|
) + b
|
|
guess_params = [
|
|
1 ,
|
|
( border[ 'x' ][ 'max' ] - border[ 'x' ][ 'min' ] ) / 2,
|
|
np.mean( data[
|
|
border[ 'y' ][ 'min' ] : border[ 'y' ][ 'max' ],
|
|
border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
|
|
] ) ,
|
|
np.mean( data[
|
|
border[ 'y' ][ 'min' ] : border[ 'y' ][ 'max' ],
|
|
border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ]
|
|
] ) ,
|
|
]
|
|
number = 1000
|
|
position_peaks = np.zeros( number )
|
|
indexes = np.linspace(
|
|
border[ 'x' ][ 'min' ],
|
|
border[ 'x' ][ 'max' ],
|
|
number , dtype = int ,
|
|
)
|
|
for i in range( number ):
|
|
x = np.arange(
|
|
border[ 'y' ][ 'min' ],
|
|
border[ 'y' ][ 'max' ],
|
|
1 ,
|
|
)
|
|
y = data[
|
|
border[ 'y' ][ 'min' ] : border[ 'y' ][ 'max' ],
|
|
indexes[ i ]
|
|
]
|
|
|
|
try:
|
|
position_peaks[ i ] = curve_fit(
|
|
gauss ,
|
|
x ,
|
|
y ,
|
|
guess_params,
|
|
)[0][1]
|
|
except:
|
|
position_peaks[ i ] = 0
|
|
position_peaks = medfilt(
|
|
position_peaks[ np.where( position_peaks != 0 ) ],
|
|
11 ,
|
|
)
|
|
abciss = np.arange(
|
|
len( position_peaks )
|
|
)[ np.where( position_peaks ) ]
|
|
polyval = np.polyfit( abciss , position_peaks , 1 )
|
|
|
|
angle = np.arctan( polyval[0] )
|
|
|
|
if verbose:
|
|
print( 'current angle retrieved: ' + str( angle ) )
|
|
print( 'starting image rotation' )
|
|
|
|
data = rotate( data , angle * ( 180 / np.pi ) ) # utils.rotate does not keep intenisty absolute value TODO
|
|
|
|
if verbose:
|
|
print( 'image rotation finished' )
|
|
print( 'rotation finished' )
|
|
print( 'starting spectrum isolation' )
|
|
print( 'starting y border detection' )
|
|
|
|
"""
|
|
Spectrum y
|
|
"""
|
|
|
|
list_ = np.mean(
|
|
data[
|
|
border[ 'y' ][ 'min' ] : border[ 'y' ][ 'max' ],
|
|
border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ],
|
|
] ,
|
|
axis = 1,
|
|
)
|
|
indexes = utils.near_value(
|
|
list_ ,
|
|
np.mean( list_ ) + ( np.max( list_ ) - np.mean( list_ ) ) / 10,
|
|
) + border[ 'y' ][ 'min' ]
|
|
|
|
spectrum = {
|
|
'y': {
|
|
'min': indexes[0],
|
|
'max': indexes[1],
|
|
},
|
|
}
|
|
|
|
if verbose:
|
|
print( 'y border detection finished' )
|
|
print( 'starting x border detection' )
|
|
|
|
"""
|
|
Spectrum x
|
|
"""
|
|
|
|
list_ = np.convolve(
|
|
np.mean(
|
|
data[
|
|
spectrum[ 'y' ][ 'min' ] : spectrum[ 'y' ][ 'max' ],
|
|
border[ 'x' ][ 'min' ] : border[ 'x' ][ 'max' ] ,
|
|
] ,
|
|
axis = 0,
|
|
) ,
|
|
np.ones( 200 ),
|
|
'valid' ,
|
|
)
|
|
abciss = np.arange(
|
|
border[ 'x' ][ 'min' ] + 100,
|
|
border[ 'x' ][ 'max' ] - 99 ,
|
|
1 ,
|
|
)
|
|
indexes = utils.near_value(
|
|
list_ ,
|
|
np.min( list_ ) + ( np.mean( list_ ) - np.min( list_ ) ),
|
|
)
|
|
factor = 1
|
|
while len( indexes ) == 2:
|
|
factor += 1
|
|
indexes = utils.near_value(
|
|
list_ ,
|
|
np.min(
|
|
list_,
|
|
) + (
|
|
np.mean(
|
|
list_,
|
|
) - np.min(
|
|
list_,
|
|
)
|
|
) / factor,
|
|
)
|
|
factor -= 1
|
|
indexes = utils.near_value(
|
|
list_ ,
|
|
np.min(
|
|
list_,
|
|
) + (
|
|
np.mean(
|
|
list_,
|
|
) - np.min(
|
|
list_,
|
|
)
|
|
) / factor,
|
|
) + border[ 'x' ][ 'min' ] + 100 # valid convolution only
|
|
|
|
spectrum[ 'x' ] = {
|
|
'min': indexes[ 0 ],
|
|
'max': indexes[ 1 ],
|
|
}
|
|
|
|
if verbose:
|
|
print( 'x border detection finished' )
|
|
print( 'spectrum isolation finished' )
|
|
print( 'starting calibration isolation' )
|
|
|
|
"""
|
|
Calibration
|
|
"""
|
|
|
|
def indicator( list_ ):
|
|
"""
|
|
define an indicator which define if the horizontal slice has a
|
|
chance to be a part of a calibration
|
|
"""
|
|
list_ = list_.copy() # do not change data
|
|
list_ -= np.min( list_ ) # min 0
|
|
list_ /= np.max( list_ ) # max 1
|
|
|
|
amplitude = np.mean( list_ ) # the lower the better
|
|
peaks = find_peaks(
|
|
list_ ,
|
|
height = np.mean( list_ ) + ( 1 - np.mean( list_ ) ) / 2,
|
|
)[0]
|
|
number = 0.01 + abs( len( peaks ) - 90 ) # the lower the
|
|
# better
|
|
intensity = np.sum( list_[ peaks ] )
|
|
return intensity / ( amplitude ** 2 * number )
|
|
|
|
indicators = np.convolve(
|
|
np.array( [
|
|
indicator(
|
|
data[
|
|
i ,
|
|
spectrum[ 'x' ][ 'min' ] : spectrum[ 'x' ][ 'max' ],
|
|
],
|
|
) for i in range(
|
|
border[ 'y' ][ 'min' ],
|
|
border[ 'y' ][ 'max' ],
|
|
1 ,
|
|
)
|
|
] ) ,
|
|
np.ones( 10 ),
|
|
'valid' ,
|
|
)
|
|
indicators /= np.max( indicators )
|
|
calibration_areas = utils.consecutive(
|
|
np.where(
|
|
indicators > 1 / 1000,
|
|
)[0],
|
|
)
|
|
calibration_areas = [
|
|
[
|
|
calibration_area for calibration_area in calibration_areas if (
|
|
calibration_area[0] < (
|
|
border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ]
|
|
) / 2
|
|
)
|
|
],
|
|
[
|
|
calibration_area for calibration_area in calibration_areas if (
|
|
calibration_area[0] > (
|
|
border[ 'y' ][ 'max' ] - border[ 'y' ][ 'min' ]
|
|
) / 2
|
|
)
|
|
],
|
|
]
|
|
calibration_sizes = [
|
|
[
|
|
len(
|
|
calibration_area
|
|
) for calibration_area in calibration_areas[0]
|
|
],
|
|
[
|
|
len(
|
|
calibration_area
|
|
) for calibration_area in calibration_areas[1]
|
|
],
|
|
]
|
|
calibrations_y = [
|
|
calibration_areas[0][
|
|
np.argmax( calibration_sizes[0] )
|
|
],
|
|
calibration_areas[1][
|
|
np.argmax( calibration_sizes[1] )
|
|
],
|
|
]
|
|
|
|
calibrations = {
|
|
'top': {
|
|
'x': {
|
|
'min': spectrum[ 'x' ][ 'min' ],
|
|
'max': spectrum[ 'x' ][ 'max' ],
|
|
},
|
|
'y': {
|
|
'min': border[ 'y' ][ 'min' ] + calibrations_y[0][0],
|
|
'max': border[ 'y' ][ 'min' ] + calibrations_y[0][-1],
|
|
},
|
|
},
|
|
'down': {
|
|
'x': {
|
|
'min': spectrum[ 'x' ][ 'min' ],
|
|
'max': spectrum[ 'x' ][ 'max' ],
|
|
},
|
|
'y': {
|
|
'min': border[ 'y' ][ 'min' ] + calibrations_y[1][0],
|
|
'max': border[ 'y' ][ 'min' ] + calibrations_y[1][-1],
|
|
},
|
|
}
|
|
}
|
|
|
|
if verbose:
|
|
print( 'calibration isolation finished' )
|
|
|
|
if 'cache' in files and not files[ 'cache' ].exists() and not no_cache:
|
|
if verbose:
|
|
print( 'writing result in cache' )
|
|
with shelve.open( str( files[ 'cache' ] ) ) as cache:
|
|
cache[ 'data' ] = data
|
|
cache[ 'border' ] = border
|
|
cache[ 'spectrum' ] = spectrum
|
|
cache[ 'calibrations' ] = calibrations
|
|
if verbose:
|
|
print( 'cache saved' )
|
|
|
|
"""
|
|
Calibration
|
|
"""
|
|
|
|
wavelengths = np.arange(
|
|
spectrum[ 'x' ][ 'max' ] - spectrum[ 'x' ][ 'min' ]
|
|
)
|
|
|
|
if calibration != None:
|
|
if verbose:
|
|
print( 'starting wavelength calibration' )
|
|
|
|
mean_data = np.mean( data[
|
|
spectrum[ 'y' ][ 'min' ] : spectrum[ 'y' ][ 'max' ],
|
|
spectrum[ 'x' ][ 'min' ] : spectrum[ 'x' ][ 'max' ]
|
|
] , axis = 0 )
|
|
abciss = np.arange( len( mean_data ) )
|
|
|
|
ref = np.array( [
|
|
6562.79,
|
|
4861.35,
|
|
4340.47,
|
|
4101.73,
|
|
3970.08,
|
|
3889.06,
|
|
3835.40,
|
|
# 3646 ,
|
|
] ) * 1e-10
|
|
start , end = 5000 , 18440
|
|
|
|
polyval_before = np.polyfit(
|
|
abciss[ : start ] ,
|
|
mean_data[ : start ],
|
|
2 ,
|
|
)
|
|
polyval_middle = np.polyfit(
|
|
abciss[ start : end ] ,
|
|
mean_data[ start : end ],
|
|
2 ,
|
|
)
|
|
polyval_end = np.polyfit(
|
|
abciss[ end : ] ,
|
|
mean_data[ end : ],
|
|
2 ,
|
|
)
|
|
|
|
mean_data[ : start ] = mean_data[ : start ] - np.polyval(
|
|
polyval_before ,
|
|
abciss[ : start ],
|
|
)
|
|
mean_data[ start : end ] = mean_data[ start : end ] - np.polyval(
|
|
polyval_middle ,
|
|
abciss[ start : end ],
|
|
)
|
|
mean_data[ end : ] = mean_data[ end : ] - np.polyval(
|
|
polyval_end ,
|
|
abciss[ end : ],
|
|
)
|
|
|
|
mean_data_normalized = mean_data.copy() # normalization
|
|
mean_data_normalized -= np.min( mean_data_normalized )
|
|
mean_data_normalized /= np.max( mean_data_normalized )
|
|
|
|
lines = [
|
|
np.mean(
|
|
cons,
|
|
) for cons in utils.consecutive(
|
|
np.where(
|
|
mean_data_normalized < 0.3,
|
|
)[0],
|
|
)
|
|
]
|
|
lines = np.array( lines[ : len( ref ) - 1 ] ) # Balmer discontinuity
|
|
|
|
ref = ref[ 1 : ] # start with H-beta
|
|
|
|
wavelength_polyval = np.polyfit( lines , ref , 1 )
|
|
|
|
wavelengths = np.polyval( wavelength_polyval , abciss )
|
|
|
|
if verbose:
|
|
print( 'wavelength calibration finished' )
|
|
|
|
if verbose:
|
|
print( 'starting bias substraction' )
|
|
|
|
bias = {
|
|
'top': np.mean(
|
|
data[
|
|
calibrations[ 'top' ][ 'y' ][ 'min' ] - 200 :
|
|
calibrations[ 'top' ][ 'y' ][ 'min' ] ,
|
|
calibrations[ 'top' ][ 'x' ][ 'min' ] :
|
|
calibrations[ 'top' ][ 'x' ][ 'max' ]
|
|
] ,
|
|
axis = 0,
|
|
),
|
|
'down': np.mean(
|
|
data[
|
|
calibrations[ 'down' ][ 'y' ][ 'max' ] :
|
|
calibrations[ 'down' ][ 'y' ][ 'max' ] + 200,
|
|
calibrations[ 'down' ][ 'x' ][ 'min' ] :
|
|
calibrations[ 'down' ][ 'x' ][ 'max' ]
|
|
] ,
|
|
axis = 0,
|
|
),
|
|
}
|
|
|
|
mean_bias = sp_convolve(
|
|
np.mean(
|
|
[
|
|
bias[ 'top' ],
|
|
bias[ 'down' ],
|
|
] ,
|
|
axis = 0,
|
|
) ,
|
|
np.ones(
|
|
(
|
|
50,
|
|
),
|
|
) ,
|
|
'same',
|
|
) / 50
|
|
|
|
plt.plot( bias[ 'top' ] , label = 'top' )
|
|
plt.plot( bias[ 'down' ] , label = 'down' )
|
|
plt.plot( mean_bias , label = 'mean' )
|
|
plt.legend()
|
|
plt.show()
|
|
|
|
data[
|
|
: ,
|
|
spectrum[ 'x' ][ 'min' ] : spectrum[ 'x' ][ 'max' ]
|
|
] = data[
|
|
:,
|
|
spectrum[ 'x' ][ 'min' ] : spectrum[ 'x' ][ 'max' ]
|
|
] - mean_bias
|
|
|
|
if verbose:
|
|
print( 'bias substraction finished' )
|
|
|
|
ETA_bias = np.load( 'ETA_bias.npy' )
|
|
|
|
plt.plot( mean_bias , label = 'spectrum' )
|
|
plt.plot( ETA_bias , label = 'ETA' )
|
|
plt.legend()
|
|
plt.show()
|
|
|
|
mean_data = np.mean( data[
|
|
spectrum[ 'y' ][ 'min' ] : spectrum[ 'y' ][ 'max' ],
|
|
spectrum[ 'x' ][ 'min' ] : spectrum[ 'x' ][ 'max' ]
|
|
] , axis = 0 )
|
|
|
|
if intensity != None:
|
|
if verbose:
|
|
print( 'starting intensity calibration' )
|
|
|
|
intensity_file = pathlib.Path( intensity )
|
|
|
|
with shelve.open( str( intensity_file ) ) as storage:
|
|
ETA_stairs = storage[ 'data' ]
|
|
ETA_wavelengths = storage[ 'wavelength' ] * 1e-10
|
|
|
|
wavelengths = wavelengths[ # remove wavelengths outside range
|
|
np.where(
|
|
np.logical_and(
|
|
wavelengths > np.min( ETA_wavelengths ),
|
|
wavelengths < np.max( ETA_wavelengths ),
|
|
),
|
|
)
|
|
]
|
|
ETA_wavelengths = ETA_wavelengths[ # remove intensity_wavelengths
|
|
# outside range
|
|
np.where(
|
|
np.logical_and(
|
|
ETA_wavelengths > np.min( wavelengths ),
|
|
ETA_wavelengths < np.max( wavelengths ),
|
|
),
|
|
)
|
|
]
|
|
|
|
if len( wavelengths ) == 0:
|
|
raise Exception(
|
|
'spectrum.py: spectrum and ETA does not share any common' +
|
|
'wavelengths'
|
|
)
|
|
|
|
step = 0.2 # depends of ETA source #TODO
|
|
|
|
final_intensity = np.zeros( len( wavelengths ) )
|
|
|
|
for index in range( len( wavelengths ) ):
|
|
wavelength = wavelengths[ index ]
|
|
current_ETA_index = np.argmin(
|
|
ETA_wavelengths - wavelength
|
|
)
|
|
ETA_wavelength = ETA_wavelengths[ current_ETA_index ]
|
|
|
|
current_stair = ETA_stairs[
|
|
: ,
|
|
current_ETA_index,
|
|
0
|
|
]
|
|
|
|
abciss_intensity_curve = np.arange(
|
|
(
|
|
ETA_stairs.shape[0] - 1 # we want 11 value, no 12
|
|
) * step ,
|
|
- step / 2, # a little more than - step to remove negative
|
|
- step ,
|
|
)
|
|
function_intensity_curve = lambda x , a , b : a * np.log( # log
|
|
# because the curve is inverted (exp on the
|
|
# other side )
|
|
x
|
|
) + b
|
|
|
|
results = curve_fit(
|
|
function_intensity_curve,
|
|
current_stair ,
|
|
abciss_intensity_curve ,
|
|
)[0]
|
|
|
|
final_intensity[ index ] = np.exp(
|
|
function_intensity_curve(
|
|
mean_data[ index ],
|
|
*results ,
|
|
),
|
|
)
|
|
|
|
if verbose:
|
|
print( 'intensity calibration finished' )
|
|
|
|
if verbose:
|
|
print( 'starting output writing' )
|
|
|
|
if output == None:
|
|
print( final_intensity[1:-1] )
|
|
else:
|
|
if verbose:
|
|
print( 'storing result in ' + output )
|
|
main_hdu = fits.PrimaryHDU( final_intensity[1:-1] ) # remove -1
|
|
main_hdu.header[ 'CRVAL1' ] = wavelengths[0]
|
|
main_hdu.header[ 'CDELT1' ] = wavelengths[1] - wavelengths[0]
|
|
main_hdu.header[ 'CTYPE1' ] = 'Wavelength'
|
|
main_hdu.header[ 'CUNIT1' ] = 'Angstrom'
|
|
main_hdu.header[ 'OBJNAME' ] = head[ 'OBJECT' ]
|
|
# missing from Naroo à récupérer sur le txt
|
|
#main_hdu.header[ 'OBSERVER' ] = head[ '' ]
|
|
#main_hdu.header[ 'DATE-OBS' ] = head| '' ]
|
|
#main_hdu.header[ 'EXPTIME' ] = head[ '' ]
|
|
main_hdu.header[ 'RADECSYS' ] = 'FKS'
|
|
main_hdu.header[ 'OBS-ID' ] = head[ 'OBS_ID' ]
|
|
main_hdu.header[ 'DATE-NUM' ] = head[ 'DATE' ]
|
|
main_hdu.header[ 'COMMENT' ] = str(head[ 'COMMENT' ]).replace(
|
|
'\n' ,
|
|
'<CR>',
|
|
)
|
|
main_hdu.header[ 'POLICY' ] = head[ 'POLICY' ]
|
|
|
|
# BSS keywords
|
|
main_hdu.header[ 'BSS_VHEL' ] = 0
|
|
main_hdu.header[ 'BSS_TELL' ] = 'None'
|
|
main_hdu.header[ 'BSS_COM' ] = 'None'
|
|
# missing from Naroo
|
|
#main_hdu.header[ 'BSS_INST' ] = head[ '' ]
|
|
#main_hdu.header[ 'BSS_SITE' ] = head[ '' ]
|
|
|
|
hdul = fits.HDUList( [ main_hdu ] )
|
|
hdul.writeto( output , overwrite = True )
|
|
if verbose:
|
|
print( 'output writing finished' )
|
|
print( '===========================================\
|
|
\nend of spectrum.py' )
|