|
# -*- coding: utf-8 -*-
"""
mounttree
=========
handling of mount tree files
"""
import re
import numpy as np
import coordinateframes as cf
import dateutil.tz as datetz
rotPrimitives = {'x': cf.Rx, 'y': cf.Ry, 'z': cf.Rz}
reRotPrimitive = re.compile('^R([xyz])\\((-?[0-9]+(?:\\.[0-9]*)?)((?:deg|rad)?)\\)$')
def parseRotationPrimitives(r):
"""
Parses a rotation string, given as multiplication of rotations about x,y or z axes.
:param x: String like ``Rx(5deg)*Ry(-7.2rad)``
Units (``deg`` or ``rad``) are optional, ``rad`` is default.
"""
parts = r.split('*')
R = []
for p in parts:
m = reRotPrimitive.match(p)
axis, angle, unit = m.groups()
30 ↛ 31line 30 didn't jump to line 31, because the condition on line 30 was never true if len(unit) == 0:
unit = 'rad'
32 ↛ 34line 32 didn't jump to line 34, because the condition on line 32 was never false if unit == 'deg':
f = np.pi/180
elif unit == 'rad':
f = 1
else:
raise ValueError('unknown unit "%s"'%unit)
angle = float(angle)
R.append(rotPrimitives[axis](angle*f))
return reduce(cf.M.mmul,R)
def parseRotation(r):
"""
Parses a rotation, either as a list of euler angles or like :py:func:`parseRotationPrimitives`.
"""
46 ↛ 47line 46 didn't jump to line 47, because the condition on line 46 was never true if r is None:
return np.eye(3)
if isinstance(r, (list, tuple)):
return r
return parseRotationPrimitives(r)
def isNamed(x):
return isinstance(x, (str, unicode))
def setValueOrCreateUpdater(subtree, frame, fieldname, parser=None):
if parser is None:
parser = lambda x: x
if fieldname in subtree:
field = parser(subtree[fieldname])
namedFields = map(isNamed, field)
if any(namedFields):
argNames = [n for n, nf in zip(field, namedFields) if nf]
def updateFields(*args, **kwargs):
64 ↛ 65line 64 didn't jump to line 65, because the condition on line 64 was never true if len(args) > len(argNames):
raise ValueError('too many arguments')
66 ↛ 67line 66 didn't jump to line 67, because the loop on line 66 never started for k, v in zip(argNames, args):
kwargs[k] = v
newValue = [kwargs[f] if nf else f for f, nf in zip(field, namedFields)]
setattr(frame, fieldname, newValue)
return argNames, updateFields
else:
setattr(frame, fieldname, field)
def parseTree(t, u=None):
if u is None:
u = cf.CoordinateUniverse()
if 'framespec' in t:
frame = u.coordLib(t['framespec'], t['framename'])
else:
frame = u.getCoordinateFrame(t['framename'])
allUpdaters = []
for st in t.get('subframes', []):
subframe, updaters = parseTree(st, u)
subframe.referencedTo(frame)
updater = setValueOrCreateUpdater(st, subframe, 'position')
if updater is not None:
updaters.append(updater)
updater = setValueOrCreateUpdater(st, subframe, 'rotation', parseRotation)
if updater is not None:
updaters.append(updater)
allUpdaters += updaters
return frame, allUpdaters
class NullTimeCorrection(object):
"""
Dummy applicator for no time correction.
"""
def apply(self, t):
"""
Applies time correction (in this case, returns ``t`` as it is).
"""
return t
def apply_dt(self, t):
"""
Applies time correction (in this case, returns ``t`` as it is) for datetime64 object.
"""
return t
class ConstantTimeCorrection(object):
"""
Applicator for constant time correction.
:param dt: time offset (in seconds)
Performs a constant time correction.
"""
def __init__(self, dt):
self.dt = dt
def apply(self, t):
"""
Applies time correction.
"""
return t + self.dt
def apply_dt(self, t):
"""
Applies time correction for datetime64 object.
"""
return t + np.timedelta64(self.dt, 's')
class TwopointTimeCorrection(object):
"""
Applicator for two point time correction.
:param t1: reference time 1 (as unix timestamp)
:param t2: reference time 2 (as unix timestamp)
:param dt1: time offset at ``t1`` (in seconds)
:param dt2: time offset at ``t2`` (in seconds)
Performs a time correction by a linear interpolation (and extrapolation)
of the time offsets ``dt1`` and ``dt2`` between the reference times
``t1`` and ``t2``.
"""
def __init__(self, t1, t2, dt1, dt2):
self.t1 = t1
self.t2 = t2
self.t1_dt = np.datetime64('1970-01-01') + t1 * np.timedelta64(1, 's')
self.dt1 = dt1
self.dt2 = dt2
self.f = (self.dt2 - self.dt1) / (self.t2 - self.t1)
def apply(self, t):
"""
Applies time correction.
"""
return t + self.dt1 + self.f * (t - self.t1)
def apply_dt(self, t):
"""
Applies time correction for datetime64 object.
"""
return t + np.timedelta64(self.dt1, 's') + self.f * (t - self.t1_dt)
timeCorrections = {
'null': NullTimeCorrection,
'constant': ConstantTimeCorrection,
'twopoint': TwopointTimeCorrection
}
def parseSensor(sensor, coordinateframe):
tofs = sensor.get('tofs', {'method': 'null', 'params': {}})
return {
'name': sensor['name'],
'sensorId': sensor['sensorId'],
'coordinateframe': coordinateframe['framename'],
'tofs': timeCorrections[tofs['method']](**tofs['params']),
'provides': sensor.get('provides', [])
}
def parseSensors(t):
sensors = [parseSensor(s, t) for s in t.get('sensors', [])]
for sf in t.get('subframes', []):
sensors += parseSensors(sf)
return sensors
class UpdateApplicator(object):
def __init__(self, updaters):
self.updaters = updaters
def update(self, **kwargs):
for args, updater in self.updaters:
updater(**kwargs)
class MountTree(object):
"""
mounttree representation
.. py:attribute:: name
The name of the mounttree.
.. py:attribute:: validFrom
Start of validity period.
.. py:attribute:: validUntil
End of validity period.
.. py:function:: update(**kwargs)
Updates the dynamic variables of the mounttree.
All named variables from the mount tree can be given as keyword arguments.
.. py:attribute:: universe
The coordinate universe defined by the mounttree
.. py:attribute:: sensors
:py:class:`list` of all sensors defined by the tree.
.. py:attribute:: sensorsById
:py:class:`dict` of all sensors defined by the tree with ``sensorId`` as keys.
"""
@classmethod
def fromDict(cls, mtDict):
"""
Constructs a MountTree from a dictionary.
:param mtDict: :py:class:`dict` describing the mounttree.
"""
new = cls()
new.name = mtDict['description']['name']
new.validFrom = mtDict['description']['valid']['from']
if new.validFrom.tzinfo is None:
new.validFrom = new.validFrom.replace(tzinfo=datetz.tzutc())
new.validUntil = mtDict['description']['valid']['until']
if new.validUntil.tzinfo is None:
new.validUntil = new.validUntil.replace(tzinfo=datetz.tzutc())
new.baseframe, updaters = parseTree(mtDict['mounttree'])
new.universe = new.baseframe.universe
new.ua = UpdateApplicator(updaters)
new.update = new.ua.update
new.sensors = parseSensors(mtDict['mounttree'])
new.sensorsById = {s['sensorId']:s for s in new.sensors}
return new
@property
def requiredSensors(self):
"""
Returns a :py:class:`set` of all sensorIds which provide data for the mounttree orientation.
"""
return set(s['sensorId'] for s in self.sensors if len(s['provides']) > 0)
@property
def availableSensors(self):
"""
Returns a :py:class:`set` of all sensorIds defined in the mounttree.
"""
return set(s['sensorId'] for s in self.sensors)
@property
def coordinateframes(self):
"""
Returns a :py:class:`dict` of coordinate frames defined in the mounttree.
"""
return self.universe._frames
def _repr_dot_(self):
import pydot
graph = pydot.Dot(graph_type='digraph', graph_name='_', rankdir='TD')
for name, frame in self.coordinateframes.items():
node = pydot.Node(name, label=name)
graph.add_node(node)
for name, frame in self.coordinateframes.items():
if frame.referenceSystem is not None:
edge = pydot.Edge(frame.referenceSystem.name, name)
graph.add_edge(edge)
return graph
def _repr_svg_(self):
return self._repr_dot_().create_svg()
|