Source code for runmacs.spec.calibration.frameclassification

# -*- coding: utf-8 -*-
"""
frameclassification
===================
This module provides heuristical functions to find exact frame positions of changes of the cameras settings.

You will most likely need one of:

* :py:func:`getChangesFromDataTimeAndAL`
* :py:func:`getChangesFromEnviFile`
* :py:func:`getPartsFromDataTimeAndAL`

function documentation
----------------------
"""

import math
import bisect
import itertools
from copy import deepcopy
import numpy as np
from scipy.optimize import curve_fit
import runmacs.spec.calibration.tools as ct


checkWidth = (20,30)
checkOfs = (0, -5, -10, 10)
checkSteep = (1., 0.5, 0.25)
joinWidth = 3

debug = False

def logistic(x, x0, y0, a=1., b=-1):
    return a/(1+np.exp(-b*(x-x0))) + y0

def bToCuts(x0, b):
    w = abs(5./b) + 1.
    w = min(100.,w)
    x0Lo = int(math.floor(x0-w))
    x0Hi = int(math.ceil(x0+w))
    return x0Lo, x0Hi

def consistencyCheck(partA,partB,popt,fitSlice,data):
    #print popt
    x0, y0, a, b = popt
    x0Lo, x0Hi = bToCuts(x0, b)
    if x0 > fitSlice.stop - 5:
        return 'too far right!'
    if x0 < fitSlice.start + 5:
        return 'too far left!'
    if abs(b) < 1:
        return 'not steep enough!'
    if a < 20:
        return 'not high enough!'
    if partA['shutter'] == 'close' and partB['shutter'] == 'open' and partA['exposureTime'] <= partB['exposureTime']:
        if b < 0:
            return 'no raise!'
    if partA['shutter'] == 'open' and partB['shutter'] == 'close' and partA['exposureTime'] >= partB['exposureTime']:
        if b > 0:
            return 'no fall!'
    if partA['shutter'] == partB['shutter']:
        if partA['exposureTime'] > partB['exposureTime']:
            if b > 0:
                return 'no fall!'
        if partA['exposureTime'] < partB['exposureTime']:
            if b < 0:
                return 'no raise!'
    if partA['shutter'] == 'close':
        if data[fitSlice.start:x0Lo].std() > 5:
            return 'too noisy for dark'
    if partB['shutter'] == 'close':
        if data[x0Hi:fitSlice.stop].std() > 5:
            return 'too noisy for dark'
    return True

