# -*- coding: utf-8 -*-
"""
frameclassification
===================
This module provides heuristical functions to find exact frame positions of changes of the cameras settings.
You will most likely need one of:
* :py:func:`getChangesFromDataTimeAndAL`
* :py:func:`getChangesFromEnviFile`
* :py:func:`getPartsFromDataTimeAndAL`
function documentation
----------------------
"""
import math
import bisect
import itertools
from copy import deepcopy
import numpy as np
from scipy.optimize import curve_fit
import runmacs.spec.calibration.tools as ct
checkWidth = (20,30)
checkOfs = (0, -5, -10, 10)
checkSteep = (1., 0.5, 0.25)
joinWidth = 3
debug = False
def logistic(x, x0, y0, a=1., b=-1):
return a/(1+np.exp(-b*(x-x0))) + y0
def bToCuts(x0, b):
w = abs(5./b) + 1.
w = min(100.,w)
x0Lo = int(math.floor(x0-w))
x0Hi = int(math.ceil(x0+w))
return x0Lo, x0Hi
def consistencyCheck(partA,partB,popt,fitSlice,data):
#print popt
x0, y0, a, b = popt
x0Lo, x0Hi = bToCuts(x0, b)
if x0 > fitSlice.stop - 5:
return 'too far right!'
if x0 < fitSlice.start + 5:
return 'too far left!'
if abs(b) < 1:
return 'not steep enough!'
if a < 20:
return 'not high enough!'
if partA['shutter'] == 'close' and partB['shutter'] == 'open' and partA['exposureTime'] <= partB['exposureTime']:
if b < 0:
return 'no raise!'
if partA['shutter'] == 'open' and partB['shutter'] == 'close' and partA['exposureTime'] >= partB['exposureTime']:
59 ↛ 60line 59 didn't jump to line 60, because the condition on line 59 was never true if b > 0:
return 'no fall!'
if partA['shutter'] == partB['shutter']:
if partA['exposureTime'] > partB['exposureTime']:
63 ↛ 64line 63 didn't jump to line 64, because the condition on line 63 was never true if b > 0:
return 'no fall!'
if partA['exposureTime'] < partB['exposureTime']:
if b < 0:
return 'no raise!'
if partA['shutter'] == 'close':
if data[fitSlice.start:x0Lo].std() > 5:
return 'too noisy for dark'
if partB['shutter'] == 'close':
if data[x0Hi:fitSlice.stop].std() > 5:
return 'too noisy for dark'
return True
def getActionTimes(al):
"""
Takes an actionLog and returns all contained unique times in sorted order.
"""
tShutter = {t for v,t in al['shutter'] if t is not None}
tFps = {t for v,t in al['fps'] if t is not None}
tExposureTime = {t for v,t in al['exposureTime'] if t is not None}
return sorted(tShutter | tFps | tExposureTime)
def getActionValue(tcut, al):
"""
Takes a cut time and an actionLog of a single value (e.g. ..['fps']).
:returns: Value at the time of ``tcut``
"""
v,t = zip(*al)
if t[0] is not None and tcut < t[0]:
return None
94 ↛ 95line 94 didn't jump to line 95, because the condition on line 94 was never true if t[0] is None:
t = (tcut, ) + t[1:]
96 ↛ 98line 96 didn't jump to line 98, because the condition on line 96 was never true if t[-1] is None:
#None means +inf in this case, so bisect should return the second last element
t = t[:-1]
idx = bisect.bisect(t, tcut)-1
100 ↛ 101line 100 didn't jump to line 101, because the condition on line 100 was never true if idx < 0:
return None
return v[idx]
def getActionState(al, t):
"""
Takes an actionLog and a time, calculates the full state at that specific time.
"""
params = [('shutter', str),
('exposureTime', float),
('fps', float)]
res = {}
for k,typ in params:
v = getActionValue(t, al[k])
if v is not None:
res[k] = typ(v)
return res
def stateIsComplete(state):
"""
Checks if all state variables are covered by a given state.
"""
return 'shutter' in state and 'exposureTime' in state and 'fps' in state
def getChanges(frameTimes, actionLog):
"""
Computes all actionLog changes for a given ``frameTimes`` and ``actionLog``, approximately interpolated to frames.
:returns: ``(frameNo, stateBefore, stateAfter)``
"""
actionTimes = getActionTimes(actionLog)
state = [getActionState(actionLog, t) for t in actionTimes]
validCuts = [(t, s) for t,s in zip(actionTimes, state) if stateIsComplete(s)]
cutIndices = [bisect.bisect(frameTimes, vc[0]) for vc in validCuts]
if cutIndices[0] != 0:
print "WARNING: incomplete action log:", actionLog
print cutIndices, validCuts
cuts = [(i,vc[1]) for i,vc in zip(cutIndices, validCuts)]
print "CUTS:", cuts
return [(b[0], a[1], b[1]) for a,b in zip(cuts[:-1], cuts[1:])]
def preJoinChanges(changes):
"""
Joins common closely following changes to a single change.
"""
changes = list(changes)
itA,itB = itertools.tee(iter(changes))
itB.next()
lastWasJoin = False
didAnything = False
for b,a in itertools.izip(itB, itA):
didAnything = True
a_x0, a_before, a_after = a
b_x0, b_before, b_after = b
if all([a_before['shutter'] == 'close',
a_after['shutter'] == 'open',
b_before['shutter'] == 'open',
b_after['shutter'] == 'open',
a_after['exposureTime'] != b_after['exposureTime'],
(b_x0 - a_x0) < max(a_after['fps'], b_before['fps'])
]):
#print 'open and exp change:', b_x0 - a_x0, a_before, a_after, b_after
yield a_x0, a_before, b_after
lastWasJoin = True
continue
if not lastWasJoin:
yield a
lastWasJoin = False
if not lastWasJoin:
if didAnything:
yield b
else:
print "yielding one"
yield itA.next()
def refineChange(data, change):
"""
Uses a ``data``-timeline to refine the position of a given ``change``.
"""
pnt, before, after = change
p = []
for cw, co in itertools.product(checkWidth, checkOfs):
lo = max(pnt-cw+co, 0)
hi = min(pnt+cw+co, len(data))
s = slice(lo, hi)
xvals = np.arange(s.start, s.stop)
yvals = data[s]
fst = yvals[:5].mean()
lst = yvals[-5:].mean()
for cs in checkSteep:
pstart = (pnt+co, lst if lst < fst else fst, np.abs(fst-lst), cs*np.sign(lst-fst))
p.append((pstart, 'guess %d, %d, %f'%(cw, co, cs)))
try:
popt, cov = curve_fit(logistic, xvals, yvals, pstart)
consistent = consistencyCheck(before, after, popt, s, data)
if consistent is True:
p.append((popt, 'ok'))
return popt, True, p
else:
p.append((popt, consistent))
raise RuntimeError(consistent)
except RuntimeError:
continue
return pstart, False, p
def refineChanges(data, changes):
"""
Uses a ``data``-timeline to refine the positions of given ``changes``.
"""
for change in changes:
popt, consistent, p = refineChange(data, change)
x0, y0, a, b = popt
x0Lo, x0Hi = bToCuts(x0,b)
yield ((x0Lo, x0Hi) + change[-2:], consistent , p)
def mergeFits(a,b):
"""
Combines the results of two change fits to a single change.
"""
(a_x0Lo, a_x0Hi, a_before, a_after), a_good, pa = a
(b_x0Lo, b_x0Hi, b_before, b_after), b_good, pb = b
223 ↛ 224line 223 didn't jump to line 224, because the condition on line 223 was never true if a_good is True and not b_good is True:
return (a_x0Lo, a_x0Hi, a_before, b_after), True, (pb + pa)[-10:]
if b_good is True and not a_good is True:
return (b_x0Lo, b_x0Hi, a_before, b_after), True, (pa + pb)[-10:]
return (min(a_x0Lo, b_x0Lo), max(a_x0Hi, b_x0Hi), a_before, b_after), (a_good is True) or (b_good is True), (pa + pb)[-10:]
def joinChanges(changes):
"""
Joins nearby changes (after refinement).
"""
itA,itB = itertools.tee(iter(changes))
itB.next()
lastWasJoin = False
didAnything = False
for b,a in itertools.izip(itB, itA):
didAnything = True
(a_x0Lo, a_x0Hi, a_before, a_after), a_good, pa = a
(b_x0Lo, b_x0Hi, b_before, b_after), b_good, pb = b
wa = joinWidth #a_x0Hi - a_x0Lo
wb = joinWidth #b_x0Hi - b_x0Lo
if min(a_x0Hi + wa, b_x0Hi + wb) > max(a_x0Lo - wa, b_x0Lo - wb):
#if a_x0Hi + wa > x(b_x0Hi - wa, b_x0Lo - wb):
yield mergeFits(a,b)
lastWasJoin = True
elif not lastWasJoin:
yield a
lastWasJoin = False
else:
lastWasJoin = False
252 ↛ exitline 252 didn't return from function 'joinChanges', because the condition on line 252 was never false if not lastWasJoin:
if didAnything:
yield b
else:
yield itA.next()
def getChangesFromDataTimeAndAL(data, frameTimes, actionLog):
259 ↛ 260line 259 didn't jump to line 260, because the condition on line 259 was never true if debug:
import pylab
changes = getChanges(frameTimes, actionLog)
262 ↛ 263line 262 didn't jump to line 263, because the condition on line 262 was never true if debug:
print 'PRE:', changes
changes = preJoinChanges(changes)
265 ↛ 266line 265 didn't jump to line 266, because the condition on line 265 was never true if debug:
changes = list(changes)
print 'JOIN1:', changes
changes = refineChanges(data, changes)
changes = list(sorted(changes,key=lambda c: c[0]))
270 ↛ 271line 270 didn't jump to line 271, because the condition on line 270 was never true if debug:
print "POST:", changes
l = 0
while l != len(changes):
l = len(changes)
changes = list(joinChanges(changes))
276 ↛ 277line 276 didn't jump to line 277, because the condition on line 276 was never true if debug:
pylab.plot(frameTimes, data)
vert = min(data),max(data)
for t in getActionTimes(actionLog):
pylab.plot((t,t),vert, label='data', color='red')
for p in changes:
pylab.plot((frameTimes[p[0][0]],frameTimes[p[0][0]]),vert, label='data', color='orange')
pylab.plot((frameTimes[p[0][1]],frameTimes[p[0][1]]),vert, label='data', color='orange')
pylab.show()
return changes
def getChangesFromEnviFile(fil):
"""
Computes and refines changes from EnviFile.
"""
h = deepcopy(fil.envi_header)
ct.normalizeEnviHeader(h)
spatStep = int(fil.shape[1]/30)
specCenter = int(fil.shape[2]/2)
data = fil[:,range(0,fil.shape[1],spatStep),specCenter-5:specCenter+5].mean(axis=-1).mean(axis=-1)
return getChangesFromDataTimeAndAL(data, h['wallTimes'], h['actionLog'])
def getPartsFromDataTimeAndAL(data, frameTimes, actionLog):
"""
Computes changes and converts them to continous image parts
:param data: An 1-dimensional array with representative illumination values for each frame
:param frameTimes: An array of the same shape as ``data``, containing the time of each frame
:param actionLog: The corresponding actionLog as :py:class:`dict` of lists of 2-tuples.
:returns: A list containing ``(cutStart, cutStop, state)``
"""
changes = getChangesFromDataTimeAndAL(data, frameTimes, actionLog)
firstChange = ((0, 0, None, None), True, None)
lastChange = ((len(frameTimes),
len(frameTimes),
getActionState(actionLog, frameTimes[-1]), None), True, None)
changes = list(joinChanges([firstChange] + changes + [lastChange]))
def partGen():
for a,b in zip(changes[:-1], changes[1:]):
(preCutEnd, cutStart, _2, stateA), consistentA, _3 = a
(cutStop, _1, stateB, _2), consistentB, _3 = b
318 ↛ 319line 318 didn't jump to line 319, because the condition on line 318 was never true if preCutEnd < 0 and cutStart >= 0 and not consistentA: #Workaround for single capture with setting approximately at start
cutStart = 0
consistentA = True
321 ↛ 322line 321 didn't jump to line 322, because the condition on line 321 was never true if not (consistentA and consistentB):
continue
323 ↛ 324line 323 didn't jump to line 324, because the condition on line 323 was never true if stateA is None and stateB is None:
continue
elif stateA is None and stateB is not None:
state = stateB
327 ↛ 328line 327 didn't jump to line 328, because the condition on line 327 was never true elif stateB is None and stateA is not None:
state = stateA
else:
try:
assert len(stateA.keys()) == len(stateB.keys())
assert all(ka == kb for ka,kb in zip(sorted(stateA.keys()), sorted(stateB.keys())))
print stateA, stateB
for k in stateA.keys():
assert stateA[k] == stateB[k]
except AssertionError:
print "WARNING, skipping inconsistent slice!"
continue
state = stateA
yield cutStart, cutStop, state
return list(partGen())
|