# -*- coding: utf-8 -*-
# Filename: product.py
"""
product base classes
====================
This is part of the macsProcessor suite.
Copyright (C) 2014 Tobias Kölling
"""
import socket
import hashlib
import importlib
import os
import pkgutil
import sys
import re
import numpy as np
from runmacs.processor.productpart import ProductPart
import runmacs.processor.querybuilder as querybuilder
from runmacs.processor.utils import presentAttrsAsItems
from runmacs.processor.sugar import *
import logging
import pint
logger = logging.getLogger(__name__)
# pydot is an optional dependency: when it cannot be imported the name is
# bound to None and the graph related methods raise at call time instead.
try:
    import pydot
except ImportError:
    pydot = None
# Recognizes part getter method names of the form ``_get<Part>`` or
# ``_get<Part>By<Axis>``. Both <Part> and <Axis> consist of one upper case
# letter followed by lower case letters only; group 1 is the part name,
# group 2 the (optional) iteration axis.
partGetterMethodNameRe = re.compile('^_get([A-Z][a-z]*)(?:By([A-Z][a-z]*))?$')
class ProductMetaclass(type):
    """
    Metaclass of all products and product mixins.

    While a class is created, its members are scanned and three class
    attributes are derived from them (merged with those of the bases):

    ``parts``
        Description of every part for which a getter method named
        ``_get<Part>`` or ``_get<Part>By<Axis>`` exists.
    ``passedParts``
        Mapping ``name -> (component, part)`` for members flagged with
        ``_isPassedPart``.
    ``_pprops``
        Mapping ``name -> object`` for members flagged with
        ``_isPersistentProperty``.

    Classes which neither derive directly from ``object`` nor list
    ``ProductMixin`` directly among their bases are additionally registered
    in ``Product.productClasses``.
    """
    def __new__(cls, clsname, bases, dct):
        """
        Create the class and register it as product class if applicable.

        :return: The new class, or the class already registered under the
                 same ``productClassName()`` if there is one.
        """
        dct = ProductMetaclass.__processMembers(clsname, bases, dct)
        newClass = super(ProductMetaclass, cls).__new__(cls, clsname, bases, dct)
        # The order of the tests matters: the root classes derive directly
        # from ``object`` and are created before ``ProductMixin`` is defined,
        # so the second test must be short-circuited for them.
        if bases == (object,) or ProductMixin in bases:
            return newClass
        # This creates something like a class singleton, so only the first
        # import will be used in any case.
        name = newClass.productClassName()
        try:
            return Product.productClasses[name]
        except KeyError:
            Product.productClasses[name] = newClass
            Product.requiredIndices |= set(newClass._requireIndices)
            return newClass

    @staticmethod
    def __mergePassedParts(bases, parts):
        """
        Combine the ``passedParts`` of all bases with the own ones.

        Later bases override earlier ones, own entries override all bases.
        Bases without a ``passedParts`` attribute are skipped.
        """
        outParts = {}
        for b in bases:
            try:
                baseParts = b.passedParts
            except AttributeError:
                continue
            outParts.update(baseParts)
        outParts.update(parts)
        return outParts

    @staticmethod
    def __mergePartDescription(a, b):
        """
        Merge two descriptions of the same part, ``b`` taking precedence.

        :raises ValueError: If return type, unit or cacheable flag differ.
        """
        if a['returntype'] != b['returntype']:
            raise ValueError('different part return types in derrived classes')
        if a['unit'] != b['unit']:
            raise ValueError('different part units in derrived classes')
        if a['cacheable'] != b['cacheable']:
            raise ValueError('different part cacheable specification in derrived classes')
        fullGetter = b['fullGetter'] if b['fullGetter'] is not None else a['fullGetter']
        # copy, so that the description stored on the base class stays untouched
        sliceGetter = a['sliceGetter'].copy()
        sliceGetter.update(b['sliceGetter'])
        return {'returntype': b['returntype'],
                'unit': b['unit'],
                'cacheable': b['cacheable'],
                'sliceGetter': sliceGetter,
                'fullGetter': fullGetter}

    @staticmethod
    def __mergeParts(bases, parts):
        """
        Combine the ``parts`` of all bases with the own ones.

        Descriptions of a part present more than once are merged with
        ``__mergePartDescription``; bases without ``parts`` are skipped.

        :raises ValueError: If two descriptions of one part are incompatible.
        """
        sources = []
        for b in bases:
            try:
                sources.append(b.parts)
            except AttributeError:
                continue
        sources.append(parts)
        outParts = {}
        for source in sources:
            for partName, descr in source.items():
                if partName in outParts:
                    # must be addressed through the class: a bare
                    # ``__mergePartDescription`` would be looked up as global
                    outParts[partName] = ProductMetaclass.__mergePartDescription(outParts[partName], descr)
                else:
                    outParts[partName] = descr
        return outParts

    @staticmethod
    def __mergePProps(bases, pprops):
        """
        Combine the ``_pprops`` of all bases with the own ones.

        Later bases override earlier ones, own entries override all bases.
        """
        outPProps = {}
        for b in bases:
            outPProps.update(getattr(b, '_pprops', {}))
        outPProps.update(pprops)
        return outPProps

    @staticmethod
    def __processMembers(clsname, bases, dct):
        """
        Derive ``parts``, ``passedParts`` and ``_pprops`` from the class body.

        Members flagged as passed part or persistent property are removed
        from ``dct``, the three derived attributes are added to it.

        :param clsname: Name of the class being created (for error messages).
        :param bases: Base classes of the class being created.
        :param dct: Class namespace, modified in place.
        :return: The modified ``dct``.
        :raises ValueError: On inconsistent part getters or if a passed part
                            also has an implementation.
        """
        passedParts = dct.get('passedParts', {})
        if hasattr(passedParts, '__get__'):
            # a descriptor in the class body can not be merged here
            passedParts = {}
        passedParts = ProductMetaclass.__mergePassedParts(bases, passedParts)
        parts = {}
        pprops = {}
        # iterate over a snapshot, entries are deleted from dct in the loop
        for name, val in list(dct.items()):
            if name in ('ureg',): #workaround for special attributes
                continue
            m = partGetterMethodNameRe.match(name)
            if m is not None:
                returntype = getattr(val, '_returntype_', None)
                unit = getattr(val, '_unit_', None)
                cacheable = getattr(val, '_cacheable_', False)
                part, iteraxis = m.group(1), m.group(2)
                part = part.lower()
                if iteraxis is not None:
                    iteraxis = iteraxis.lower()
                try:
                    partDict = parts[part]
                except KeyError:
                    partDict = {'returntype': returntype,
                                'unit': unit,
                                'cacheable': cacheable,
                                'fullGetter': None,
                                'sliceGetter': {}}
                    parts[part] = partDict
                else:
                    # all getters of one part have to agree on their metadata
                    if returntype != partDict['returntype']:
                        raise ValueError('part %s of product %s has mismatching return types: %s != %s!'%(
                            part, clsname, returntype, partDict['returntype']))
                    if unit != partDict['unit']:
                        raise ValueError('part %s of product %s has mismatching units: %s != %s!'%(
                            part, clsname, unit, partDict['unit']))
                    if cacheable != partDict['cacheable']:
                        raise ValueError('part %s of product %s has mismatching cacheable specification: %s != %s!'%(
                            part, clsname, cacheable, partDict['cacheable']))
                if iteraxis is None:
                    partDict['fullGetter'] = val
                else:
                    partDict['sliceGetter'][iteraxis] = val
            # explicit comparison with True is kept on purpose: only a real
            # flag counts, not any truthy attribute value
            if getattr(val, '_isPassedPart', False) == True:
                del dct[name]
                passedParts[name] = (val.component, val.part)
            if getattr(val, '_isPersistentProperty', False) == True:
                del dct[name]
                pprops[name] = val
        parts = ProductMetaclass.__mergeParts(bases, parts)
        dct['passedParts'] = passedParts
        dct['parts'] = parts
        dct['_pprops'] = ProductMetaclass.__mergePProps(bases, pprops)
        for partName in passedParts.keys():
            if partName in parts:
                raise ValueError('part %s of product %s is marked as passed but has an implementation!'%(partName, clsname))
        return dct