[docs]def getActionTimes(al): """ Takes an actionLog and returns all contained unique times in sorted order. """ tShutter = {t for v,t in al['shutter'] if t is not None} tFps = {t for v,t in al['fps'] if t is not None} tExposureTime = {t for v,t in al['exposureTime'] if t is not None} return sorted(tShutter | tFps | tExposureTime)
[docs]def getActionValue(tcut, al): """ Takes a cut time and an actionLog of a single value (e.g. ..['fps']). :returns: Value at the time of ``tcut`` """ v,t = zip(*al) if t[0] is not None and tcut < t[0]: return None if t[0] is None: t = (tcut, ) + t[1:] if t[-1] is None: #None means +inf in this case, so bisect should return the second last element t = t[:-1] idx = bisect.bisect(t, tcut)-1 if idx < 0: return None return v[idx]
[docs]def getActionState(al, t): """ Takes an actionLog and a time, calculates the full state at that specific time. """ params = [('shutter', str), ('exposureTime', float), ('fps', float)] res = {} for k,typ in params: v = getActionValue(t, al[k]) if v is not None: res[k] = typ(v) return res
[docs]def stateIsComplete(state): """ Checks if all state variables are covered by a given state. """ return 'shutter' in state and 'exposureTime' in state and 'fps' in state
[docs]def getChanges(frameTimes, actionLog): """ Computes all actionLog changes for a given ``frameTimes`` and ``actionLog``, approximately interpolated to frames. :returns: ``(frameNo, stateBefore, stateAfter)`` """ actionTimes = getActionTimes(actionLog) state = [getActionState(actionLog, t) for t in actionTimes] validCuts = [(t, s) for t,s in zip(actionTimes, state) if stateIsComplete(s)] cutIndices = [bisect.bisect(frameTimes, vc[0]) for vc in validCuts] if cutIndices[0] != 0: print "WARNING: incomplete action log:", actionLog print cutIndices, validCuts cuts = [(i,vc[1]) for i,vc in zip(cutIndices, validCuts)] print "CUTS:", cuts return [(b[0], a[1], b[1]) for a,b in zip(cuts[:-1], cuts[1:])]
[docs]def preJoinChanges(changes): """ Joins common closely following changes to a single change. """ changes = list(changes) itA,itB = itertools.tee(iter(changes)) itB.next() lastWasJoin = False didAnything = False for b,a in itertools.izip(itB, itA): didAnything = True a_x0, a_before, a_after = a b_x0, b_before, b_after = b if all([a_before['shutter'] == 'close', a_after['shutter'] == 'open', b_before['shutter'] == 'open', b_after['shutter'] == 'open', a_after['exposureTime'] != b_after['exposureTime'], (b_x0 - a_x0) < max(a_after['fps'], b_before['fps']) ]): #print 'open and exp change:', b_x0 - a_x0, a_before, a_after, b_after yield a_x0, a_before, b_after lastWasJoin = True continue if not lastWasJoin: yield a lastWasJoin = False if not lastWasJoin: if didAnything: yield b else: print "yielding one" yield itA.next()
[docs]def refineChange(data, change): """ Uses a ``data``-timeline to refine the position of a given ``change``. """ pnt, before, after = change p = [] for cw, co in itertools.product(checkWidth, checkOfs): lo = max(pnt-cw+co, 0) hi = min(pnt+cw+co, len(data)) s = slice(lo, hi) xvals = np.arange(s.start, s.stop) yvals = data[s] fst = yvals[:5].mean() lst = yvals[-5:].mean() for cs in checkSteep: pstart = (pnt+co, lst if lst < fst else fst, np.abs(fst-lst), cs*np.sign(lst-fst)) p.append((pstart, 'guess %d, %d, %f'%(cw, co, cs))) try: popt, cov = curve_fit(logistic, xvals, yvals, pstart) consistent = consistencyCheck(before, after, popt, s, data) if consistent is True: p.append((popt, 'ok')) return popt, True, p else: p.append((popt, consistent)) raise RuntimeError(consistent) except RuntimeError: continue return pstart, False, p
[docs]def refineChanges(data, changes): """ Uses a ``data``-timeline to refine the positions of given ``changes``. """ for change in changes: popt, consistent, p = refineChange(data, change) x0, y0, a, b = popt x0Lo, x0Hi = bToCuts(x0,b) yield ((x0Lo, x0Hi) + change[-2:], consistent , p)
[docs]def mergeFits(a,b): """ Combines the results of two change fits to a single change. """ (a_x0Lo, a_x0Hi, a_before, a_after), a_good, pa = a (b_x0Lo, b_x0Hi, b_before, b_after), b_good, pb = b if a_good is True and not b_good is True: return (a_x0Lo, a_x0Hi, a_before, b_after), True, (pb + pa)[-10:] if b_good is True and not a_good is True: return (b_x0Lo, b_x0Hi, a_before, b_after), True, (pa + pb)[-10:] return (min(a_x0Lo, b_x0Lo), max(a_x0Hi, b_x0Hi), a_before, b_after), (a_good is True) or (b_good is True), (pa + pb)[-10:]
[docs]def joinChanges(changes): """ Joins nearby changes (after refinement). """ itA,itB = itertools.tee(iter(changes)) itB.next() lastWasJoin = False didAnything = False for b,a in itertools.izip(itB, itA): didAnything = True (a_x0Lo, a_x0Hi, a_before, a_after), a_good, pa = a (b_x0Lo, b_x0Hi, b_before, b_after), b_good, pb = b wa = joinWidth #a_x0Hi - a_x0Lo wb = joinWidth #b_x0Hi - b_x0Lo if min(a_x0Hi + wa, b_x0Hi + wb) > max(a_x0Lo - wa, b_x0Lo - wb): #if a_x0Hi + wa > x(b_x0Hi - wa, b_x0Lo - wb): yield mergeFits(a,b) lastWasJoin = True elif not lastWasJoin: yield a lastWasJoin = False else: lastWasJoin = False if not lastWasJoin: if didAnything: yield b else: yield itA.next()
def getChangesFromDataTimeAndAL(data, frameTimes, actionLog): if debug: import pylab changes = getChanges(frameTimes, actionLog) if debug: print 'PRE:', changes changes = preJoinChanges(changes) if debug: changes = list(changes) print 'JOIN1:', changes changes = refineChanges(data, changes) changes = list(sorted(changes,key=lambda c: c[0])) if debug: print "POST:", changes l = 0 while l != len(changes): l = len(changes) changes = list(joinChanges(changes)) if debug: pylab.plot(frameTimes, data) vert = min(data),max(data) for t in getActionTimes(actionLog): pylab.plot((t,t),vert, label='data', color='red') for p in changes: pylab.plot((frameTimes[p[0][0]],frameTimes[p[0][0]]),vert, label='data', color='orange') pylab.plot((frameTimes[p[0][1]],frameTimes[p[0][1]]),vert, label='data', color='orange') pylab.show() return changes
[docs]def getChangesFromEnviFile(fil): """ Computes and refines changes from EnviFile. """ h = deepcopy(fil.envi_header) ct.normalizeEnviHeader(h) spatStep = int(fil.shape[1]/30) specCenter = int(fil.shape[2]/2) data = fil[:,range(0,fil.shape[1],spatStep),specCenter-5:specCenter+5].mean(axis=-1).mean(axis=-1) return getChangesFromDataTimeAndAL(data, h['wallTimes'], h['actionLog'])
[docs]def getPartsFromDataTimeAndAL(data, frameTimes, actionLog): """ Computes changes and converts them to continous image parts :param data: An 1-dimensional array with representative illumination values for each frame :param frameTimes: An array of the same shape as ``data``, containing the time of each frame :param actionLog: The corresponding actionLog as :py:class:`dict` of lists of 2-tuples. :returns: A list containing ``(cutStart, cutStop, state)`` """ changes = getChangesFromDataTimeAndAL(data, frameTimes, actionLog) firstChange = ((0, 0, None, None), True, None) lastChange = ((len(frameTimes), len(frameTimes), getActionState(actionLog, frameTimes[-1]), None), True, None) changes = list(joinChanges([firstChange] + changes + [lastChange])) def partGen(): for a,b in zip(changes[:-1], changes[1:]): (preCutEnd, cutStart, _2, stateA), consistentA, _3 = a (cutStop, _1, stateB, _2), consistentB, _3 = b if preCutEnd < 0 and cutStart >= 0 and not consistentA: #Workaround for single capture with setting approximately at start cutStart = 0 consistentA = True if not (consistentA and consistentB): continue if stateA is None and stateB is None: continue elif stateA is None and stateB is not None: state = stateB elif stateB is None and stateA is not None: state = stateA else: try: assert len(stateA.keys()) == len(stateB.keys()) assert all(ka == kb for ka,kb in zip(sorted(stateA.keys()), sorted(stateB.keys()))) print stateA, stateB for k in stateA.keys(): assert stateA[k] == stateB[k] except AssertionError: print "WARNING, skipping inconsistent slice!" continue state = stateA yield cutStart, cutStop, state return list(partGen())