# -*- coding: utf-8 -*-
"""
badpixels
=========
Tools to fix bad sensor pixels.
"""
import numpy as np
class BadPixelFixer(object):
    """
    A bad pixel fixer for a given resolution and list of bad pixels.

    The replacement strategy gets precomputed into a lookup table
    during initialization, therefore optimal performance can be reached
    if many frames get fixed with the same bad pixel fixer.

    :param size: ``(spatialSize, spectralSize)`` in pixels
    :param badPixels: ``[(spatPos, specPos)]`` positions of bad pixels
        starting from 0. Any iterable of 2-element integer sequences
        (tuples, lists, array rows) is accepted; it may be empty.
    :param strategy: replacement strategy
    :raises ValueError: if a position does not have exactly two coordinates
        or if a bad pixel has no valid neighbour along the interpolation axis
    :raises AttributeError: if ``strategy`` does not name a known strategy

    Current replacement strategies are:

    * ``"linear_spatial"``: linear interpolation along spatial axis
    * ``"linear_spectral"``: linear interpolation along spectral axis
    """

    def __init__(self, size, badPixels, strategy='linear_spatial'):
        # Normalise to plain int tuples so that membership tests behave the
        # same no matter whether tuples, lists or array rows were passed.
        pixels = [tuple(int(c) for c in pixel) for pixel in badPixels]
        for pixel in pixels:
            if len(pixel) != 2:
                raise ValueError(
                    'bad pixel positions need exactly two coordinates, '
                    'got %r' % (pixel,))
        self._size = size
        self._nBadPixels = len(pixels)
        self._badPixels = pixels
        # Set for O(1) "is this position bad?" lookups during the search.
        self._badPixelSet = frozenset(pixels)
        # reshape(-1, 2) keeps a well-formed integer index for an empty list.
        self._badIndex = tuple(np.array(pixels, dtype=int).reshape(-1, 2).T)
        # Strategy dispatch by name: '_calc_<strategy>_tables'.
        getattr(self, '_calc_%s_tables' % strategy)()

    def _bad_index_array(self):
        """Return the bad pixel positions as an integer array of shape (2, n)."""
        return np.array(self._badIndex, dtype=int).reshape(2, -1)

    def _find_valid_neighbours_along_axis(self, axis):
        """
        Find, for every bad pixel, the nearest non-bad positions on both
        sides along ``axis``.

        If the search leaves the frame on one side, the neighbour found on
        the other side is used for both.

        :param axis: ``0`` for the spatial, ``1`` for the spectral axis
        :returns: ``[idxA, idxB]``, two integer arrays of shape ``(2, n)``
            holding the lower and upper neighbour positions
        :raises ValueError: if a bad pixel has no valid neighbour on
            either side
        """
        step = np.zeros(2, dtype=int)
        step[axis] = 1
        bad = self._bad_index_array()
        idxA = bad - step[:, np.newaxis]
        idxB = bad + step[:, np.newaxis]
        badSet = self._badPixelSet
        # Rows of the transposed arrays are views, so the in-place updates
        # below write straight into idxA / idxB.
        for idx in idxA.T:
            while tuple(idx.tolist()) in badSet:
                idx -= step
        for idx in idxB.T:
            while tuple(idx.tolist()) in badSet:
                idx += step
        belowRange = idxA[axis] < 0
        aboveRange = idxB[axis] >= self._size[axis]
        stranded = belowRange & aboveRange
        if stranded.any():
            first = tuple(bad[:, stranded][:, 0].tolist())
            raise ValueError(
                'bad pixel %r has no valid neighbour along axis %d'
                % (first, axis))
        # One-sided fallback: reuse the neighbour from the opposite side.
        idxA[:, belowRange] = idxB[:, belowRange]
        idxB[:, aboveRange] = idxA[:, aboveRange]
        return [idxA, idxB]

    def _prepare_linear_interpolation(self, neighboursA, neighboursB):
        """
        Build the lookup tables for linear interpolation between two
        neighbours per bad pixel.

        Each neighbour is weighted with the distance to the *other*
        neighbour, so the closer neighbour contributes more.

        :param neighboursA: integer array ``(2, n)`` of first neighbours
        :param neighboursB: integer array ``(2, n)`` of second neighbours
        """
        bad = self._bad_index_array()
        dA = np.sqrt(np.sum((bad - neighboursA) ** 2, axis=0))
        dB = np.sqrt(np.sum((bad - neighboursB) ** 2, axis=0))
        d = dA + dB
        # Tuple of two (2, n) index arrays (one per frame axis); indexing a
        # frame with it yields the A and B neighbour values stacked on axis 0.
        self._replacementIndices = tuple(
            np.array([neighboursA, neighboursB]).transpose(1, 0, 2))
        self._replacementWeights = np.array((dB / d, dA / d))

    def _calc_linear_spatial_tables(self):
        """Precompute the tables for interpolation along the spatial axis."""
        neighboursA, neighboursB = self._find_valid_neighbours_along_axis(0)
        self._prepare_linear_interpolation(neighboursA, neighboursB)

    def _calc_linear_spectral_tables(self):
        """Precompute the tables for interpolation along the spectral axis."""
        neighboursA, neighboursB = self._find_valid_neighbours_along_axis(1)
        self._prepare_linear_interpolation(neighboursA, neighboursB)

    def applyInPlace(self, frame):
        """
        Apply replacement strategy in-place to given ``frame``.

        The first two axes of ``frame`` are indexed with the bad pixel
        positions; any further trailing axes are carried through unchanged.

        :param frame: array to fix, modified in place
        :returns: ``None``
        """
        if self._nBadPixels == 0:
            return
        # 'i' runs over the two neighbours, 'n' over the bad pixels; the
        # ellipsis covers optional trailing frame axes.
        frame[self._badIndex] = np.einsum(
            'in...,in->n...',
            frame[self._replacementIndices],
            self._replacementWeights)

    def apply(self, frame):
        """
        Apply replacement strategy to given ``frame``.

        :param frame: array-like to fix; it is copied, not modified
        :returns: new frame
        """
        frame = np.array(frame, copy=True)
        self.applyInPlace(frame)
        return frame
|