class ChainloadedProductProxy(object):
    """
    Lazy stand-in for a product which is only known by its object id.

    The real product is fetched through ``accessor.get(oid)`` the first time
    anything but the hash is requested. Attribute reads, attribute writes
    and item access are forwarded to the loaded product.

    :param accessor: Object providing ``get(oid)``.
    :param oid: Id (hash) of the product to load on demand.
    """
    def __init__(self, accessor, oid):
        self.__accessor = accessor
        self.__oid = oid
        self.__product = None

    def __ensureProductAvailable(self):
        """Load the real product if this has not been done yet."""
        if self.__product is None:
            self.__product = self.__accessor.get(self.__oid)

    def __getattr__(self, attribute):
        # Only called if normal lookup failed. Names carrying the mangled
        # class prefix are own state of the proxy and must never be
        # forwarded, otherwise a lookup of missing own state would recurse.
        if attribute.startswith('_ChainloadedProductProxy'):
            raise AttributeError(attribute)
        self.__ensureProductAvailable()
        return getattr(self.__product, attribute)

    def __setattr__(self, attribute, value):
        if attribute.startswith('_ChainloadedProductProxy'):
            # own (name mangled) state is stored on the proxy itself
            super(ChainloadedProductProxy, self).__setattr__(attribute, value)
        else:
            # the product has to be loaded before it can take the attribute
            self.__ensureProductAvailable()
            setattr(self.__product, attribute, value)

    def __getitem__(self, item):
        self.__ensureProductAvailable()
        return self.__product[item]

    @property
    def hash(self):
        """Object id of the proxied product, available without loading it."""
        return self.__oid

    def _repr_svg_(self):
        self.__ensureProductAvailable()
        return self.__product._repr_svg_()
