Refactoring (WIP)

This commit is contained in:
linarphy 2023-08-22 03:37:40 +02:00
parent 0d9062ffe5
commit 2e044a0b76
No known key found for this signature in database
GPG key ID: B914FF3FE69AA363
10 changed files with 568 additions and 0 deletions

0
classes/__init__.py Normal file
View file

View file

39
classes/science/border.py Normal file
View file

@ -0,0 +1,39 @@
from classes.science.side import Side
class Border:
"""
Define a rectangle with coordinate as edge
"""
def __init__( self , x = Side() , y = Side() ):
if not isinstance( x , Side ) or not isinstance( y , Side ):
raise ValueError( 'x and y should be sides' )
self.x = x
self.y = y
def scale( self , factor ):
"""
Update coordinate to adapt from compression
"""
self.x.scale( factor )
self.y.scale( factor )
def slice( self ):
"""
Return rectangular slice
"""
print( self )
return (
slice(
self.y.min , self.y.max
),
slice(
self.x.min , self.x.max,
),
)
def __str__( self ):
return '\
border [\
\n x: ' + str( self.x ) + ',\
\n y: ' + str( self.y ) + ',\
\n]\
'

122
classes/science/plate.py Normal file
View file

@ -0,0 +1,122 @@
from numpy import ndarray
from classes.science.border import Border
from function.utils import find_point, fill
class Plate:
"""
Matrix of pixel
"""
def __init__( self , image ):
if not isinstance( image , ndarray ):
raise ValueError( 'image must be a ndarray' )
self.image = image
self.set_border()
def set_border( self , factor = 5 ):
"""
Set current border (without area outside the plate)
"""
compressed = self.compress( factor )
points = self.get_points( compressed )
self.border = Border()
self.border.x.min = 0
self.border.x.max = compressed.shape[1] - 1
self.border.y.min = 0
self.border.y.max = compressed.shape[0] - 1
extremum = []
x_half = compressed.shape[1] // 2
y_half = compressed.shape[0] // 2
for index in range( len( points ) ):
point = points[ index ]
point[0] -= int( compressed.shape[0] == point[0] ) # keep in
point[1] -= int( compressed.shape[1] == point[1] ) # range
taken_points = fill(
compressed,
point ,
1000 , # intensity threshold
)
x = [ taken_point[1] for taken_point in taken_points ]
y = [ taken_point[0] for taken_point in taken_points ]
if max( x ) < x_half:
if self.border.x.min < max( x ):
self.border.x.min = max( x ) # biggest min
elif min( x ) > x_half: # elif to only accept one side
if self.border.x.max > min( x ):
self.border.x.max = min( x ) # smallest max
elif max( y ) < y_half:
if self.border.y.min < max( y ):
self.border.y.min = max( y ) # same
elif min( y ) > y_half:
if self.border.y.max > min( y ):
self.border.y.max = min( y ) # same
offset = 3
self.border.x.min += offset
self.border.y.min += offset
self.border.x.max -= offset
self.border.y.min -= offset
self.border.scale( factor )
def get_points( self , compressed ):
first_column = find_point(
compressed[
:,
0,
],
0,
)
last_column = find_point(
compressed[
: ,
- 1,
],
compressed.shape[1] - 1,
)
first_line = find_point(
compressed[
0,
:,
] ,
0 ,
'y',
)
if len( first_line ) < 2:
last_column = last_column[ 1 : ]
if len( first_line ) < 3:
first_line = []
else:
first_line = first_line[ 1 : - 1 ]
last_line = find_point(
compressed[
- 1,
: ,
] ,
compressed.shape[0] - 1,
'y' ,
)
if len( last_line ) < 2:
last_column = last_column[ : - 1 ]
if len( last_line ) < 3:
last_line = []
else:
last_line = last_line[ 1 : - 1 ]
return first_column + last_column + first_line + last_line
def compress( self , factor ):
return self.image[
: : factor,
: : factor,
]

22
classes/science/side.py Normal file
View file

@ -0,0 +1,22 @@
class Side:
"""
Set of two index from the same axis
"""
def __init__( self , min = 0 , max = 0 ):
if not isinstance( min , int ) or not isinstance( max , int ):
raise ValueError( 'min and max should be int' )
def scale( self , factor ):
"""
Update coordinate to adapt from compression
"""
self.min = int( self.min * factor )
self.max = int( self.max * factor )
def __str__( self ):
return '\
side: [\
\n min: ' + str( self.min ) + '\
\n max: ' + str( self.max ) + '\
\n]\
'

