# -*- coding: utf-8 -*-
# Filename: productpart.py
"""
product part
============
This is part of the macsProcessor suite.
Copyright (C) 2014 Tobias Kölling
"""
import os
import numpy as np
from itertools import izip
import logging
logger = logging.getLogger(__name__)
# When True, every ProductPart.get() call is reported on the debug log.
debug = False


class ProductPart(object):
    """
    View on one part of a product which can be iterated along one of its axes.

    The part may be transposed to another iteration axis (:meth:`T`) and
    sliced along the current one (:meth:`S`).  Ranges selected on axes other
    than the current iteration axis are kept in ``restrictions``, a dict
    mapping the axis index to a ``(start, stop, step)`` tuple.

    If the part is listed in ``product.cacheParts`` and the product's accessor
    provides a cache directory, data is stored in memory mapped files below
    that directory and served from there once available.
    """

    def __init__(self, product, part, axis=0, restrictions=None):
        """
        Creates a view on ``part`` of ``product`` iterating along ``axis``.

        :param product: Object providing ``productDimensions``, ``dataSize``,
            ``cacheParts``, ``parts``, ``_accessor`` and the getter methods.
        :param part: Name of the part inside the product.
        :param axis: Index of the iteration axis within the part's dimensions.
        :param restrictions: Optional dict ``{axis index: (start, stop, step)}``
            limiting the given axes.  The dict is copied.  An entry for
            ``axis`` itself with step 1 defines the initial iteration range.
        """
        self.product = product
        self.part = part
        self.axis = axis
        self.axes = self.product.productDimensions[part]
        self.axisName = self.axes[axis]
        self.doCache = part in self.product.cacheParts
        self.restrictions = {} if restrictions is None else restrictions.copy()
        self.start = 0
        self.stop = self.product.dataSize[self.axisName]
        # A range recorded for our own axis (e.g. by T() of a sliced part)
        # has to be resumed, otherwise transposing back and forth would
        # silently drop the slice.
        ownRange = self.restrictions.get(axis)
        if ownRange is not None:
            start, stop, step = slice(*ownRange).indices(self.stop)
            if step == 1:
                self.start, self.stop = start, stop
        self._dtype = None
        if self.doCache:
            self._dtype = np.dtype(self.product.cacheParts[part])
            self._openCache()

    def _openCache(self):
        """
        Opens the memory mapped cache files, or disables caching if the
        product has no accessor or the accessor has no cache directory.
        """
        accessor = self.product._accessor
        cachedir = None if accessor is None else accessor.cachedir
        if cachedir is None:
            self.doCache = False
            return
        h = self.product.hash
        self.cachedir = os.path.join(cachedir, h[:2], h)
        if not os.path.isdir(self.cachedir):
            try:
                os.makedirs(self.cachedir)
            except OSError:
                # Somebody else may have created the directory in between.
                if not os.path.isdir(self.cachedir):
                    raise
        self.cacheComplete = os.path.exists(
            os.path.join(self.cachedir, '%s.cacheComplete' % self.part))
        cacheFile = os.path.join(self.cachedir, '%s.data' % self.part)
        # mode 'r+' needs an existing file, so create it if it is missing
        open(cacheFile, 'a').close()
        self.cache = np.memmap(cacheFile,
                               dtype=self.product.cacheParts[self.part],
                               shape=self.shape,
                               mode='r+')
        if not self.cacheComplete:
            # one flag per slice along the iteration axis
            validFile = os.path.join(
                self.cachedir, '%s.%s.valid' % (self.part, self.axisName))
            open(validFile, 'a').close()
            self.cachedSlices = np.memmap(validFile,
                                          dtype='bool',
                                          shape=(self.product.dataSize[self.axisName],),
                                          mode='r+')

    def T(self, newaxis):
        """
        Transposes Part to given iterable axis.

        :param newaxis: Axis index (``int``) or axis name.
        :return: New ProductPart iterating along ``newaxis``; the current
            iteration range is kept as a restriction on the old axis.
        """
        restrictions = self.restrictions.copy()
        restrictions[self.axis] = (self.start, self.stop, 1)
        if not isinstance(newaxis, int):
            newaxis = self.axes.index(newaxis)
        return ProductPart(self.product, self.part, newaxis, restrictions=restrictions)

    def S(self, start, stop):
        """
        Slice to a given range along the iterable axis.

        ``start`` and ``stop`` are relative to the current start.
        """
        newPart = ProductPart(self.product, self.part, self.axis, restrictions=self.restrictions)
        newPart.start = self.start + start
        newPart.stop = self.start + stop
        return newPart

    def get(self):
        """
        Gets the whole data set.

        Start, stop and restrictions of this view are not applied.  Uses the
        complete cache if there is one, otherwise the product's
        ``_get<Part>`` method, otherwise the array is assembled from any
        available per-axis iterator.  With caching enabled the result is
        written to the cache and the cache is returned.

        :raises NotImplementedError: If the product offers no usable getter.
        """
        if debug:
            logger.debug("getting %s %s %d", self.product, self.part, int(self.doCache))
        if self.doCache and self.cacheComplete:
            return self.cache
        try:
            getter = getattr(self.product, '_get%s' % self.part.title())
        except AttributeError:
            logger.debug("using inefficient array builder")
            getter = self._arrayBuilder()
        if self.doCache:
            self.cache[:] = getter()[:]
            self._setCacheComplete()
            return self.cache
        return getter()

    def _arrayBuilder(self):
        """
        Returns a function assembling the whole array from a per-axis iterator.

        :raises NotImplementedError: If no axis can be iterated natively.
        """
        for axis in range(len(self.axes)):
            try:
                # An unrestricted part is used on purpose: slices restricted
                # like this view would not fit into the full-size array.
                slices = ProductPart(self.product, self.part, axis).__iter__(True)
            except NotImplementedError:
                continue
            break
        else:
            raise NotImplementedError('not enough getter methods implemented for %s!' % self.part)

        def build():
            first = next(slices)
            shape = [self.product.dataSize[dim] for dim in self.axes]
            # view with the iterated axis in front, filled slice by slice
            rolled = np.rollaxis(np.zeros(shape, dtype=first.dtype), axis)
            rolled[0] = first
            for i, el in enumerate(slices, 1):
                rolled[i] = el
            return np.rollaxis(rolled, 0, axis + 1)
        return build

    def copyTo(self, other):
        """
        Copies content to another numpy array and tries to be efficient.

        The copy is done along the product's preferred slicing axis.
        ``other`` may also be a non-numpy object supporting item assignment.
        """
        logger.debug("copying %s %s", self.product, self.part)
        ps = self.product.getPreferredSlicing(self.part)
        logger.debug("preferred axis is: %s current axis: %s", ps[0], self.axisName)
        if ps[0] != self.axisName:
            return self.T(ps[0]).copyTo(other)
        # this is a workaround for NetCDF not supporting rollaxis / transpose
        if isinstance(other, np.ndarray):
            logger.debug("copying rolled")
            sources = iter(self)
            # lazy lockstep iteration which stops at the shorter sequence
            for target in np.rollaxis(other, self.axis):
                try:
                    source = next(sources)
                except StopIteration:
                    break
                target[...] = source
        else:
            logger.debug("copying sliced")
            index = [slice(None)] * len(self.axes)
            for i, source in enumerate(self):
                index[self.axis] = i
                # multi-dimensional indices have to be tuples, not lists
                other[tuple(index)] = source

    @property
    def isCached(self):
        """
        If caching is enabled, returns if data is already cached, otherwise returns always true.
        """
        return (not self.doCache) or self.cacheComplete

    def __cast(self, data):
        """
        Converts ``data`` to the dtype of this part if it differs.

        Data is passed through unchanged if the part's dtype is ``object``.
        """
        if self.dtype == np.dtype(object):
            return data
        if data.dtype != self.dtype:
            logger.warning("converting dtype %s -> %s!!!", data.dtype, self.dtype)
            # some older numpy does not support keyword arguments, for reference they are:
            # dtype, order, casting, subok, copy
            return data.astype(self.dtype, 'K', 'unsafe', True, False)
        return data

    def __iter__(self, preventLoop=False):
        """
        Iterates over the slices along the iteration axis from start to stop.

        :param preventLoop: If true, do not fall back to :meth:`get` when the
            product has no iterator for this axis.
        :raises NotImplementedError: If ``preventLoop`` is true and the
            product has no iterator for this axis.
        """
        return self._iterate(preventLoop, True)

    def _iterate(self, preventLoop=False, cast=True):
        """
        Implementation of :meth:`__iter__`.

        :param cast: If false, slices from the product's iterator are not
            converted to the part's dtype.  This is needed to determine the
            dtype itself from the first slice.
        """
        mayCache = False
        if self.doCache:
            if self.cacheComplete or all(self.cachedSlices[self.start:self.stop]):
                def fromCache():
                    for t in np.rollaxis(self.cache[self._restrictionSlice], self.axis):
                        yield t
                        # release the slice before the next one is created
                        del t
                return fromCache()
            mayCache = True
        try:
            native = getattr(self.product, self.partIterator)
        except AttributeError:
            if preventLoop:
                raise NotImplementedError('not enough getter methods implemented!')
            logger.debug("using inefficient slicing algorithm for %s", self.product)

            def getter(start, stop):
                for t in np.rollaxis(self.get()[self._restrictionSlice], self.axis):
                    yield t
                    del t
        else:
            if mayCache:
                def getter(start, stop):
                    temp = np.rollaxis(self.cache, self.axis)
                    for i, el in enumerate(native(start, stop), start):
                        temp[i] = el
                        self.cachedSlices[i] = True
                        yield el[self._partialRestrictionSlice]
                    if all(self.cachedSlices):
                        self._setCacheComplete()
            else:
                def getter(start, stop):
                    for data in native(start, stop):
                        data = data[self._partialRestrictionSlice]
                        yield self.__cast(data) if cast else data
        return getter(self.start, self.stop)

    def __len__(self):
        """
        Number of slices along the iteration axis.
        """
        return self.stop - self.start

    @property
    def shape(self):
        """
        Full (unrestricted) shape of the part.
        """
        return tuple(self.product.dataSize[axis] for axis in self.axes)

    @property
    def dtype(self):
        """
        Data type of the part.

        Taken from the cache specification or the product's ``returntype``
        if given, otherwise from the first slice along the preferred axis.
        """
        # we need some hacking to get the correct dtype for the iterator case
        if self._dtype is None:
            if self.part in self.product.parts:
                self._dtype = self.product.parts[self.part]['returntype']
                if self._dtype is not None:
                    logger.debug("found dtype in product specification")
                    return self._dtype
            ps = self.product.getPreferredSlicing(self.part)
            if ps[0] != self.axisName:
                self._dtype = self.T(ps[0]).dtype
                return self._dtype
            logger.debug("getting dtype of %s %s", self.product.__class__.__name__, self.part)
            # The casting iterator would ask for self.dtype again and recurse
            # forever, so the first slice is fetched without casting.
            firstElement = next(self._iterate(cast=False))
            try:
                self._dtype = firstElement.dtype
            except AttributeError:
                self._dtype = np.dtype(object)
            logger.debug("got dtype")
        return self._dtype

    @property
    def unit(self):
        """
        Unit of the part as given in the product specification.
        """
        return self.product.parts[self.part]['unit']

    @property
    def partIterator(self):
        """
        Name of the product method iterating this part along the current axis.
        """
        return '_get%sBy%s' % (self.part.title(), self.axisName.title())

    def _setCacheComplete(self):
        """
        Creates the marker file of a complete cache and remembers the state.
        """
        open(os.path.join(self.cachedir, '%s.cacheComplete' % self.part), 'a').close()
        self.cacheComplete = True

    def _sliceList(self):
        """
        One slice per axis: restrictions applied, iteration axis set to start:stop.
        """
        s = [slice(None)] * len(self.axes)
        for axis, restriction in self.restrictions.items():
            s[axis] = slice(*restriction)
        s[self.axis] = slice(self.start, self.stop, 1)
        return s

    @property
    def _restrictionSlice(self):
        """
        Index selecting this view out of the whole array.
        """
        return tuple(self._sliceList())

    @property
    def _partialRestrictionSlice(self):
        """
        Index applying the restrictions to a single slice, i.e. without the iteration axis.
        """
        s = self._sliceList()
        return tuple(s[:self.axis] + s[self.axis + 1:])