class Product(object):
__metaclass__ = ProductMetaclass
defaulthash = hashlib.sha256
productClasses = {}
requiredIndices = set()
_requireIndices = ()
updateProduct = False
longTermCache = False #generally, a long term cache is not a good idea, because many intermediary products exist
nodeShape = 'oval'
cacheExt = '.npz'
passedParts = {}
preferredSlicing = {}
executeLocal = True
ureg = pint.UnitRegistry()
_accessor = None
def __init__(self):
cls = type(self)
self._productSpec = {
'productClass': cls.productClassName(),
'productType': self.productType,
}
self._productSpecComplete = False
self._productSpecCreating = False
self._getattr_loop_preventer = False
@classmethod
def productClassName(cls):
return '%s.%s'%(cls.__module__, cls.__name__)
@classmethod
def fromProductSpec(cls, productSpec, accessor=None):
"""
Preferably call this with an ascociated product accessor, otherwise chainloading of products
and cacheing will not work.
:note: This goes deeper as you might think, sometimes chainloading is implicitly.
"""
pc = productSpec['productClass']
try:
PC = cls.productClasses[pc]
except KeyError:
pmn, pcn = pc.rsplit('.', 1)
PM = importlib.import_module(pmn)
PC = getattr(PM, pcn)
cls.productClasses[pc] = PC
new = PC()
new._accessor = accessor
new._productSpec = productSpec
new._productSpecComplete = True
new._loadProductSpec()
return new
@property
def hash(self):
try:
return self._productSpec['_oid']
except KeyError:
h = self._hash()
self._productSpec['_oid'] = h
return h
@property
def productSpec(self):
271 ↛ 283line 271 didn't jump to line 283, because the condition on line 271 was never false if len(self._pprops) > 0: #pprop mode should be the only one later, but this is for the transistion phase
try:
if not self._productSpecComplete:
self._fillProductSpec()
for k in self._pprops.keys(): #just request every pprop and ignore if it does not exist, this means it is not applicable
getattr(self, k, None)
self._productSpecComplete = True
return self._productSpec
except AttributeError as e:
logger.exception('error in productSpec generation')
raise RuntimeError, "Getting an AttributeError in productSpec %s" % e, sys.exc_info()[2]
else:
try:
if not self._productSpecComplete:
if self._productSpecCreating:
#accessing product spec while creating it -> best guess is to return the partially created spec
#print "returning incomplete ProductSpec", self._productSpec.keys()
return self._productSpec
self._productSpecCreating = True
self._fillProductSpec()
self._productSpecComplete = True
self._productSpecCreating = False
return self._productSpec
except AttributeError as e:
logger.exception('error in productSpec generation')
raise RuntimeError, "Getting an AttributeError in productSpec %s" % e, sys.exc_info()[2]
def _fillProductSpec(self):
pass
def _loadProductSpec(self):
pass
def __getattr__(self, attrname):
303 ↛ 335line 303 didn't jump to line 335, because the condition on line 303 was never false if len(self._pprops) > 0: #pprop mode should be the only one later, but this is for the transistion phase
try:
return self._productSpec[attrname]
except KeyError:
try:
pprop = self._pprops[attrname]
except KeyError:
310 ↛ 311line 310 didn't jump to line 311, because the condition on line 310 was never true if attrname == "collections":
return []
312 ↛ 313line 312 didn't jump to line 313, because the condition on line 312 was never true if self._getattr_loop_preventer:
productName = str(type(self))
else:
self._getattr_loop_preventer = True
try:
productName = self.canonicalName
except RuntimeError:
# can occur when canonicalName is dynamically calculated and results in an infinite loop
# so fall back to an alternate but less descriptive name for this error message
productName = str(type(self))
self._getattr_loop_preventer = False
raise AttributeError('Product %s has no attribute "%s"'%(productName, attrname))
try:
availableProducts = self.components.copy()
except AttributeError:
availableProducts = {}
availableProducts['self'] = self
329 ↛ 330line 329 didn't jump to line 330, because the condition on line 329 was never true if not pprop.isApplicable(availableProducts):
raise AttributeError('Attribute %s is not applicable for this product instance'%attrname)
res = pprop.compute(availableProducts)
self._productSpec[attrname] = res
return res
else:
try:
try:
return getattr(super(Product, self), attrname)
except AttributeError:
if attrname.startswith('_'):
raise AttributeError('no attribute "%s"'%attrname)
try:
return self.productSpec[attrname]
except:
#logger.exception('could not get attribute "%s" from %s', attrname, self)
raise
except KeyError:
if attrname == "collections":
return []
raise AttributeError('Product has not attribute "%s"'%attrname)
def __getitem__(self, itemname):
if itemname in self.passedParts:
component, part = self.passedParts[itemname]
return self.components[component][part]
else:
return ProductPart(self, itemname)
def getPreferredSlicing(self, part):
if part in self.preferredSlicing:
ps = self.preferredSlicing[part]
if isinstance(ps, (str, unicode)):
ps = [ps]
else:
ps = list(ps)
for item in self.productDimensions[part]:
if item not in ps:
ps.append(ps)
else:
ps = list(self.productDimensions[part])
return ps
@property
def cacheParts(self):
return {n:p['returntype'] for n,p in self.parts.items() if p['cacheable']}
@property
def longTermCache(self):
return len(self.cacheParts) > 0
goodForCacheing = pprop(lambda self: self.longTermCache)
@property
def canonicalName(self):
return type(self).__name__
def getRelations(self):
if pydot is None:
raise AttributeError('pydot is not loaded')
return [], [self] #[self.createDotNode()]
def createDotNode(self):
if self.longTermCache:
if self.executeLocal:
bgColor = '#48cccd'
else:
bgColor = '#82cafa'
else:
if self.executeLocal:
bgColor = '#c1f0c1'
else:
bgColor = '#ffffff'
return pydot.Node(self.hash, label=self.canonicalName, style='filled', fillcolor=bgColor, shape=self.nodeShape)
def _repr_dot_(self):
if pydot is None:
raise AttributeError('pydot is not loaded')
edges, nodes = self.getRelations()
g = pydot.Dot(graph_type='digraph', graph_name='_', rankdir='LR')
importedNodes = []
otherNodes = []
for node in nodes:
if isinstance(node, ImportedProduct):
importedNodes.append(node)
else:
otherNodes.append(node)
ig = pydot.Subgraph(graph_type='digraph', graph_name='', rank='same')
for node in importedNodes:
ig.add_node(node.createDotNode())
g.add_subgraph(ig)
for node in nodes:
g.add_node(node.createDotNode())
for edge in edges:
g.add_edge(edge)
return g
def _repr_svg_(self):
return self._repr_dot_().create_svg()
def getPrintableComponentHistory(self):
try:
components = self.components
except AttributeError:
return []
data = []
for componentName, component in sorted(components.items()):
data.append("+- %s (%s %s)"%(componentName, component.productType, component.hash))
for line in component.getPrintableComponentHistory():
data.append("| %s"%line)
return data
def unitConversion(self, sourceUnit, targetUnit):
sourceUnit = self.ureg(sourceUnit)
targetUnit = self.ureg(targetUnit)
return sourceUnit.to(targetUnit).magnitude
class ProductMixin(object):
    """
    Base class for mixins of products.

    Classes listing ``ProductMixin`` directly among their bases are
    processed by ``ProductMetaclass`` like products, but they are not
    registered in ``Product.productClasses``.
    """
    __metaclass__ = ProductMetaclass
