# -*- coding: utf-8 -*-
# Filename: metaStorage.py
"""
meta storage for macsserver
===========================

:author: Tobias Kölling

The meta storage provides an abstraction for a database used to store metadata
about the processing of raw data to products.

Current backend implementations include:

* :py:class:`DictMetaStorage`: Using a python :py:class:`dict` as in-memory storage.
  This storage can only be used in one process and loses all contents after program
  termination. It is useful e.g. for unit tests.
* :py:class:`LevelDBMetaStorage`: Using a `LevelDB <http://leveldb.org/>`_ as on-disk storage.
  This storage can only be used in one process at a time, but keeps information after
  program termination. It is intended for interactive testing.
* :py:class:`MongoDBMetaStorage`: Using a `MongoDB <https://www.mongodb.com/>`_ as storage.
  This storage can be used by multiple processes concurrently and is intended for
  production use.
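
A minimal usage sketch (the ``productType`` field and the oid value are
illustrative assumptions, not part of the API)::

    storage = DictMetaStorage()
    storage.ensureIndex('productType')
    storage.addObject('abc123', {'_oid': 'abc123', 'productType': 'radiance'})
    for obj in storage.query({'productType': 'radiance'}):
        print obj['_oid']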
"""

import re
import cPickle as pickle
import datetime
from itertools import ifilter, starmap
from collections import defaultdict

from runmacs.processor.querybuilder import getNestedItem


class MetaStorage(object):
    """
    Base class for every meta storage implementation.
    """
    pass


def normalizeStringIndex(value):
    if isinstance(value, int):
        return '%08x' % value
    elif isinstance(value, datetime.datetime):
        return value.isoformat()
    else:
        return str(value)


def robustCmp(a, b):
    """
    Like the builtin :py:func:`cmp`, but instead of raising
    :py:exc:`TypeError` for non-comparable values, it orders ``None``
    before everything else and treats other non-comparable pairs as equal.
    """
    try:
        return cmp(a, b)
    except TypeError:
        if a is None:
            if b is None:
                return 0
            return -1
        if b is None:
            return 1
        return 0
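
# For example (a sketch): datetime objects refuse ordering comparisons with
# None in Python 2, but robustCmp sorts None first instead of raising:
#
#   sorted([datetime.datetime(2016, 1, 1), None], cmp=robustCmp)
#   # -> [None, datetime.datetime(2016, 1, 1, 0, 0)]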
def modify_tree(inp, match, f, path=None):
62 ↛ 64line 62 didn't jump to line 64, because the condition on line 62 was never false if path is None:
path = []
64 ↛ 66line 64 didn't jump to line 66, because the condition on line 64 was never false if match(path, inp):
return f(inp)
if isinstance(inp, list):
return [modify_tree(element, match, f, path+[i])
for i, element
in enumerate(inp)]
if isinstance(inp, dict):
return {k:modify_tree(v, match, f, path+[k])
for k, v in
inp.iteritems()}
return inp
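
# For example (a sketch): apply a function to every value stored under the
# key path ['a'], leaving the rest of the tree untouched:
#
#   modify_tree({'a': 1, 'b': 2}, lambda path, _: path == ['a'],
#               lambda v: v + 1)
#   # -> {'a': 2, 'b': 2}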


def apply_set(data, operation):
    for key, value in operation.items():
        search_path = key.split(".")
        search_path, elkey = search_path[:-1], search_path[-1]
        def modifier(dct):
            dct = dct.copy()
            dct[elkey] = value
            return dct
        data = modify_tree(data, lambda path, _: path == search_path, modifier)
    return data


def apply_push(data, operation):
    for key, value in operation.items():
        search_path = key.split(".")
        search_path, elkey = search_path[:-1], search_path[-1]
        def modifier(dct):
            oldvalue = dct.get(elkey, [])
            dct = dct.copy()
            dct[elkey] = oldvalue + [value]
            return dct
        data = modify_tree(data,
                           lambda path, _: path == search_path,
                           modifier)
    return data


def apply_add_to_set(data, operation):
    for key, value in operation.items():
        search_path = key.split(".")
        search_path, elkey = search_path[:-1], search_path[-1]
        def modifier(dct):
            oldvalue = dct.get(elkey, [])
            if value not in oldvalue:
                dct = dct.copy()
                dct[elkey] = oldvalue + [value]
            return dct
        data = modify_tree(data,
                           lambda path, _: path == search_path,
                           modifier)
    return data


def apply_unset(data, operation):
    for key, value in operation.items():
        search_path = key.split(".")
        search_path, elkey = search_path[:-1], search_path[-1]
        data = modify_tree(data,
                           lambda path, _: path == search_path,
                           lambda dct: {k: v for k, v in dct.items() if k != elkey})
    return data


UPDATE_APPLICATORS = {
    "$set": apply_set,
    "$push": apply_push,
    "$addToSet": apply_add_to_set,
    "$unset": apply_unset,
}


def apply_update(data, operation):
    for opcode, opdata in operation.items():
        data = UPDATE_APPLICATORS[opcode](data, opdata)
    return data
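
# For example (a sketch): applying a MongoDB-style update document in memory,
# without touching the original input dict:
#
#   apply_update({'a': {'b': 1}}, {'$set': {'a.b': 2}})
#   # -> {'a': {'b': 2}}
#
#   apply_update({'tags': ['x']}, {'$addToSet': {'tags': 'x'}})
#   # -> {'tags': ['x']} (unchanged, 'x' is already present)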


class EnhancedIterable(object):
    """
    Iterable which behaves more like a MongoDB cursor.
    """
    def __init__(self, it):
        self._limit = None
        self._ofs = 0
        self.l = list(it)

    def __iter__(self):
        return iter(self.l[self.s])

    def sort(self, key, direction=None):
        if isinstance(key, list):
            assert len(key) == 1, 'only single key sort is supported'
            key, direction = key[0]
        elif isinstance(key, tuple):
            assert direction is None, 'if key is given as tuple, direction cannot be given'
            key, direction = key
        self.l = sorted(self.l, cmp=robustCmp, key=lambda x: x.get(key, None), reverse=direction < 0)
        return self

    def limit(self, limit):
        self._limit = limit
        return self

    def skip(self, ofs):
        self._ofs = ofs
        return self

    def batch_size(self, size):
        # only matters for remote storage
        return self

    def count(self):
        return len(self.l[self.s])

    def close(self):
        """
        :py:class:`EnhancedIterable` is garbage collected, explicit closing is not needed.
        """
        pass

    @property
    def s(self):
        if self._limit is None:
            s = slice(self._ofs, None)
        else:
            s = slice(self._ofs, self._ofs+self._limit)
        return s
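
# For example (a sketch): cursor-style chaining on a plain list of dicts:
#
#   it = EnhancedIterable([{'a': 2}, {'a': 1}, {'a': 3}])
#   list(it.sort([('a', 1)]).skip(1).limit(1))
#   # -> [{'a': 2}]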


def filterMatch(query, data):
    if isinstance(query, dict):
        for key, subquery in query.items():
            if key == '$in':
                test_data = data
                if not isinstance(data, list):
                    test_data = [data]
                if not any(d in subquery for d in test_data):
                    return False
            elif key == '$nin':
                if data in subquery:
                    return False
            elif key == '$elemMatch':
                if not any(filterMatch(subquery, d) for d in data):
                    return False
            elif key == '$gte':
                if not data >= subquery:
                    return False
            elif key == '$lte':
                if not data <= subquery:
                    return False
            elif key == '$gt':
                if not data > subquery:
                    return False
            elif key == '$lt':
                if not data < subquery:
                    return False
            elif key == '$ne':
                if isinstance(data, list):
                    if any(d == subquery for d in data):
                        return False
                else:
                    if data == subquery:
                        return False
            elif key == '$regex':
                if re.search(subquery, data) is None:
                    return False
    else:
        if isinstance(data, list):
            return any(filterMatch(query, element) for element in data)
        return query == data
    return True
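
# For example (a sketch): matching a single value against operator queries,
# or a plain value query against a list:
#
#   filterMatch({'$gte': 2, '$lt': 5}, 3)         # -> True
#   filterMatch({'$in': ['a', 'b']}, 'c')         # -> False
#   filterMatch('radiance', ['radiance', 'raw'])  # -> True (any list element)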


def filterQuery(query, data, context=None):
    """
    Checks if ``data`` matches ``query``.

    :param query: A mongodb-style query.
    :param data: A dict/list tree representing mongodb-like data.
    :returns: True if query matches the data, False otherwise.
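
    For example (a sketch with made-up field names)::

        filterQuery({'sensor': 'vnir'}, {'sensor': 'vnir', 'size': 3})
        # -> True
        filterQuery({'size': {'$gt': 5}}, {'sensor': 'vnir', 'size': 3})
        # -> False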
"""
    if isinstance(query, dict):
        if context is None:
            for key, subquery in query.items():
                if key == '$or':
                    if not any(filterQuery(q, data) for q in subquery):
                        return False
                elif key == '$elemMatch':
                    if not any(filterMatch(subquery, d) for d in data):
                        return False
                else:
                    ok = False
                    if isinstance(subquery, dict):
                        if len(subquery) == 0:
                            ok = True
                    if not ok:
                        if key == 'oid':
                            key = '_oid'
                        try:
                            subdata = getNestedItem(data, key)
                        except KeyError:
                            try:
                                if len(set(subquery.keys()) - set(['$nin', '$ne'])) == 0:
                                    ok = True
                                    continue
                            except AttributeError:
                                pass
                            return False
                        if not filterMatch(subquery, subdata):
                            return False
            return True
    return True


def enhancedFilter(query, it):
    """
    Filters many datasets using a ``query``.

    Creates an :py:class:`EnhancedIterable` from an iterable of mongodb-like datasets.
    The result will only contain datasets which match the given ``query``.

    :param query: A mongodb-style query.
    :param it: An iterable over dict/list trees representing mongodb-like data.
    :returns: An :py:class:`EnhancedIterable` with matching datasets only.
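
    For example (a sketch with made-up field names)::

        datasets = [{'sensor': 'vnir'}, {'sensor': 'swir'}]
        enhancedFilter({'sensor': 'vnir'}, datasets).count()
        # -> 1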
"""
f = lambda x: filterQuery(query, x)
return EnhancedIterable(ifilter(f, it))


def evaluate_query_expression(expr, data):
    if not isinstance(expr, str) or not expr.startswith("$"):
        return expr
    expr = expr[1:]
    return data.get(expr, None)
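
# For example (a sketch): "$"-prefixed strings are looked up as field
# references, anything else evaluates to itself:
#
#   evaluate_query_expression("$sensor", {"sensor": "vnir"})  # -> "vnir"
#   evaluate_query_expression(42, {"sensor": "vnir"})         # -> 42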


REDUCTIONS = {
    "$sum": sum,
}


def aggregate_group(query, it):
    _id = query["_id"]
    def mapper(el):
        return evaluate_query_expression(_id, el)
    acc = defaultdict(list)
    for el in it:
        key = mapper(el)
        if key is None:
            continue
        acc[key].append(el)
    expressions = []
    for key, value in query.items():
        if key == "_id":
            continue
        (reduction, expression), = value.items()
        expressions.append((key, reduction, expression))
    def reducer(key, elements):
        res = {"_id": key}
        for field, reduction, expression in expressions:
            res[field] = REDUCTIONS[reduction](evaluate_query_expression(expression, el)
                                               for el in elements)
        return res
    return starmap(reducer, acc.items())


def aggregate_unwind(query, it):
    for el in it:
        parts = evaluate_query_expression(query, el)
        if parts is None:
            continue
        try:
            iter(parts)
        except TypeError:
            parts = [parts]
        for part in parts:
            res = el.copy()
            res[query[1:]] = part
            yield res


AGGREGATION_STAGES = {
    "$match": enhancedFilter,
    "$group": aggregate_group,
    "$unwind": aggregate_unwind,
}
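
# For example (a sketch): a pipeline as consumed by KVBaseMetaStorage.aggregate,
# grouping objects by a (hypothetical) sensor field and summing a
# (hypothetical) size field per group:
#
#   [{'$match': {'productType': 'radiance'}},
#    {'$group': {'_id': '$sensor', 'total': {'$sum': '$size'}}}]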


class KVBaseMetaStorage(MetaStorage):
    """
    Base class for MetaStorage using key-value based backends.
    """
    def __init__(self):
        try:
            self.config = self._get('_config')
        except KeyError:
            self.config = {}

    def storeConfig(self):
        self._put('_config', self.config)

    @property
    def indices(self):
        try:
            return self.config['indices']
        except KeyError:
            return []

    def addIndex(self, index):
        index = str(index)
        for k, v in self._rangeIter('data_', 'data_\xff'):
            try:
                iv = v[index]
            except KeyError:
                continue
            oid = k[5:]
            self._addObjectToIndex(index, iv, oid)
        self.config['indices'] = self.indices + [index]
        self.storeConfig()
    def _addObjectToIndex(self, index, indexValue, oid):
        if isinstance(indexValue, list):
            for val in indexValue:
                self._addObjectToIndex(index, val, oid)
        else:
            indexDbKey = 'by_%s_%s' % (index, normalizeStringIndex(indexValue))
            try:
                oidsInIndex = set(self._get(indexDbKey))
            except KeyError:
                oidsInIndex = set()
            oidsInIndex.add(oid)
            self._put(indexDbKey, list(oidsInIndex))

    def addObject(self, oid, value):
        try:
            self._get('data_%s' % oid)
            raise ValueError('already in DB!')
        except KeyError:
            # good case: the oid is not yet present
            self.setObject(oid, value)

    def setObject(self, oid, value):
        self._put('data_%s' % oid, value)
        for index in self.indices:
            try:
                iv = getNestedItem(value, index)
            except KeyError:
                continue
            self._addObjectToIndex(index, iv, oid)
    def getObject(self, oid):
        return self._get('data_%s' % oid)

    def getOidsForIndexRange(self, index, start='', stop='\xff'):
        index = str(index)
        oids = set()
        for k, v in self._rangeIter(key_from='by_%s_%s' % (index, normalizeStringIndex(start)),
                                    key_to='by_%s_%s' % (index, normalizeStringIndex(stop))):
            oids |= set(v)
        return oids

    def getAllObjects(self):
        return self._rangeIter(key_from='data_', key_to='data_\xff')

    def getAllOids(self):
        for key, _ in self.getAllObjects():
            yield key[5:]
    def _processOrQuery(self, query):
        oids = set()
        for subQuery in query:
            oids |= self._processAndQuery(subQuery)
        return oids

    def _processAndQuery(self, query):
        if len(query) == 0:
            return set(self.getAllOids())
        if 'oid' in query:
            if isinstance(query['oid'], dict):
                oidMin = query['oid'].get('$gte', '')
                oidMax = query['oid'].get('$lte', '\xff')
            else:
                return set(query['oid'])
        else:
            oidMin = ''
            oidMax = '\xff'
        oids = None
        for index, subQuery in query.items():
            if index == 'oid':
                continue
            elif index == '$or':
                subQueryOids = self._processOrQuery(subQuery)
            elif index not in self.indices:
                raise ValueError('no index for %s!' % index)
            elif isinstance(subQuery, dict):
                if any(op in subQuery for op in ['$gte', '$lte']):
                    subQueryOids = self.getOidsForIndexRange(index, subQuery.get('$gte', ''), subQuery.get('$lte', '\xff'))
                else:
                    return set(self.getAllOids())
            else:
                try:
                    subQueryOids = set(self._get('by_%s_%s' % (index, normalizeStringIndex(subQuery))))
                except KeyError:
                    subQueryOids = set()
            if oids is None:
                oids = subQueryOids
            else:
                oids &= subQueryOids
            if len(oids) == 0:
                break  # shortcut
        return set(oid for oid in oids if oidMin <= oid <= oidMax)
    def query(self, query, noTimeout=False, projection=None):
        if len(query) == 0:
            query = {'oid': {}}
        if query.keys() != ['oid']:
            oids = self._processAndQuery(query)
            def iterator():
                for oid in oids:
                    yield self.getObject(oid)
            return enhancedFilter(query, iterator())
        oid = query['oid']
        if isinstance(oid, dict):
            try:
                start = oid['$gte']
            except KeyError:
                start = ''
            try:
                end = oid['$lte']
            except KeyError:
                end = '\xff'
            def iterator():
                for key, value in self._rangeIter(key_from='data_%s' % start, key_to='data_%s' % end):
                    yield value
            return enhancedFilter(query, iterator())
        else:
            return enhancedFilter(query, [self.getObject(oid)])
    def update(self, query, operation):
        n_modified = 0
        for match in self.query(query):
            new_object = apply_update(match, operation)
            if new_object != match:
                n_modified += 1
                self.setObject(new_object["_oid"], new_object)
        return {"nModified": n_modified}

    def aggregate(self, pipeline):
        current_data = (value for _, value in self.getAllObjects())
        for stage in pipeline:
            assert len(stage) == 1
            (stage_type, stage_config), = stage.items()
            current_data = AGGREGATION_STAGES[stage_type](stage_config, current_data)
        return list(current_data)

    def ensureIndex(self, key):
        if key not in self.indices:
            self.addIndex(key)
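
# For example (a sketch): tagging all matching objects in place; the field
# names are chosen for illustration only:
#
#   storage.update({'productType': 'radiance'},
#                  {'$addToSet': {'tags': 'calibrated'}})
#   # -> {'nModified': <number of changed objects>}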


class LevelDBMetaStorage(KVBaseMetaStorage):
    """
    MetaStorage with LevelDB backend.
    """
    def __init__(self, path):
        import leveldb
        self.db = leveldb.LevelDB(path)
        super(LevelDBMetaStorage, self).__init__()

    def _put(self, key, value):
        self.db.Put(key, pickle.dumps(value))

    def _get(self, key):
        return pickle.loads(self.db.Get(key))

    def _rangeIter(self, key_from, key_to):
        for k, v in self.db.RangeIter(key_from, key_to):
            yield k, pickle.loads(v)


class DictMetaStorage(KVBaseMetaStorage):
    """
    MetaStorage with :py:class:`dict` backend.
    """
    def __init__(self):
        self._data = {}
        super(DictMetaStorage, self).__init__()

    def _put(self, key, value):
        self._data[key] = value

    def _get(self, key):
        return self._data[key]

    def _rangeIter(self, key_from, key_to):
        keys = sorted(k for k in self._data if key_from <= k <= key_to)
        for k in keys:
            yield k, self._data[k]


class MongoDBMetaStorage(MetaStorage):
    """
    MetaStorage with MongoDB backend.
    """
    def __init__(self, url, dbname='macsServer'):
        import pymongo
        self.db = pymongo.MongoClient(url, tz_aware=True)[dbname].products
        self.db.ensure_index('_oid', unique=True, background=True)
        self._pymongo_version = pymongo.version_tuple

    def addIndex(self, index):
        self.db.create_index(index, background=True)

    def ensureIndex(self, index):
        self.db.ensure_index(index, background=True)

    def addObject(self, oid, value):
        value['_oid'] = oid
        self.db.insert(value)

    def setObject(self, oid, value):
        self.db.remove({'_oid': oid})
        self.addObject(oid, value)

    def getObject(self, oid):
        obj = self.db.find_one({'_oid': oid})
        if obj is None:
            raise KeyError(oid)
        return obj
    def query(self, query, noTimeout=False, projection=None):
        """
        Performs a query.

        :param query: The query in MongoDB syntax.
        :param noTimeout: If you have a very slow process, you can set this
                          to disable the server-side cursor timeout.
        :note: When ``noTimeout`` is set, the database cursor is only released
               if the iteration runs to the end.
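
        For example (a sketch, the ``productType`` field is an assumption)::

            for obj in storage.query({'productType': 'radiance'},
                                     projection={'_oid': True}):
                print obj['_oid']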
"""
        extra_kwargs = {}
        if self._pymongo_version[0] < 3:
            extra_kwargs["timeout"] = not noTimeout
        else:
            extra_kwargs["no_cursor_timeout"] = noTimeout
        if projection is None:
            cursor = self.db.find(query, **extra_kwargs)
        else:
            cursor = self.db.find(query, projection, **extra_kwargs)
        return cursor
    def update(self, query, update):
        return self.db.update(query, update, multi=True)

    def aggregate(self, pipeline):
        if self._pymongo_version[0] < 3:
            res = self.db.aggregate(pipeline, cursor={})
        else:
            res = self.db.aggregate(pipeline)
        return res
|