View file

147
classes/utils/settings.py Normal file
View file

@ -0,0 +1,147 @@
class Settings:
"""
Settings manager
"""
def version( self ):
return '1.0.0'
def __init__( self , arguments ):
# Default configuration
self.cache = None
self.filename = None
self.inte_cal = None
self.no_cache = False
self.output = 'data.fits'
self.verbose = False
self.wave_cal = None
# configuration change
if len( arguments ) < 2:
raise Exception(
'type \'' + __file__ + ' -h\' for more information'
)
index = 0
while index < len( arguments ):
argument = arguments[ index ]
if argument[0] == '-':
if len( argument ) < 2:
raise Exception(
'unknown argument, type \' + __file__ + \' -h' +
' for more information'
)
if argument[1] != '-':
if argument == '-h':
argument = '--help'
elif argument == '-V':
argument = '--version'
elif argument == '-v':
argument = '--verbose'
elif argument == '-n':
argument = '--no-cache'
elif argument == '-c':
if index == len( arguments ) - 1:
raise Exception(
'cache have to take a value'
)
arguments[ index + 1 ] = '--cache=' + \
arguments[ index + 1 ]
index += 1
continue
elif argument == '-o':
if index == len( arguments ) - 1:
raise Exception(
'output have to take a value'
)
arguments[ index + 1 ] = '--output=' + \
arguments[ index + 1 ]
index += 1
continue
elif argument == '-w':
if index == len( arguments ) - 1:
raise Exception(
'wavelength calibration have to take a value'
)
arguments[ index + 1 ] = '--wavelength=' + \
arguments[ index + 1 ]
index += 1
continue
elif argument == '-i':
if index == len( arguments ) - 1:
raise Exception(
'intensity calibration have to take a value'
)
arguments[ index + 1 ] = '--intensity=' + \
arguments[ index + 1 ]
index += 1
continue
else:
raise Exception(
'unknown argument "' + argument + \
'", type \'' + __file__ + \
' -h\' for more information'
)
if argument[1] == '-': # not elif because argument
# can change in the last if
if argument == '--help':
print( self.help() )
exit()
elif argument == '--version':
print( self.version() )
exit()
elif argument == '--verbose':
self.verbose = True
elif argument == '--no-cache':
self.no_cache = True
elif (
len( argument ) > 8 and
argument[ : 8 ] == '--cache='
):
self.cache = argument[ 8 : ]
elif (
len( argument ) > 9 and
argument[ : 9 ] == '--output='
):
self.output = argument[ 9 : ]
elif (
len( argument ) > 13 and
argument[ : 13 ] == '--wavelength='
):
self.wave_cal = argument[ 13 : ]
elif (
len( argument ) > 12 and
argument[ : 12 ] == '--intensity='
):
self.inte_cal = argument[ 12 : ]
else:
raise Exception(
'unknown argument "' + argument + \
'", type \'' + __file__ + ' -h\' ' + \
'for more information'
)
else:
self.filename = argument
index += 1
if self.filename == None:
raise Exception( 'filename should be given' )
def help( self ):
return '\
naroo_reader [options...] filename\
\n -w wavelength wavelength calibration file, default to None.\
\n None means no wavelength interpolation\
\n -i intensity intensity calibration file, default to None.\
\n None means no intensity interpolation\
\n -c --cache use given cache file to store new temporary\
\n data or use old ones.\
\n -h --help show this help and quit\
\n -n --no-cache do not use already existing cache file.\
\n If cache is defined, the cache file will be\
\n overwrited.\
\n If cache is None, it does nothing\
\n -o --output Output file. Default to data.fits\
\n -V --version show version number and quit\
\n -v --verbose show more information to help debugging\
'

0
function/__init__.py Normal file
View file

219
function/utils.py Normal file
View file