class ImportedProduct(Product):
    """
    Base class of imported products.

    ``Product._repr_dot_`` collects instances of this class in a subgraph
    of equal rank.
    """
    pass
class ImportedFileProduct(ImportedProduct):
    """
    Imported product which is bound to the host it was created on.

    The name of the host is stored in the product specification under the
    key ``hostname``.
    """
    nodeShape = 'folder'
    hostname = socket.gethostname() # evaluated once, when the class is created

    def __init__(self):
        super(ImportedFileProduct, self).__init__()

    @classmethod
    def fromProductSpec(cls, productSpec, accessor=None):
        """
        Create a product from its specification if it belongs to this host.

        :param productSpec: Specification holding at least ``hostname`` and
                            ``productClass``.
        :param accessor: Product accessor handed on to the new instance.
        :return: The product instance created by the base implementation.
        :raises ValueError: If the specification was created on another host.
        """
        if productSpec['hostname'] != cls.hostname:
            raise ValueError('cannot load file on this host')
        # keep the contract of the base class: accept the accessor and
        # return the loaded product
        return super(ImportedFileProduct, cls).fromProductSpec(productSpec, accessor)

    def _fillProductSpec(self):
        self._productSpec['hostname'] = self.hostname
        super(ImportedFileProduct, self)._fillProductSpec()

    def _loadProductSpec(self):
        self.hostname = self._productSpec['hostname']
        super(ImportedFileProduct, self)._loadProductSpec()
