Update plate compression and add isolation
This commit is contained in:
parent
9c4ec55617
commit
2a072790ea
4 changed files with 243 additions and 138 deletions
10
classes/science/calibration_spectrum.py
Normal file
10
classes/science/calibration_spectrum.py
Normal file
|
@ -0,0 +1,10 @@
|
|||
from classes.science.border import Border
|
||||
|
||||
class CalibrationSpectrum:
|
||||
"""
|
||||
Define spectrum border
|
||||
"""
|
||||
|
||||
def __init__( self , up = Border() , down = Border ):
|
||||
self.up = up
|
||||
self.down = down
|
|
@ -1,9 +1,10 @@
|
|||
from numpy import ndarray, gradient, ones, argmax, arange, arctan, tan
|
||||
from numpy import ndarray, ones, argmax, arange, arctan, tan, pi, mean, max, min
|
||||
from scipy.optimize import curve_fit
|
||||
from scipy.signal import convolve
|
||||
from cv2 import getRotationMatrix2D, warpAffine, INTER_NEAREST
|
||||
from scipy.signal import convolve, find_peaks
|
||||
from scipy.ndimage import rotate
|
||||
from classes.science.border import Border
|
||||
from function.utils import find_point, fill
|
||||
from classes.science.calibration_spectrum import CalibrationSpectrum
|
||||
from function.utils import find_point, fill, cut_biggest
|
||||
from function.fit import linear
|
||||
|
||||
from logging import getLogger
|
||||
|
@ -20,7 +21,130 @@ class Plate:
|
|||
if len( data.shape ) != 2:
|
||||
raise ValueError( _( 'data must be a 2d matrix' ) )
|
||||
self.data = data
|
||||
self.set_border()
|
||||
self.set_border()\
|
||||
.rotate()\
|
||||
.set_spectrum()\
|
||||
.set_calibration_spectrum()
|
||||
|
||||
def compress( self ):
|
||||
"""
|
||||
Compress the plate data to fit the biggest dimension in a 2000
|
||||
pixels axis and the smallest in a 200 pixels axis at minimum.
|
||||
Return the compressed data and the compression factor used.
|
||||
"""
|
||||
min_factor = max( self.data.shape ) // 2000 # min factor to have a side
|
||||
# with a maximum of 1000 pixels
|
||||
max_factor = min( self.data.shape ) // 200 # max factor to have
|
||||
# a side with a minimum of 100 pixel
|
||||
if min_factor < max_factor:
|
||||
factor = int( mean( ( max_factor , min_factor ) ) )
|
||||
else: # the smallest side will be less than 100 pixels with the
|
||||
# minimum compression factor
|
||||
logger = getLogger( 'naroo reader' )
|
||||
logger.warning(
|
||||
_( (
|
||||
'slow compression: ratio between height and width'
|
||||
' is greater than 10 ({ratio:.2f})'
|
||||
) ).format(
|
||||
ratio = max( self.size() ) / min( self.size() )
|
||||
)
|
||||
)
|
||||
factor = max_factor
|
||||
return self.data[
|
||||
: : factor,
|
||||
: : factor,
|
||||
] , factor
|
||||
|
||||
def get_points( self , compressed ):
|
||||
first_column = find_point(
|
||||
compressed[
|
||||
:,
|
||||
0,
|
||||
],
|
||||
0,
|
||||
)
|
||||
last_column = find_point(
|
||||
compressed[
|
||||
: ,
|
||||
- 1,
|
||||
],
|
||||
compressed.shape[1] - 1,
|
||||
)
|
||||
first_line = find_point(
|
||||
compressed[
|
||||
0,
|
||||
:,
|
||||
] ,
|
||||
0 ,
|
||||
'y',
|
||||
)
|
||||
|
||||
if len( first_line ) < 2:
|
||||
last_column = last_column[ 1 : ]
|
||||
if len( first_line ) < 3:
|
||||
first_line = []
|
||||
else:
|
||||
first_line = first_line[ 1 : - 1 ]
|
||||
|
||||
last_line = find_point(
|
||||
compressed[
|
||||
- 1,
|
||||
: ,
|
||||
] ,
|
||||
compressed.shape[0] - 1,
|
||||
'y' ,
|
||||
)
|
||||
|
||||
if len( last_line ) < 2:
|
||||
last_column = last_column[ : - 1 ]
|
||||
if len( last_line ) < 3:
|
||||
last_line = []
|
||||
else:
|
||||
last_line = last_line[ 1 : - 1 ]
|
||||
|
||||
return first_column + last_column + first_line + last_line
|
||||
|
||||
def rotate( self ):
|
||||
"""
|
||||
Auto-rotate to be vertically and horizontally aligned
|
||||
"""
|
||||
indexes_max = argmax(
|
||||
convolve(
|
||||
self.data[
|
||||
1 * self.border.y.size() // 4:
|
||||
3 * self.border.y.size() // 4,
|
||||
1 * self.border.x.size() // 4:
|
||||
3 * self.border.x.size() // 4
|
||||
] ,
|
||||
ones( ( 500 , 1 ) ),
|
||||
'valid' ,
|
||||
) ,
|
||||
axis = 0,
|
||||
)
|
||||
abciss = arange(
|
||||
1 * self.border.x.size() // 4,
|
||||
3 * self.border.x.size() // 4
|
||||
)
|
||||
fit_result = curve_fit(
|
||||
linear ,
|
||||
abciss ,
|
||||
indexes_max,
|
||||
)[0]
|
||||
|
||||
angle = arctan( fit_result[0] ) * pi / 180 # rad
|
||||
diff = int( # adjust height border
|
||||
tan( angle ) * ( self.border.x.size() )
|
||||
)
|
||||
|
||||
self.data = rotate(
|
||||
self.data,
|
||||
angle ,
|
||||
)
|
||||
|
||||
self.border.y.min -= diff
|
||||
self.border.y.max -= diff
|
||||
|
||||
return self
|
||||
|
||||
def set_border( self ):
|
||||
"""
|
||||
|
@ -96,139 +220,87 @@ class Plate:
|
|||
|
||||
self.border.scale( factor )
|
||||
|
||||
def get_points( self , compressed ):
|
||||
first_column = find_point(
|
||||
compressed[
|
||||
:,
|
||||
0,
|
||||
],
|
||||
0,
|
||||
)
|
||||
last_column = find_point(
|
||||
compressed[
|
||||
: ,
|
||||
- 1,
|
||||
],
|
||||
compressed.shape[1] - 1,
|
||||
)
|
||||
first_line = find_point(
|
||||
compressed[
|
||||
0,
|
||||
:,
|
||||
] ,
|
||||
0 ,
|
||||
'y',
|
||||
)
|
||||
return self
|
||||
|
||||
if len( first_line ) < 2:
|
||||
last_column = last_column[ 1 : ]
|
||||
if len( first_line ) < 3:
|
||||
first_line = []
|
||||
else:
|
||||
first_line = first_line[ 1 : - 1 ]
|
||||
|
||||
last_line = find_point(
|
||||
compressed[
|
||||
- 1,
|
||||
: ,
|
||||
] ,
|
||||
compressed.shape[0] - 1,
|
||||
'y' ,
|
||||
)
|
||||
|
||||
if len( last_line ) < 2:
|
||||
last_column = last_column[ : - 1 ]
|
||||
if len( last_line ) < 3:
|
||||
last_line = []
|
||||
else:
|
||||
last_line = last_line[ 1 : - 1 ]
|
||||
|
||||
return first_column + last_column + first_line + last_line
|
||||
|
||||
def compress( self ):
|
||||
def set_calibration_spectrum( self ):
|
||||
"""
|
||||
Compress the plate data to fit the biggest dimension in a 5000
|
||||
pixels axis and the smallest in a 500 pixels axis at minimum.
|
||||
Return the compressed data and the compression factor used.
|
||||
Set calibration sprectrum area
|
||||
"""
|
||||
min_factor = max( self.data.shape ) // 5000 # min factor to have a side
|
||||
# with a maximum of 1000 pixels
|
||||
max_factor = min( self.data.shape ) // 500 # max factor to have
|
||||
# a side with a minimum of 100 pixel
|
||||
if min_factor < max_factor:
|
||||
factor = int( mean( ( max_factor , min_factor ) ) )
|
||||
else: # the smallest side will be less than 100 pixels with the
|
||||
# minimum compression factor
|
||||
logger = getLogger( 'naroo reader' )
|
||||
logger.warning(
|
||||
_( (
|
||||
'slow compression: ratio between height and width'
|
||||
' is greater than 10 ({ratio:.2f})'
|
||||
) ).format(
|
||||
ratio = max( self.size() ) / min( self.size() )
|
||||
)
|
||||
)
|
||||
factor = max_factor
|
||||
return self.data[
|
||||
: : factor,
|
||||
: : factor,
|
||||
] , factor
|
||||
self.calibration_spectrum = CalibrationSpectrum()
|
||||
|
||||
def middle( self ):
|
||||
"""
|
||||
Get coordinate of center
|
||||
"""
|
||||
return (
|
||||
self.size()[0] // 2,
|
||||
self.size()[1] // 2,
|
||||
)
|
||||
def indicator( list_ , matrix ):
|
||||
"""
|
||||
Define an indicator which define if the horizontal slice has
|
||||
a chance to be a part of a calibration
|
||||
"""
|
||||
avg = mean( matrix )
|
||||
if mean( list_ ) > 0.75 * avg:
|
||||
return 0
|
||||
if mean( list_ ) < 0.25 * avg:
|
||||
return 1
|
||||
positions = where( list_ > mean( list_ ) )[0]
|
||||
if len( positions ) < 100:
|
||||
return 2
|
||||
if len( positions ) > 400:
|
||||
return 3
|
||||
distance = mean( positions[ 1 : ] - positions[ : - 1 ] )
|
||||
if distance < 10:
|
||||
return 4
|
||||
return 10
|
||||
|
||||
def rotate( self ):
|
||||
"""
|
||||
Auto-rotate to be vertically and horizontally aligned
|
||||
"""
|
||||
indexes_max = argmax(
|
||||
convolve(
|
||||
list_ = [
|
||||
indicator(
|
||||
self.data[
|
||||
1 * self.border.y.size() // 4:
|
||||
3 * self.border.y.size() // 4,
|
||||
1 * self.border.x.size() // 4:
|
||||
3 * self.border.x.size() // 4
|
||||
] ,
|
||||
ones( ( 500 , 1 ) ),
|
||||
'valid' ,
|
||||
) ,
|
||||
axis = 0,
|
||||
)
|
||||
abciss = arange(
|
||||
1 * self.border.x.size() // 4,
|
||||
3 * self.border.x.size() // 4
|
||||
)
|
||||
fit_result = curve_fit(
|
||||
linear ,
|
||||
abciss ,
|
||||
indexes_max,
|
||||
)[0]
|
||||
i ,
|
||||
self.border.slice()[1],
|
||||
] ,
|
||||
self.data[ self.border.slice() ],
|
||||
) for i in range(
|
||||
self.border.y.min,
|
||||
self.border.y.max,
|
||||
)
|
||||
]
|
||||
import matplotlib.pyplot as plt
|
||||
plt.plot( list_ )
|
||||
plt.show()
|
||||
|
||||
angle = arctan( fit_result[0] ) # rad
|
||||
diff = int( # adjust height border
|
||||
tan( angle ) * ( self.border.x.size() )
|
||||
return self
|
||||
|
||||
def set_spectrum( self ):
|
||||
"""
|
||||
Set spectrum area
|
||||
"""
|
||||
self.spectrum = Border()
|
||||
|
||||
indexes = cut_biggest(
|
||||
convolve(
|
||||
mean(
|
||||
self.data[ self.border.slice() ],
|
||||
axis = 1 ,
|
||||
) ,
|
||||
ones( 200 ),
|
||||
'valid' ,
|
||||
),
|
||||
)
|
||||
|
||||
rotation_matrix = getRotationMatrix2D(
|
||||
self.middle(),
|
||||
angle ,
|
||||
1 ,
|
||||
)
|
||||
self.data = warpAffine(
|
||||
self.data ,
|
||||
rotation_matrix,
|
||||
self.size() ,
|
||||
flags = INTER_NEAREST,
|
||||
self.spectrum.y.min = indexes[0] + self.border.y.min + 100
|
||||
self.spectrum.y.max = indexes[1] + self.border.y.min + 100
|
||||
|
||||
indexes = cut_biggest(
|
||||
convolve(
|
||||
mean(
|
||||
self.data[ self.border.slice() ],
|
||||
axis = 0 ,
|
||||
) ,
|
||||
ones( 200 ),
|
||||
'valid' ,
|
||||
),
|
||||
)
|
||||
|
||||
self.border.y.min -= diff
|
||||
self.border.y.max -= diff
|
||||
self.spectrum.x.min = indexes[0] + self.border.x.min + 100
|
||||
self.spectrum.x.max = indexes[1] + self.border.x.min + 100
|
||||
|
||||
return self
|
||||
|
||||
def size( self ):
|
||||
"""
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import cv2
|
||||
import numpy as np
|
||||
|
||||
def check_side( data , point , tolerance ):
|
||||
|
@ -153,14 +152,6 @@ def last_same_value( list_ ):
|
|||
raise ValueError( 'list_ must be a list, ' + type( list_ ) + ' given' )
|
||||
value = list_[0]
|
||||
return np.argwhere( list_ == value ).max()
|
||||
def rotate( image , angle ):
|
||||
"""
|
||||
rotate the following image by the given angle
|
||||
"""
|
||||
height , width = image.shape[ : 2 ]
|
||||
cX , cY = ( width // 2 , height // 2 )
|
||||
matrix = cv2.getRotationMatrix2D( ( cX , cY ) , angle , 1 )
|
||||
return cv2.warpAffine( image , matrix , ( width , height ) , flags = cv2.INTER_NEAREST )
|
||||
def retrieve_peaks( data , window_size = 5 , error_coef = 1.05 , max_window_size = 30 , min_successive = 2 ):
|
||||
"""
|
||||
get peak position from a 1D data
|
||||
|
@ -216,3 +207,37 @@ def near_value( list_ , value ):
|
|||
) # interpolation
|
||||
index = np.append( index , np.where( list_ == value ) )
|
||||
return np.round( np.sort( index ) ).astype( int ) # triage
|
||||
|
||||
def cut_biggest( list_ ):
|
||||
"""
|
||||
Return index of start and end of the biggest peak in a list
|
||||
"""
|
||||
factor = 1
|
||||
indexes = near_value(
|
||||
list_ ,
|
||||
np.max( list_ ),
|
||||
)
|
||||
if len( indexes ) > 2:
|
||||
import matplotlib.pyplot as plt
|
||||
plt.plot( list_ )
|
||||
plt.show()
|
||||
raise Exception( 'too much peak' )
|
||||
while len( indexes ) < 2:
|
||||
factor += 1
|
||||
indexes = near_value(
|
||||
list_ ,
|
||||
np.min( list_ ) + (
|
||||
np.max( list_ ) - np.mean( list_ )
|
||||
) / factor,
|
||||
)
|
||||
factor -= 1
|
||||
indexes = near_value(
|
||||
list_ ,
|
||||
np.min( list_ ) + (
|
||||
np.max( list_ ) - np.mean( list_ )
|
||||
) / factor,
|
||||
)
|
||||
if len( indexes ) == 2:
|
||||
raise Exception( 'less than two pixel peak' )
|
||||
|
||||
return indexes
|
||||
|
|
2
main.py
2
main.py
|
@ -9,6 +9,4 @@ settings = Settings( arguments[ 1 : ] ) # remove the "main.py" part
|
|||
|
||||
hdul = open( settings.input )
|
||||
plate = Plate( hdul[0].data )
|
||||
plate.rotate()
|
||||
|
||||
hdul.close()
|
||||
|
|
Loading…
Reference in a new issue