From 2e044a0b7679f1bf11f422aa87f0bda17bebe447 Mon Sep 17 00:00:00 2001 From: linarphy Date: Tue, 22 Aug 2023 03:37:40 +0200 Subject: [PATCH] Refactoring (WIP) --- classes/__init__.py | 0 classes/science/__init__.py | 0 classes/science/border.py | 39 +++++++ classes/science/plate.py | 122 ++++++++++++++++++++ classes/science/side.py | 22 ++++ classes/utils/__init__.py | 0 classes/utils/settings.py | 147 ++++++++++++++++++++++++ function/__init__.py | 0 function/utils.py | 219 ++++++++++++++++++++++++++++++++++++ main.py | 19 ++++ 10 files changed, 568 insertions(+) create mode 100644 classes/__init__.py create mode 100644 classes/science/__init__.py create mode 100644 classes/science/border.py create mode 100644 classes/science/plate.py create mode 100644 classes/science/side.py create mode 100644 classes/utils/__init__.py create mode 100644 classes/utils/settings.py create mode 100644 function/__init__.py create mode 100644 function/utils.py create mode 100755 main.py diff --git a/classes/__init__.py b/classes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/classes/science/__init__.py b/classes/science/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/classes/science/border.py b/classes/science/border.py new file mode 100644 index 0000000..be8b2ff --- /dev/null +++ b/classes/science/border.py @@ -0,0 +1,39 @@ +from classes.science.side import Side + +class Border: + """ + Define a rectangle with coordinate as edge + """ + + def __init__( self , x = Side() , y = Side() ): + if not isinstance( x , Side ) or not isinstance( y , Side ): + raise ValueError( 'x and y should be sides' ) + self.x = x + self.y = y + + def scale( self , factor ): + """ + Update coordinate to adapt from compression + """ + self.x.scale( factor ) + self.y.scale( factor ) + def slice( self ): + """ + Return rectangular slice + """ + print( self ) + return ( + slice( + self.y.min , self.y.max + ), + slice( + self.x.min , self.x.max, + ), + ) + def __str__( self ): + return '\ +border [\ +\n x: ' + str( self.x ) + ',\ +\n y: ' + str( self.y ) + ',\ +\n]\ + ' diff --git a/classes/science/plate.py b/classes/science/plate.py new file mode 100644 index 0000000..1af28ef --- /dev/null +++ b/classes/science/plate.py @@ -0,0 +1,122 @@ +from numpy import ndarray +from classes.science.border import Border +from function.utils import find_point, fill + +class Plate: + """ + Matrix of pixel + """ + + def __init__( self , image ): + if not isinstance( image , ndarray ): + raise ValueError( 'image must be a ndarray' ) + self.image = image + self.set_border() + + def set_border( self , factor = 5 ): + """ + Set current border (without area outside the plate) + """ + compressed = self.compress( factor ) + + points = self.get_points( compressed ) + self.border = Border() + self.border.x.min = 0 + self.border.x.max = compressed.shape[1] - 1 + self.border.y.min = 0 + self.border.y.max = compressed.shape[0] - 1 + + extremum = [] + + x_half = compressed.shape[1] // 2 + y_half = compressed.shape[0] // 2 + for index in range( len( points ) ): + point = points[ index ] + point[0] -= int( compressed.shape[0] == point[0] ) # keep in + point[1] -= int( compressed.shape[1] == point[1] ) # range + + taken_points = fill( + compressed, + point , + 1000 , # intensity threshold + ) + + x = [ taken_point[1] for taken_point in taken_points ] + y = [ taken_point[0] for taken_point in taken_points ] + + if max( x ) < x_half: + if self.border.x.min < max( x ): + self.border.x.min = max( x ) # biggest min + elif min( x ) > x_half: # elif to only accept one side + if self.border.x.max > min( x ): + self.border.x.max = min( x ) # smallest max + elif max( y ) < y_half: + if self.border.y.min < max( y ): + self.border.y.min = max( y ) # same + elif min( y ) > y_half: + if self.border.y.max > min( y ): + self.border.y.max = min( y ) # same + + offset = 3 + + self.border.x.min += offset + self.border.y.min += offset + self.border.x.max -= offset + self.border.y.min -= offset + + self.border.scale( factor ) + + def get_points( self , compressed ): + first_column = find_point( + compressed[ + :, + 0, + ], + 0, + ) + last_column = find_point( + compressed[ + : , + - 1, + ], + compressed.shape[1] - 1, + ) + first_line = find_point( + compressed[ + 0, + :, + ] , + 0 , + 'y', + ) + + if len( first_line ) < 2: + last_column = last_column[ 1 : ] + if len( first_line ) < 3: + first_line = [] + else: + first_line = first_line[ 1 : - 1 ] + + last_line = find_point( + compressed[ + - 1, + : , + ] , + compressed.shape[0] - 1, + 'y' , + ) + + if len( last_line ) < 2: + last_column = last_column[ : - 1 ] + if len( last_line ) < 3: + last_line = [] + else: + last_line = last_line[ 1 : - 1 ] + + return first_column + last_column + first_line + last_line + + def compress( self , factor ): + return self.image[ + : : factor, + : : factor, + ] diff --git a/classes/science/side.py b/classes/science/side.py new file mode 100644 index 0000000..c8fb3bf --- /dev/null +++ b/classes/science/side.py @@ -0,0 +1,22 @@ +class Side: + """ + Set of two index from the same axis + """ + + def __init__( self , min = 0 , max = 0 ): + if not isinstance( min , int ) or not isinstance( max , int ): + raise ValueError( 'min and max should be int' ) + + def scale( self , factor ): + """ + Update coordinate to adapt from compression + """ + self.min = int( self.min * factor ) + self.max = int( self.max * factor ) + def __str__( self ): + return '\ +side: [\ +\n min: ' + str( self.min ) + '\ +\n max: ' + str( self.max ) + '\ +\n]\ + ' diff --git a/classes/utils/__init__.py b/classes/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/classes/utils/settings.py b/classes/utils/settings.py new file mode 100644 index 0000000..65113cc --- /dev/null +++ b/classes/utils/settings.py @@ -0,0 +1,147 @@ +class Settings: + """ + Settings manager + """ + + def version( self ): + return '1.0.0' + + def __init__( self , arguments ): + # Default configuration + self.cache = None + self.filename = None + self.inte_cal = None + self.no_cache = False + self.output = 'data.fits' + self.verbose = False + self.wave_cal = None + + # configuration change + if len( arguments ) < 2: + raise Exception( + 'type \'' + __file__ + ' -h\' for more information' + ) + + index = 0 + while index < len( arguments ): + argument = arguments[ index ] + if argument[0] == '-': + if len( argument ) < 2: + raise Exception( + 'unknown argument, type \' + __file__ + \' -h' + + ' for more information' + ) + if argument[1] != '-': + if argument == '-h': + argument = '--help' + elif argument == '-V': + argument = '--version' + elif argument == '-v': + argument = '--verbose' + elif argument == '-n': + argument = '--no-cache' + elif argument == '-c': + if index == len( arguments ) - 1: + raise Exception( + 'cache have to take a value' + ) + arguments[ index + 1 ] = '--cache=' + \ + arguments[ index + 1 ] + index += 1 + continue + elif argument == '-o': + if index == len( arguments ) - 1: + raise Exception( + 'output have to take a value' + ) + arguments[ index + 1 ] = '--output=' + \ + arguments[ index + 1 ] + index += 1 + continue + elif argument == '-w': + if index == len( arguments ) - 1: + raise Exception( + 'wavelength calibration have to take a value' + ) + arguments[ index + 1 ] = '--wavelength=' + \ + arguments[ index + 1 ] + index += 1 + continue + elif argument == '-i': + if index == len( arguments ) - 1: + raise Exception( + 'intensity calibration have to take a value' + ) + arguments[ index + 1 ] = '--intensity=' + \ + arguments[ index + 1 ] + index += 1 + continue + else: + raise Exception( + 'unknown argument "' + argument + \ + '", type \'' + __file__ + \ + ' -h\' for more information' + ) + if argument[1] == '-': # not elif because argument + # can change in the last if + if argument == '--help': + print( self.help() ) + exit() + elif argument == '--version': + print( self.version() ) + exit() + elif argument == '--verbose': + self.verbose = True + elif argument == '--no-cache': + self.no_cache = True + elif ( + len( argument ) > 8 and + argument[ : 8 ] == '--cache=' + ): + self.cache = argument[ 8 : ] + elif ( + len( argument ) > 9 and + argument[ : 9 ] == '--output=' + ): + self.output = argument[ 9 : ] + elif ( + len( argument ) > 13 and + argument[ : 13 ] == '--wavelength=' + ): + self.wave_cal = argument[ 13 : ] + elif ( + len( argument ) > 12 and + argument[ : 12 ] == '--intensity=' + ): + self.inte_cal = argument[ 12 : ] + else: + raise Exception( + 'unknown argument "' + argument + \ + '", type \'' + __file__ + ' -h\' ' + \ + 'for more information' + ) + else: + self.filename = argument + index += 1 + + if self.filename == None: + raise Exception( 'filename should be given' ) + + def help( self ): + return '\ +naroo_reader [options...] filename\ +\n -w wavelength wavelength calibration file, default to None.\ +\n None means no wavelength interpolation\ +\n -i intensity intensity calibration file, default to None.\ +\n None means no intensity interpolation\ +\n -c --cache use given cache file to store new temporary\ +\n data or use old ones.\ +\n -h --help show this help and quit\ +\n -n --no-cache do not use already existing cache file.\ +\n If cache is defined, the cache file will be\ +\n overwrited.\ +\n If cache is None, it does nothing\ +\n -o --output Output file. Default to data.fits\ +\n -V --version show version number and quit\ +\n -v --verbose show more information to help debugging\ + ' diff --git a/function/__init__.py b/function/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/function/utils.py b/function/utils.py new file mode 100644 index 0000000..56b6c9f --- /dev/null +++ b/function/utils.py @@ -0,0 +1,219 @@ +import cv2 +import numpy as np + +def check_side( data , point , tolerance ): + """ + give coordinates of all side point of the given point which have an + intensity difference inferior than tolerance + """ + if not isinstance( data , np.ndarray ) and not isinstance( data , list ): + raise ValueError( 'data must be a list, ' + type( data ) + ' given' ) + if not isinstance( point , np.ndarray ) and not isinstance( point , tuple ) and not isinstance( point , list ): + raise ValueError( 'point must be a tuple, ' + type( point ) + ' given' ) + if not isinstance( tolerance , int ) and not isinstance( tolerance , float ): + raise ValueError( 'tolerance must be a number, ' + type( tolerance ) + ' given' ) + + positions , intensity = [] , data[ tuple( point ) ] + if 0 <= point[0] < data.shape[0] - 1 and intensity - tolerance <= data[ point[0] + 1 , point[1] ] <= intensity + tolerance: + positions.append( [ point[0] + 1 , point[1] ] ) + if 0 < point[0] < data.shape[0] and intensity - tolerance <= data[ point[0] - 1 , point[1] ] <= intensity + tolerance: + positions.append( [ point[0] - 1 , point[1] ] ) + if 0 <= point[1] < data.shape[1] - 1 and intensity - tolerance <= data[ point[0] , point[1] + 1 ] <= intensity + tolerance: + positions.append( [ point[0] , point[1] + 1 ] ) + if 0 < point[1] < data.shape[1] and intensity - tolerance <= data[ point[0] , point[1] - 1 ] <= intensity + tolerance: + positions.append( [ point[0] , point[1] - 1 ] ) + return positions + +def fill( data , point , tolerance , limit = 100000 ): + """ + give the coordinate of all points that fill the area with the given tolerance + """ + if not isinstance( data , np.ndarray ) and not isinstance( data , list ): + raise ValueError( 'data must be a list, ' + type( data ) + ' given' ) + if not isinstance( point , np.ndarray ) and not isinstance( point , tuple ) and not isinstance( point , list ): + raise ValueError( 'point must be a tuple, ' + type( point ) + ' given' ) + if not isinstance( tolerance , int ) and not isinstance( tolerance , float ): + raise ValueError( 'tolerance must be a number, ' + type( tolerance ) + ' given' ) + if not isinstance( limit , int ): + raise ValueError( 'limit must be an integer, ' + type( limit ) + ' given' ) + + taken_point = [] + new_points = [ point ] + i = 0 + while len( new_points ) != 0 and i < limit: + point = new_points.pop(0) + taken_point.append( point ) + for position in check_side( data , point , tolerance ): + if not position in new_points and not position in taken_point: + new_points.append( position ) + i += 1 + return np.array( taken_point ) + +def point( index_1 , index_2 , axis = 'x' ): + """ + reorder coordinate + """ + if not isinstance( index_1 , int ): + raise ValueError( 'index_1 must be an integer, ' + type( index_1 ) + ' given' ) + if not isinstance( index_2 , int ): + raise ValueError( 'index_2 must be an integer, ' + type( index_2 ) + ' given' ) + if not isinstance( axis , str ): + raise ValueError( 'axis must be a string, ' + type( axis ) + ' given' ) + if axis not in [ 'x' , 'y' ]: + raise ValueError( 'axis must be "x" or "y", ' + axis + ' given' ) + if axis == 'x': + return [ index_2 , index_1 ] + return [ index_1 , index_2 ] + +def find_point( list_ , index , axis = 'x' , threshold = 0.5 ): + """ + find the index where to fill in a side + """ + if not isinstance( list_ , list ) and not isinstance( list_ , np.ndarray ): + raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' ) + if not isinstance( index , int ): + raise ValueError( 'index must be an integer, ' + type( index ) + ' given' ) + if not isinstance( axis , str ): + raise ValueError( 'axis must be a string, ' + type( axis ) + ' given' ) + if axis not in [ 'x' , 'y' ]: + raise ValueError( 'axis must be "x" or "y", ' + axis + ' given' ) + if not isinstance( threshold , float ): + raise ValueError( 'threshold must be a float, ' + type( threshold ) + ' given' ) + + mean = np.mean( list_ ) + ampl = np.max( list_ ) - np.min( list_ ) + + if ampl < mean / 2: + return [ point( index , 0 , axis ) ] + else: + points = [] + + list_ = np.convolve( list_ , np.ones( 100 ) , 'same' ) + list_ -= np.min( list_ ) + list_ /= np.max( list_ ) + + i , inside , size = 0 , False , 0 + while i < len( list_ ): + if list_[ i ] > threshold and not inside: + points.append( point( index , i , axis ) ) + inside = True + size = 0 + elif list_[ i ] < threshold and inside: + size += 1 + if size > 0.01 * len( list_ ): # low sensibility + inside = False + i += 1 + return points +def consecutive( list_ ): + """ + divide a sorted list of integer by consecutive part + """ + if not isinstance( list_ , list ) and not isinstance( list_ , np.ndarray ): + raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' ) + if len( list_ ) == 0: + return list_ + index = last_consecutive( list_ ) + if index == len( list_ ) - 1: + return [ list_ ] + return [ list_[ : index + 1 ] ] + consecutive( list_[ index + 1 : ] ) # happy recursion \o/ +def last_consecutive( list_ ): + """ + return the last index of the first consecutive list + """ + if not isinstance( list_ , list ) and not isinstance( list_ , np.ndarray ): + raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' ) + first , lower , greater = list_[0] , 0 , len( list_ ) + i = lower + ( greater - lower ) // 2 + while greater - lower != 0: + i = lower + ( greater - lower ) // 2 + if list_[ i ] - first != i: # outside of the consecutive list + greater = i + else: + if i != len( list_ ) - 1: + if list_[ i ] + 1 != list_[ i + 1 ]: # next one is not inside the consecutive list => limit retrieved + break + lower = i + else: # if inside the consecutive list and last element, every element is consecutive + break + return i +def same_value( list_ ): + """ + divide a sorted list of integer by same value part + """ + if not isinstance( list_ , list ) and not isinstance( list_ , np.ndarray ): + raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' ) + if len( list_ ) == 0: + return list_ + counter = np.arange( 1 , len( list_ ) ) + return np.split( list_ , counter[ list_[ 1 : ] != list_[ : - 1 ] ] ) +def last_same_value( list_ ): + """ + return the last index of the first same value list + """ + if not isinstance( list_ , list ) and not isinstance( list_ , np.ndarray ): + raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' ) + value = list_[0] + return np.argwhere( list_ == value ).max() +def rotate( image , angle ): + """ + rotate the following image by the given angle + """ + height , width = image.shape[ : 2 ] + cX , cY = ( width // 2 , height // 2 ) + matrix = cv2.getRotationMatrix2D( ( cX , cY ) , angle , 1 ) + return cv2.warpAffine( image , matrix , ( width , height ) , flags = cv2.INTER_NEAREST ) +def retrieve_peaks( data , window_size = 5 , error_coef = 1.05 , max_window_size = 30 , min_successive = 2 ): + """ + get peak position from a 1D data + """ + spectral_energy = np.log( data ** 2 ) + error_thr = error_coef / np.median( spectral_energy ) + + average_window = np.convolve( + spectral_energy , + np.ones( window_size ), + 'same' , + ) / window_size + average_energy = np.mean( average_window ) + peaks = np.where( + average_window / average_energy ** 2 > error_thr + )[0] + peaks = [ + np.mean( peak ) for peak in consecutive( peaks ) + ] + successive = 0 + + while successive < min_successive and window_size < max_window_size: + average_window = np.convolve( + spectral_energy , + np.ones( window_size ), + 'same' , + ) / window_size + average_energy = np.mean( average_window ) + new_peaks = np.where( + average_window / average_energy ** 2 > error_thr + )[0] + new_peaks = [ + np.mean( peak ) for peak in consecutive( new_peaks ) + ] + + if len( peaks ) == len( new_peaks ): + successive += 1 + else: + successive = 0 + peaks = new_peaks + window_size += 1 + return peaks +def near_value( list_ , value ): + """ + return indexes of the list whith a value nearest of the given + one when crossing it + """ + change = np.where( np.diff( np.sign( list_ - value ) ) != 0 ) # sign change + index = change + ( + value - list_[ change ] + ) / ( + list_[ change + np.ones_like( change ) ] - list_[ change ] + ) # interpolation + index = np.append( index , np.where( list_ == value ) ) + return np.round( np.sort( index ) ).astype( int ) # triage diff --git a/main.py b/main.py new file mode 100755 index 0000000..a070f63 --- /dev/null +++ b/main.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 + +from classes.science.plate import Plate +from classes.utils.settings import Settings +from astropy.io.fits import open +from sys import argv as arguments + +settings = Settings( arguments ) + +hdul = open( settings.filename ) +plate = Plate( hdul[0].data ) +head = hdul[0].header + +hdul.close() + +import matplotlib.pyplot as plt + +plt.imshow( plate.image[ plate.border.slice() ] ) +plt.show()