class WithComponentsMixin(ProductMixin):
    """
    Mixin for products which are built from other products.

    The components are kept in the dictionary ``components``
    (``name -> product``). In the product specification they are stored as
    list of ``(name, hash)`` pairs under the key ``componentOids``.
    """
    def __init__(self):
        self.components = {}

    def getRelations(self):
        """
        Return ``(edges, nodes)`` of this product and all its components.

        :raises AttributeError: If pydot is not available.
        """
        if pydot is None:
            raise AttributeError('pydot is not loaded')
        allEdges = []
        allNodes = [self]
        for component in self.components.values():
            componentEdges, componentNodes = component.getRelations()
            allEdges += componentEdges
            allNodes += componentNodes
            # edge from the component to the product built from it
            allEdges.append(pydot.Edge(component.hash, self.hash))
        return allEdges, allNodes

    @pprop
    def componentOids(self):
        return [(name, product.hash) for name, product in self.components.items()]

    def _fillProductSpec(self):
        oids = []
        for name, product in self.components.items():
            oids.append((name, product.hash))
        self._productSpec['componentOids'] = oids

    def _loadProductSpec(self):
        # components are not loaded here, proxies fetch them on first use
        proxies = {}
        for name, oid in self._productSpec['componentOids']:
            proxies[name] = ChainloadedProductProxy(self._accessor, oid)
        self.components = proxies
class UpdateFileProduct(WithComponentsMixin, ImportedFileProduct):
    """
    Imported file product which additionally has components.

    The methods of both bases are called explicitly, the mixin first.
    """
    updateProduct = True

    def __init__(self):
        Product.__init__(self)
        WithComponentsMixin.__init__(self)

    def _fillProductSpec(self):
        WithComponentsMixin._fillProductSpec(self)
        ImportedFileProduct._fillProductSpec(self)

    def _loadProductSpec(self):
        WithComponentsMixin._loadProductSpec(self)
        ImportedFileProduct._loadProductSpec(self)

    def _find_base(self, base_spec):
        """
        Look up the base product described by ``base_spec``.

        :param base_spec: Either a query dictionary, which has to match
                          exactly one product, or anything accepted by
                          ``self._accessor.get``.
        :return: The single query result or the result of ``get``.
        :raises ValueError: If the query does not match exactly one product.
        """
        if not isinstance(base_spec, dict):
            return self._accessor.get(base_spec)
        with self._accessor.query(base_spec) as res:
            # the result is still iterated once here, as it was by the
            # former debug print, but reported through the logger
            logger.debug('candidates for base_spec %r: %r', base_spec, list(res))
            if res.count() != 1:
                raise ValueError("base_spec \"{}\" is not unique, {} results found!".format(base_spec, res.count()))
            return next(iter(res))