@ -0,0 +1,219 @@
import cv2
import numpy as np
def check_side( data , point , tolerance ):
"""
give coordinates of all side point of the given point which have an
intensity difference inferior than tolerance
"""
if not isinstance( data , np.ndarray ) and not isinstance( data , list ):
raise ValueError( 'data must be a list, ' + type( data ) + ' given' )
if not isinstance( point , np.ndarray ) and not isinstance( point , tuple ) and not isinstance( point , list ):
raise ValueError( 'point must be a tuple, ' + type( point ) + ' given' )
if not isinstance( tolerance , int ) and not isinstance( tolerance , float ):
raise ValueError( 'tolerance must be a number, ' + type( tolerance ) + ' given' )
positions , intensity = [] , data[ tuple( point ) ]
if 0 <= point[0] < data.shape[0] - 1 and intensity - tolerance <= data[ point[0] + 1 , point[1] ] <= intensity + tolerance:
positions.append( [ point[0] + 1 , point[1] ] )
if 0 < point[0] < data.shape[0] and intensity - tolerance <= data[ point[0] - 1 , point[1] ] <= intensity + tolerance:
positions.append( [ point[0] - 1 , point[1] ] )
if 0 <= point[1] < data.shape[1] - 1 and intensity - tolerance <= data[ point[0] , point[1] + 1 ] <= intensity + tolerance:
positions.append( [ point[0] , point[1] + 1 ] )
if 0 < point[1] < data.shape[1] and intensity - tolerance <= data[ point[0] , point[1] - 1 ] <= intensity + tolerance:
positions.append( [ point[0] , point[1] - 1 ] )
return positions
def fill( data , point , tolerance , limit = 100000 ):
"""
give the coordinate of all points that fill the area with the given tolerance
"""
if not isinstance( data , np.ndarray ) and not isinstance( data , list ):
raise ValueError( 'data must be a list, ' + type( data ) + ' given' )
if not isinstance( point , np.ndarray ) and not isinstance( point , tuple ) and not isinstance( point , list ):
raise ValueError( 'point must be a tuple, ' + type( point ) + ' given' )
if not isinstance( tolerance , int ) and not isinstance( tolerance , float ):
raise ValueError( 'tolerance must be a number, ' + type( tolerance ) + ' given' )
if not isinstance( limit , int ):
raise ValueError( 'limit must be an integer, ' + type( limit ) + ' given' )
taken_point = []
new_points = [ point ]
i = 0
while len( new_points ) != 0 and i < limit:
point = new_points.pop(0)
taken_point.append( point )
for position in check_side( data , point , tolerance ):
if not position in new_points and not position in taken_point:
new_points.append( position )
i += 1
return np.array( taken_point )
def point( index_1 , index_2 , axis = 'x' ):
"""
reorder coordinate
"""
if not isinstance( index_1 , int ):
raise ValueError( 'index_1 must be an integer, ' + type( index_1 ) + ' given' )
if not isinstance( index_2 , int ):
raise ValueError( 'index_2 must be an integer, ' + type( index_2 ) + ' given' )
if not isinstance( axis , str ):
raise ValueError( 'axis must be a string, ' + type( axis ) + ' given' )
if axis not in [ 'x' , 'y' ]:
raise ValueError( 'axis must be "x" or "y", ' + axis + ' given' )
if axis == 'x':
return [ index_2 , index_1 ]
return [ index_1 , index_2 ]
def find_point( list_ , index , axis = 'x' , threshold = 0.5 ):
"""
find the index where to fill in a side
"""
if not isinstance( list_ , list ) and not isinstance( list_ , np.ndarray ):
raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' )
if not isinstance( index , int ):
raise ValueError( 'index must be an integer, ' + type( index ) + ' given' )
if not isinstance( axis , str ):
raise ValueError( 'axis must be a string, ' + type( axis ) + ' given' )
if axis not in [ 'x' , 'y' ]:
raise ValueError( 'axis must be "x" or "y", ' + axis + ' given' )
if not isinstance( threshold , float ):
raise ValueError( 'threshold must be a float, ' + type( threshold ) + ' given' )
mean = np.mean( list_ )
ampl = np.max( list_ ) - np.min( list_ )
if ampl < mean / 2:
return [ point( index , 0 , axis ) ]
else:
points = []
list_ = np.convolve( list_ , np.ones( 100 ) , 'same' )
list_ -= np.min( list_ )
list_ /= np.max( list_ )
i , inside , size = 0 , False , 0
while i < len( list_ ):
if list_[ i ] > threshold and not inside:
points.append( point( index , i , axis ) )
inside = True
size = 0
elif list_[ i ] < threshold and inside:
size += 1
if size > 0.01 * len( list_ ): # low sensibility
inside = False
i += 1
return points
def consecutive( list_ ):
"""
divide a sorted list of integer by consecutive part
"""
if not isinstance( list_ , list ) and not isinstance( list_ , np.ndarray ):
raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' )
if len( list_ ) == 0:
return list_
index = last_consecutive( list_ )
if index == len( list_ ) - 1:
return [ list_ ]
return [ list_[ : index + 1 ] ] + consecutive( list_[ index + 1 : ] ) # happy recursion \o/
def last_consecutive( list_ ):
"""
return the last index of the first consecutive list
"""
if not isinstance( list_ , list ) and not isinstance( list_ , np.ndarray ):
raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' )
first , lower , greater = list_[0] , 0 , len( list_ )
i = lower + ( greater - lower ) // 2
while greater - lower != 0:
i = lower + ( greater - lower ) // 2
if list_[ i ] - first != i: # outside of the consecutive list
greater = i
else:
if i != len( list_ ) - 1:
if list_[ i ] + 1 != list_[ i + 1 ]: # next one is not inside the consecutive list => limit retrieved
break
lower = i
else: # if inside the consecutive list and last element, every element is consecutive
break
return i
def same_value( list_ ):
"""
divide a sorted list of integer by same value part
"""
if not isinstance( list_ , list ) and not isinstance( list_ , np.ndarray ):
raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' )
if len( list_ ) == 0:
return list_
counter = np.arange( 1 , len( list_ ) )
return np.split( list_ , counter[ list_[ 1 : ] != list_[ : - 1 ] ] )
def last_same_value( list_ ):
"""
return the last index of the first same value list
"""
if not isinstance( list_ , list ) and not isinstance( list_ , np.ndarray ):
raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' )
value = list_[0]
return np.argwhere( list_ == value ).max()
def rotate( image , angle ):
"""
rotate the following image by the given angle
"""
height , width = image.shape[ : 2 ]
cX , cY = ( width // 2 , height // 2 )
matrix = cv2.getRotationMatrix2D( ( cX , cY ) , angle , 1 )
return cv2.warpAffine( image , matrix , ( width , height ) , flags = cv2.INTER_NEAREST )
def retrieve_peaks( data , window_size = 5 , error_coef = 1.05 , max_window_size = 30 , min_successive = 2 ):
"""
get peak position from a 1D data
"""
spectral_energy = np.log( data ** 2 )
error_thr = error_coef / np.median( spectral_energy )
average_window = np.convolve(
spectral_energy ,
np.ones( window_size ),
'same' ,
) / window_size
average_energy = np.mean( average_window )
peaks = np.where(
average_window / average_energy ** 2 > error_thr
)[0]
peaks = [
np.mean( peak ) for peak in consecutive( peaks )
]
successive = 0
while successive < min_successive and window_size < max_window_size:
average_window = np.convolve(
spectral_energy ,
np.ones( window_size ),
'same' ,
) / window_size
average_energy = np.mean( average_window )
new_peaks = np.where(
average_window / average_energy ** 2 > error_thr
)[0]
new_peaks = [
np.mean( peak ) for peak in consecutive( new_peaks )
]
if len( peaks ) == len( new_peaks ):
successive += 1
else:
successive = 0
peaks = new_peaks
window_size += 1
return peaks
def near_value( list_ , value ):
"""
return indexes of the list whith a value nearest of the given
one when crossing it
"""
change = np.where( np.diff( np.sign( list_ - value ) ) != 0 ) # sign change
index = change + (
value - list_[ change ]
) / (
list_[ change + np.ones_like( change ) ] - list_[ change ]
) # interpolation
index = np.append( index , np.where( list_ == value ) )
return np.round( np.sort( index ) ).astype( int ) # triage

19
main.py Executable file
View file

@ -0,0 +1,19 @@
#!/usr/bin/env python3
from classes.science.plate import Plate
from classes.utils.settings import Settings
from astropy.io.fits import open
from sys import argv as arguments
settings = Settings( arguments )
hdul = open( settings.filename )
plate = Plate( hdul[0].data )
head = hdul[0].header
hdul.close()
import matplotlib.pyplot as plt
plt.imshow( plate.image[ plate.border.slice() ] )
plt.show()