# -*- coding: utf-8 -*-
# Filename: metaStorage.py
"""
meta storage for macsserver
===========================
:author: Tobias Kölling
The meta storage provides an abstraction for a database used to store metadata
about the processing of raw data to products.
Current backend implementations include:

* :py:class:`DictMetaStorage`: uses a Python :py:class:`dict` as in-memory
  storage. This storage can only be used in one process and loses all contents
  after program termination. It is useful, e.g., for unit tests.
* :py:class:`LevelDBMetaStorage`: uses a `LevelDB <http://leveldb.org/>`_ as
  on-disk storage. This storage can only be used by one process at a time, but
  keeps its contents after program termination. It is intended for interactive
  testing.
* :py:class:`MongoDBMetaStorage`: uses a `MongoDB <https://www.mongodb.com/>`_
  as storage. This storage can be used by multiple processes concurrently and
  is intended for production use.
"""
import re
import cPickle as pickle
import datetime
from itertools import ifilter, starmap
from collections import defaultdict
from runmacs.processor.querybuilder import getNestedItem
def normalizeStringIndex(value):
    """
    Convert ``value`` to a string whose lexicographic order matches the
    natural order of the value: fixed-width hex for ints, ISO format for
    datetimes.
    """
if isinstance(value, int):
return '%08x'%value
elif isinstance(value, datetime.datetime):
return value.isoformat()
else:
return str(value)
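
# Illustrative example (not part of the original module): zero-padded hex
# keeps the lexicographic order of the index keys consistent with the
# numeric order of the original integers, which plain str() would not.
def _example_normalize_string_index():
    assert normalizeStringIndex(10) == '0000000a'
    assert normalizeStringIndex(9) < normalizeStringIndex(10)
    assert str(10) < str(9)  # plain str() breaks the ordering
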
def robustCmp(a, b):
    """
    Like :py:func:`cmp`, but tolerates uncomparable types: ``None`` orders
    before values it cannot be compared with, and other uncomparable pairs
    are treated as equal.
    """
try:
return cmp(a, b)
except TypeError:
if a is None:
if b is None:
return 0
return -1
if b is None:
return 1
return 0
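
# Illustrative example: in Python 2, cmp() raises TypeError e.g. when
# ordering datetimes against None; robustCmp orders None before such values
# so that sorting records with missing sort keys still works.
def _example_robust_cmp():
    d = datetime.datetime(2016, 1, 1)
    assert robustCmp(d, None) == 1
    assert robustCmp(None, d) == -1
    assert robustCmp(None, None) == 0
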
def modify_tree(inp, match, f, path=None):
    """
    Return a copy of ``inp`` in which every node whose path satisfies
    ``match(path, node)`` is replaced by ``f(node)``.
    """
    if path is None:
        path = []
if match(path, inp):
return f(inp)
    if isinstance(inp, list):
        return [modify_tree(element, match, f, path + [i])
                for i, element in enumerate(inp)]
    if isinstance(inp, dict):
        return {k: modify_tree(v, match, f, path + [k])
                for k, v in inp.iteritems()}
    return inp
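
# Illustrative example: modify_tree rebuilds a nested dict/list structure,
# applying f to every node whose path matches; the input is left unchanged.
def _example_modify_tree():
    data = {'a': {'b': {'c': 1}}, 'x': [1, 2]}
    result = modify_tree(data,
                         lambda path, _: path == ['a', 'b'],
                         lambda node: dict(node, c=2))
    assert result == {'a': {'b': {'c': 2}}, 'x': [1, 2]}
    assert data['a']['b']['c'] == 1  # original tree is not mutated
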
def apply_set(data, operation):
for key, value in operation.items():
search_path = key.split(".")
search_path, elkey = search_path[:-1], search_path[-1]
        def modifier(dct):
            dct = dct.copy()
            dct[elkey] = value
            return dct
data = modify_tree(data, lambda path, _: path == search_path, modifier)
return data
def apply_push(data, operation):
for key, value in operation.items():
search_path = key.split(".")
search_path, elkey = search_path[:-1], search_path[-1]
def modifier(dct):
oldvalue = dct.get(elkey, [])
dct = dct.copy()
dct[elkey] = oldvalue + [value]
return dct
data = modify_tree(data,
lambda path, _: path == search_path,
modifier)
return data
def apply_add_to_set(data, operation):
for key, value in operation.items():
search_path = key.split(".")
search_path, elkey = search_path[:-1], search_path[-1]
def modifier(dct):
oldvalue = dct.get(elkey, [])
if value not in oldvalue:
dct = dct.copy()
dct[elkey] = oldvalue + [value]
return dct
data = modify_tree(data,
lambda path, _: path == search_path,
modifier)
return data
def apply_unset(data, operation):
for key, value in operation.items():
search_path = key.split(".")
search_path, elkey = search_path[:-1], search_path[-1]
data = modify_tree(data,
lambda path, _: path == search_path,
                           lambda dct: {k: v for k, v in dct.items() if k != elkey})
return data
UPDATE_APPLICATORS = {
"$set": apply_set,
"$push": apply_push,
"$addToSet": apply_add_to_set,
"$unset": apply_unset,
}
def apply_update(data, operation):
for opcode, opdata in operation.items():
data = UPDATE_APPLICATORS[opcode](data, opdata)
return data
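
# Illustrative example: apply_update emulates a small subset of MongoDB's
# update operators ($set, $push, $addToSet, $unset) on plain dict trees,
# with dotted keys addressing nested fields.
def _example_apply_update():
    doc = {'meta': {'tags': ['raw']}, 'status': 'new'}
    doc = apply_update(doc, {'$set': {'status': 'done'}})
    doc = apply_update(doc, {'$addToSet': {'meta.tags': 'calibrated'}})
    assert doc == {'meta': {'tags': ['raw', 'calibrated']}, 'status': 'done'}
    # a second $addToSet of the same value is a no-op, unlike $push
    doc = apply_update(doc, {'$addToSet': {'meta.tags': 'calibrated'}})
    assert doc['meta']['tags'] == ['raw', 'calibrated']
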
class EnhancedIterable(object):
"""
Iterable which behaves more like a MongoDB-Cursor.
"""
def __init__(self, it):
self._limit = None
self._ofs = 0
self.l = list(it)
def __iter__(self):
return iter(self.l[self.s])
def sort(self, key, direction=None):
if isinstance(key, list):
assert len(key) == 1, 'only single key sort is supported'
key, direction = key[0]
elif isinstance(key, tuple):
assert direction is None, 'if key is given as tuple, direction cannot be given'
key, direction = key
        self.l = sorted(self.l, cmp=robustCmp,
                        key=lambda x: x.get(key, None),
                        reverse=direction < 0)
return self
def limit(self, limit):
self._limit = limit
return self
def skip(self, ofs):
self._ofs = ofs
return self
    def batch_size(self, size):
        # only relevant for remote (MongoDB) cursors; accepted for API compatibility
        return self
def count(self):
return len(self.l[self.s])
    def close(self):
        """
        An :py:class:`EnhancedIterable` is garbage collected; explicit
        closing is not needed. This method only exists for cursor
        compatibility.
        """
        pass
@property
def s(self):
if self._limit is None:
s = slice(self._ofs, None)
else:
s = slice(self._ofs, self._ofs+self._limit)
return s
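
# Illustrative example: EnhancedIterable mimics the chaining interface of a
# pymongo cursor (sort/skip/limit/count) on top of an in-memory list.
def _example_enhanced_iterable():
    it = EnhancedIterable([{'v': 3}, {'v': 1}, {'v': 2}])
    page = it.sort('v', 1).skip(1).limit(1)
    assert list(page) == [{'v': 2}]
    assert page.count() == 1
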
def filterMatch(query, data):
    """
    Check whether a single value ``data`` matches a (sub)``query``,
    supporting a subset of MongoDB's query operators.
    """
if isinstance(query, dict):
for key, subquery in query.items():
if key == '$in':
test_data = data
if not isinstance(data, list):
test_data = [data]
if not any(d in subquery for d in test_data):
return False
elif key == '$nin':
if data in subquery:
return False
elif key == '$elemMatch':
if not any(filterMatch(subquery, d) for d in data):
return False
elif key == '$gte':
if not data >= subquery:
return False
elif key == '$lte':
if not data <= subquery:
return False
elif key == '$gt':
if not data > subquery:
return False
elif key == '$lt':
if not data < subquery:
return False
            elif key == '$ne':
                if isinstance(data, list):
                    if any(d == subquery for d in data):
                        return False
                else:
                    if data == subquery:
                        return False
elif key == '$regex':
if re.search(subquery, data) is None:
return False
else:
if isinstance(data, list):
return any(filterMatch(query, element) for element in data)
return query == data
return True
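
# Illustrative example: filterMatch checks a single value against a
# (sub)query; scalar queries also match if the value occurs in a list.
def _example_filter_match():
    assert filterMatch({'$gte': 2, '$lt': 5}, 3)
    assert not filterMatch({'$in': ['a', 'b']}, 'c')
    assert filterMatch({'$regex': '^prod_'}, 'prod_0042')
    assert filterMatch('b', ['a', 'b'])
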
def filterQuery(query, data, context=None):
    """
    Checks if ``data`` matches ``query``.

    :param query: A MongoDB-style query.
    :param data: A dict/list tree representing MongoDB-like data.
    :returns: ``True`` if the query matches the data, ``False`` otherwise.
    """
if isinstance(query, dict):
if context is None:
for key, subquery in query.items():
if key == '$or':
if not any(filterQuery(q, data) for q in subquery):
return False
elif key == '$elemMatch':
if not any(filterMatch(subquery, d) for d in data):
return False
else:
ok = False
if isinstance(subquery, dict):
if len(subquery) == 0:
ok = True
if not ok:
                        if key == 'oid':
                            # external 'oid' maps to the internal '_oid' field
                            key = '_oid'
try:
subdata = getNestedItem(data, key)
except KeyError:
try:
if len(set(subquery.keys()) - set(['$nin', '$ne'])) == 0:
ok = True
continue
except AttributeError:
pass
return False
if not filterMatch(subquery, subdata):
return False
return True
return True
def enhancedFilter(query, it):
    """
    Filters many datasets using a ``query``.

    Creates an :py:class:`EnhancedIterable` from an iterable of MongoDB-like
    datasets. The result will only contain datasets which match the given
    ``query``.

    :param query: A MongoDB-style query.
    :param it: An iterable over dict/list trees representing MongoDB-like data.
    :returns: An :py:class:`EnhancedIterable` with matching datasets only.
    """
f = lambda x: filterQuery(query, x)
return EnhancedIterable(ifilter(f, it))
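
# Illustrative example (field names are made up): enhancedFilter evaluates a
# MongoDB-style query in pure Python, via filterQuery, and wraps the matches
# in an EnhancedIterable.
def _example_enhanced_filter():
    records = [{'_oid': 'a', 'sensor': 'vnir', 'exposure': 10},
               {'_oid': 'b', 'sensor': 'swir', 'exposure': 30},
               {'_oid': 'c', 'sensor': 'vnir', 'exposure': 50}]
    result = enhancedFilter({'sensor': 'vnir', 'exposure': {'$gt': 20}},
                            records)
    assert [r['_oid'] for r in result] == ['c']
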
def evaluate_query_expression(expr, data):
if not isinstance(expr, str) or not expr.startswith("$"):
return expr
expr = expr[1:]
return data.get(expr, None)
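
# Illustrative example: strings starting with '$' are resolved as field
# references against the record, everything else is taken literally.
def _example_evaluate_query_expression():
    record = {'sensor': 'vnir'}
    assert evaluate_query_expression('$sensor', record) == 'vnir'
    assert evaluate_query_expression(42, record) == 42
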
REDUCTIONS = {
"$sum": sum,
}
def aggregate_group(query, it):
_id = query["_id"]
def mapper(el):
return evaluate_query_expression(_id, el)
acc = defaultdict(list)
for el in it:
key = mapper(el)
if key is None:
continue
acc[key].append(el)
expressions = []
for key, value in query.items():
if key == "_id":
continue
(reduction, expression), = value.items()
expressions.append((key, reduction, expression))
def reducer(key, elements):
res = {"_id": key}
for field, reduction, expression in expressions:
res[field] = REDUCTIONS[reduction](evaluate_query_expression(expression, el)
for el in elements)
return res
return starmap(reducer, acc.items())
def aggregate_unwind(query, it):
for el in it:
parts = evaluate_query_expression(query, el)
if parts is None:
continue
try:
iter(parts)
except TypeError:
parts = [parts]
for part in parts:
res = el.copy()
res[query[1:]] = part
yield res
AGGREGATION_STAGES = {
"$match": enhancedFilter,
"$group": aggregate_group,
"$unwind": aggregate_unwind,
}
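
# Illustrative example (field names are made up): the stages above chain like
# MongoDB's aggregation framework; see KVBaseMetaStorage.aggregate below for
# how a pipeline is wired together.
def _example_aggregation_stages():
    records = [{'sensor': 'vnir', 'sizes': [1, 2]},
               {'sensor': 'vnir', 'sizes': [3]},
               {'sensor': 'swir', 'sizes': [4]}]
    unwound = AGGREGATION_STAGES['$unwind']('$sizes', records)
    grouped = AGGREGATION_STAGES['$group']({'_id': '$sensor',
                                            'total': {'$sum': '$sizes'}},
                                           unwound)
    assert sorted(grouped, key=lambda r: r['_id']) == [
        {'_id': 'swir', 'total': 4},
        {'_id': 'vnir', 'total': 6},
    ]

# NOTE: the MetaStorage base class referenced by the classes below is not
# part of this listing; the minimal stand-in here is an assumption about its
# interface, added only so that the subclasses remain importable.
class MetaStorage(object):
    """
    Base class for meta storage backends.
    """
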
class KVBaseMetaStorage(MetaStorage):
"""
Base class for MetaStorage using key-value based backends.
"""
def __init__(self):
try:
self.config = self._get('_config')
except KeyError:
self.config = {}
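
    # Key layout used by the key-value backends (as implemented below):
    #   _config             -> storage configuration (e.g. the list of indices)
    #   data_<oid>          -> the metadata document for object <oid>
    #   by_<index>_<value>  -> list of oids whose <index> value normalizes to <value>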
def storeConfig(self):
self._put('_config', self.config)
@property
def indices(self):
try:
return self.config['indices']
except KeyError:
return []
def addIndex(self, index):
index = str(index)
for k, v in self._rangeIter('data_', 'data_\xff'):
try:
iv = v[index]
except KeyError:
continue
oid = k[5:]
self._addObjectToIndex(index, iv, oid)
self.config['indices'] = self.indices + [index]
self.storeConfig()
def _addObjectToIndex(self, index, indexValue, oid):
if isinstance(indexValue, list):
for val in indexValue:
self._addObjectToIndex(index, val, oid)
else:
indexDbKey = 'by_%s_%s'%(index, normalizeStringIndex(indexValue))
try:
oidsInIndex = set(self._get(indexDbKey))
except KeyError:
oidsInIndex = set(())
oidsInIndex.add(oid)
self._put(indexDbKey, list(oidsInIndex))
    def addObject(self, oid, value):
        try:
            self._get('data_%s' % oid)
        except KeyError:
            # good case: the oid is not yet present
            self.setObject(oid, value)
        else:
            raise ValueError('oid %s is already in the DB!' % oid)
def setObject(self, oid, value):
self._put('data_%s'%oid, value)
for index in self.indices:
try:
iv = getNestedItem(value, index)
except KeyError:
continue
self._addObjectToIndex(index, iv, oid)
def getObject(self, oid):
return self._get('data_%s'%oid)
def getOidsForIndexRange(self, index, start='', stop='\xff'):
index = str(index)
oids = set()
        for k, v in self._rangeIter(key_from='by_%s_%s' % (index, normalizeStringIndex(start)),
                                    key_to='by_%s_%s' % (index, normalizeStringIndex(stop))):
oids |= set(v)
return oids
def getAllObjects(self):
return self._rangeIter(key_from='data_', key_to='data_\xff')
def getAllOids(self):
for key, _ in self.getAllObjects():
yield key[5:]
def _processOrQuery(self, query):
oids = set()
for subQuery in query:
oids |= self._processAndQuery(subQuery)
return oids
def _processAndQuery(self, query):
if len(query) == 0:
return set(self.getAllOids())
if 'oid' in query:
if isinstance(query['oid'], dict):
oidMin = query['oid'].get('$gte', '')
oidMax = query['oid'].get('$lte', '\xff')
else:
return set(query['oid'])
else:
oidMin = ''
oidMax = '\xff'
oids = None
for index, subQuery in query.items():
if index == 'oid':
continue
elif index == '$or':
subQueryOids = self._processOrQuery(subQuery)
elif index not in self.indices:
raise ValueError('no index for %s!'%index)
elif isinstance(subQuery, dict):
if any(op in subQuery for op in ['$gte', '$lte']):
subQueryOids = self.getOidsForIndexRange(index, subQuery.get('$gte', ''), subQuery.get('$lte', '\xff'))
else:
return set(self.getAllOids())
else:
                try:
                    subQueryOids = set(self._get('by_%s_%s' % (index, normalizeStringIndex(subQuery))))
                except KeyError:
                    subQueryOids = set()
if oids is None:
oids = subQueryOids
else:
oids &= subQueryOids
            if len(oids) == 0:
                break  # shortcut: an empty intersection stays empty
return set((oid for oid in oids if oid >= oidMin and oid <= oidMax))
def query(self, query, noTimeout=False, projection=None):
if len(query) == 0:
query = {'oid': {}}
if query.keys() != ['oid']:
oids = self._processAndQuery(query)
def iterator():
for oid in oids:
yield self.getObject(oid)
return enhancedFilter(query, iterator())
oid = query['oid']
if isinstance(oid, dict):
try:
start = oid['$gte']
except KeyError:
start = ''
try:
end = oid['$lte']
except KeyError:
end = '\xff'
def iterator():
for key, value in self._rangeIter(key_from='data_%s'%start, key_to='data_%s'%end):
yield value
return enhancedFilter(query, iterator())
else:
return enhancedFilter(query, [self.getObject(oid)])
def update(self, query, operation):
n_modified = 0
for match in self.query(query):
new_object = apply_update(match, operation)
if new_object != match:
n_modified += 1
self.setObject(new_object["_oid"], new_object)
return {"nModified": n_modified}
def aggregate(self, pipeline):
current_data = (value for _, value in self.getAllObjects())
for stage in pipeline:
assert len(stage) == 1
(stage_type, stage_config), = stage.items()
current_data = AGGREGATION_STAGES[stage_type](stage_config, current_data)
return list(current_data)
def ensureIndex(self, key):
if key not in self.indices:
self.addIndex(key)
class LevelDBMetaStorage(KVBaseMetaStorage):
"""
MetaStorage with LevelDB backend.
"""
def __init__(self, path):
import leveldb
self.db = leveldb.LevelDB(path)
super(LevelDBMetaStorage, self).__init__()
def _put(self, key, value):
self.db.Put(key, pickle.dumps(value))
def _get(self, key):
return pickle.loads(self.db.Get(key))
def _rangeIter(self, key_from, key_to):
for k, v in self.db.RangeIter(key_from, key_to):
yield k, pickle.loads(v)
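
# Illustrative example (the path is made up): the LevelDB backend persists
# across program runs, but the database may only be opened by one process
# at a time.
def _example_leveldb_meta_storage():
    storage = LevelDBMetaStorage('/tmp/macs-meta-example.ldb')
    storage.ensureIndex('sensor')
    storage.addObject('oid1', {'_oid': 'oid1', 'sensor': 'vnir'})
    # re-running this raises ValueError: 'oid1' is then already in the DB
    return storage.getObject('oid1')
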
class DictMetaStorage(KVBaseMetaStorage):
"""
MetaStorage with :py:class:`dict` backend.
"""
def __init__(self):
self._data = {}
super(DictMetaStorage, self).__init__()
def _put(self, key, value):
self._data[key] = value
def _get(self, key):
return self._data[key]
def _rangeIter(self, key_from, key_to):
keys = sorted(filter(lambda k: k >= key_from and k <= key_to, self._data.keys()))
for k in keys:
yield k, self._data[k]
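
# Illustrative end-to-end example: DictMetaStorage supports the same query
# and update interface as the other backends, entirely in memory.
def _example_dict_meta_storage():
    storage = DictMetaStorage()
    storage.ensureIndex('sensor')
    storage.addObject('oid1', {'_oid': 'oid1', 'sensor': 'vnir'})
    storage.addObject('oid2', {'_oid': 'oid2', 'sensor': 'swir'})
    matches = storage.query({'sensor': 'vnir'})
    assert [m['_oid'] for m in matches] == ['oid1']
    storage.update({'sensor': 'swir'}, {'$set': {'status': 'done'}})
    assert storage.getObject('oid2')['status'] == 'done'
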
class MongoDBMetaStorage(MetaStorage):
"""
MetaStorage with MongoDB backend.
"""
def __init__(self, url, dbname='macsServer'):
import pymongo
self.db = pymongo.MongoClient(url, tz_aware=True)[dbname].products
self.db.ensure_index('_oid', unique=True, background=True)
self._pymongo_version = pymongo.version_tuple
def addIndex(self, index):
self.db.create_index(index, background=True)
def ensureIndex(self, index):
self.db.ensure_index(index, background=True)
def addObject(self, oid, value):
value['_oid'] = oid
self.db.insert(value)
def setObject(self, oid, value):
self.db.remove({'_oid': oid})
self.addObject(oid, value)
def getObject(self, oid):
obj = self.db.find_one({'_oid': oid})
if obj is None:
raise KeyError(oid)
return obj
    def query(self, query, noTimeout=False, projection=None):
        """
        Performs a query.

        :param query: the query in MongoDB syntax
        :param noTimeout: if the consuming process is very slow, set this to
                          disable the server-side cursor timeout
        :note: with ``noTimeout`` set, the database cursor is only released
               if the iteration runs to the end
        """
extra_kwargs = {}
if self._pymongo_version[0] < 3:
extra_kwargs["timeout"] = not noTimeout
else:
extra_kwargs["no_cursor_timeout"] = noTimeout
if projection is None:
cursor = self.db.find(query, **extra_kwargs)
else:
cursor = self.db.find(query, projection, **extra_kwargs)
return cursor
def update(self, query, update):
return self.db.update(query, update, multi=True)
def aggregate(self, pipeline):
if self._pymongo_version[0] < 3:
res = self.db.aggregate(pipeline, cursor={})
else:
res = self.db.aggregate(pipeline)
return res
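
# Illustrative example (URL and database name are made up; requires a running
# MongoDB): the MongoDB backend exposes the same interface as the in-memory
# backends, delegating the actual work to pymongo.
def _example_mongodb_meta_storage():
    storage = MongoDBMetaStorage('mongodb://localhost:27017/',
                                 dbname='macsServerExample')
    storage.ensureIndex('sensor')
    storage.setObject('oid1', {'sensor': 'vnir'})
    return list(storage.query({'sensor': 'vnir'}).limit(10))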