class DerrivedProduct(WithComponentsMixin, Product):
    """
    Product which is derived from one or more component products.

    An instance is created empty by ``build()`` and completed step by step:
    ``whatIsNeeded()`` tells which component to look for next and
    ``tryToAdd()`` yields a new instance extended by one component. The
    flag ``building`` is reset once all components are present.
    """
    nodeShape = 'box'
    #componentCount = ? #to be set in subclass

    def __init__(self):
        Product.__init__(self)
        WithComponentsMixin.__init__(self)
        self.building = False
        self.extraHashAttributes = set()

    @classmethod
    def build(cls):
        """
        Create an empty instance in building state.

        If the instance has a ``query`` attribute, ``componentNames`` and
        ``_productQueryGroup`` are derived from it by
        ``querybuilder.processQuery``.
        """
        instance = cls()
        instance.building = True
        try:
            query = instance.query
        except AttributeError:
            pass
        else:
            instance.componentNames, instance._productQueryGroup = querybuilder.processQuery(query)
        return instance

    @property
    def componentCount(self):
        """Number of components the complete product consists of."""
        return len(self.componentNames)

    @property
    def queryPartRepresentations(self):
        """One representation per component, by default ``'single'``."""
        return tuple(['single'] * self.componentCount)

    def whatIsNeeded(self, primaryComponent=None):
        """
        Describe the next component which is still missing.

        :param primaryComponent: Name of a component to ask for first, if it
                                 is not present yet.
        :return: Dictionary with the keys ``query``, ``representation`` and
                 ``componentName`` or None if no component is missing.
        """
        def presentComponents():
            return {k:presentAttrsAsItems(v) for k,v in self.components.items()}

        if primaryComponent is not None and primaryComponent not in self.components:
            return {'query': self._productQueryGroup.getQuery(primaryComponent, presentComponents()),
                    'representation': self.queryPartRepresentations[self.componentNames.index(primaryComponent)],
                    'componentName': primaryComponent}
        assert len(self.componentNames) == len(self.queryPartRepresentations)
        for name, representation in zip(self.componentNames, self.queryPartRepresentations):
            if name in self.components:
                continue
            try:
                query = self._productQueryGroup.getQuery(name, presentComponents())
            except querybuilder.NeedOtherComponent as oc:
                # the query for this component depends on another one, so
                # that one is requested instead
                # NOTE(review): representation still belongs to the original
                # name here - confirm that this is intended
                name = oc.neededComponentName
                query = self._productQueryGroup.getQuery(name, presentComponents())
            return {'query': query,
                    'representation': representation,
                    'componentName': name}
        return None

    def queryForSingleComponent(self, component):
        return self._productQueryGroup.getQuery(component)

    def tryToAdd(self, componentName, product):
        """Generator yielding the instance extended by ``product``."""
        yield self._addComponent(componentName, product)

    def _addComponent(self, name, product):
        """
        Return a new instance holding the own components plus ``product``.

        :raises NotNeeded: If a component named ``name`` is already present.
        """
        if name in self.components:
            raise NotNeeded('%s already present'%product.productType)
        new = self.build()
        new.components = self.components.copy()
        new.components[name] = product
        if len(new.components) == self.componentCount:
            new.building = False
        return new

    def _fillProductSpec(self):
        WithComponentsMixin._fillProductSpec(self)
        Product._fillProductSpec(self)

    def _loadProductSpec(self):
        WithComponentsMixin._loadProductSpec(self)
        Product._loadProductSpec(self)

    def _hash(self):
        """
        Compute the hash of the product.

        The hash is built from the hashes of the components (ordered by
        component name), the values of the attributes named in
        ``extraHashAttributes`` (ordered by name) and the product class name.
        """
        pieces = [':'.join(v.hash for k,v in sorted(self.components.items()))]
        for attr in sorted(self.extraHashAttributes):
            pieces.append(str(getattr(self, attr)))
        pieces.append(self.productClassName())
        return self.defaulthash(';'.join(pieces)).hexdigest()

    @property
    def reduceFraction(self):
        """
        Size of the product relative to the summed size of its components.

        The size of a product is the product of the values of ``dataSize``.
        """
        try:
            inSize = sum(reduce(lambda x,y: x*y, component.dataSize.values()) for component in self.components.values())
            outSize = reduce(lambda x,y: x*y, self.dataSize.values())
            return float(outSize) / float(inSize)
        except Exception:
            logger.exception('could not calculate reduceFraction')
            raise

    @property
    def reducing(self):
        """True if the product is at most half as large as its components."""
        try:
            return self.reduceFraction <= .5
        except Exception:
            logger.exception('could not calculate reducing')
            raise

    @property
    def executeLocal(self):
        # Always True for now. A heuristic based on
        # computeComplexity * reduceFraction <= 1. is currently disabled.
        return True
class NotNeeded(Exception):
    """
    Raised by ``DerrivedProduct._addComponent`` if a component of the given
    name is already present.
    """
    pass
|