Source code for runmacs.processor.metaStorage

# -*- coding: utf-8 -*-
# Filename: metaStorage.py
"""
meta storage for macsserver
===========================

:author: Tobias Kölling

The meta storage provides an abstraction over a database used to store metadata
about the processing of raw data into products.

Current backend implementations include:

    * :py:class:`DictMetaStorage`: Using a python :py:class:`dict` as in-memory storage.
      This storage can only be used in one process and loses all contents after program
      termination. It is useful e.g. for unit tests.
    * :py:class:`LevelDBMetaStorage`: Using a `LevelDB <http://leveldb.org/>`_ as on-disk storage.
      This storage can only be used in one process at a time, but keeps information after
      program termination. It is intended for interactive testing.
    * :py:class:`MongoDBMetaStorage`: Using a `MongoDB <https://www.mongodb.com/>`_ as storage.
      This storage can be used by multiple processes concurrently and is intended for
      production use.

"""

import re
import cPickle as pickle
import datetime
from itertools import ifilter, starmap
from collections import defaultdict

from runmacs.processor.querybuilder import getNestedItem

class MetaStorage(object):
    """
    Base class for every meta storage implementation.
    """
    pass
def normalizeStringIndex(value):
    if isinstance(value, int):
        return '%08x' % value
    elif isinstance(value, datetime.datetime):
        return value.isoformat()
    else:
        return str(value)


def robustCmp(a, b):
    try:
        return cmp(a, b)
    except TypeError:
        if a is None:
            if b is None:
                return 0
            return -1
        if b is None:
            return 1
        return 0


def modify_tree(inp, match, f, path=None):
    if path is None:
        path = []
    if match(path, inp):
        return f(inp)
    if isinstance(inp, list):
        return [modify_tree(element, match, f, path + [i])
                for i, element in enumerate(inp)]
    if isinstance(inp, dict):
        return {k: modify_tree(v, match, f, path + [k])
                for k, v in inp.iteritems()}
    return inp


def apply_set(data, operation):
    for key, value in operation.items():
        search_path = key.split(".")
        search_path, elkey = search_path[:-1], search_path[-1]

        def modifier(dct):
            dct = dct.copy()
            dct[elkey] = value
            return dct
        data = modify_tree(data, lambda path, _: path == search_path, modifier)
    return data


def apply_push(data, operation):
    for key, value in operation.items():
        search_path = key.split(".")
        search_path, elkey = search_path[:-1], search_path[-1]

        def modifier(dct):
            oldvalue = dct.get(elkey, [])
            dct = dct.copy()
            dct[elkey] = oldvalue + [value]
            return dct
        data = modify_tree(data, lambda path, _: path == search_path, modifier)
    return data


def apply_add_to_set(data, operation):
    for key, value in operation.items():
        search_path = key.split(".")
        search_path, elkey = search_path[:-1], search_path[-1]

        def modifier(dct):
            oldvalue = dct.get(elkey, [])
            if value not in oldvalue:
                dct = dct.copy()
                dct[elkey] = oldvalue + [value]
            return dct
        data = modify_tree(data, lambda path, _: path == search_path, modifier)
    return data


def apply_unset(data, operation):
    for key, value in operation.items():
        search_path = key.split(".")
        search_path, elkey = search_path[:-1], search_path[-1]
        data = modify_tree(data,
                           lambda path, _: path == search_path,
                           lambda dct: {k: v for k, v in dct.items() if k != elkey})
    return data


UPDATE_APPLICATORS = {
    "$set": apply_set,
    "$push": apply_push,
    "$addToSet": apply_add_to_set,
    "$unset": apply_unset,
}


def apply_update(data, operation):
    for opcode, opdata in operation.items():
        data = UPDATE_APPLICATORS[opcode](data, opdata)
    return data
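# A small illustrative example of apply_update() on a hypothetical document
# (field names and values are made up):
#
#     apply_update({'tags': ['a'], 'meta': {'ok': False}},
#                  {'$push': {'tags': 'b'}, '$set': {'meta.ok': True}})
#     # -> {'tags': ['a', 'b'], 'meta': {'ok': True}}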
class EnhancedIterable(object):
    """
    Iterable which behaves more like a MongoDB-Cursor.

    See the illustrative example below the class definition.
    """
    def __init__(self, it):
        self._limit = None
        self._ofs = 0
        self.l = list(it)

    def __iter__(self):
        return iter(self.l[self.s])

    def sort(self, key, direction=None):
        if isinstance(key, list):
            assert len(key) == 1, 'only single key sort is supported'
            key, direction = key[0]
        elif isinstance(key, tuple):
            assert direction is None, 'if key is given as tuple, direction cannot be given'
            key, direction = key
        self.l = sorted(self.l,
                        cmp=robustCmp,
                        key=lambda x: x.get(key, None),
                        reverse=direction < 0)
        return self

    def limit(self, limit):
        self._limit = limit
        return self

    def skip(self, ofs):
        self._ofs = ofs
        return self

    def batch_size(self, size):
        # only matters for remote storage
        return self

    def count(self):
        return len(self.l[self.s])
    def close(self):
        """
        EnhancedIterable is garbage collected; explicit closing is not needed.
        """
        pass
    @property
    def s(self):
        if self._limit is None:
            s = slice(self._ofs, None)
        else:
            s = slice(self._ofs, self._ofs + self._limit)
        return s
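# Illustrative use of EnhancedIterable as a cursor-like object
# (hypothetical data):
#
#     it = EnhancedIterable([{'a': 2}, {'a': 1}, {'a': 3}])
#     list(it.sort('a', 1).skip(1).limit(1))   # -> [{'a': 2}]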
def filterMatch(query, data):
    if isinstance(query, dict):
        for key, subquery in query.items():
            if key == '$in':
                test_data = data
                if not isinstance(data, list):
                    test_data = [data]
                if not any(d in subquery for d in test_data):
                    return False
            elif key == '$nin':
                if data in subquery:
                    return False
            elif key == '$elemMatch':
                if not any(filterMatch(subquery, d) for d in data):
                    return False
            elif key == '$gte':
                if not data >= subquery:
                    return False
            elif key == '$lte':
                if not data <= subquery:
                    return False
            elif key == '$gt':
                if not data > subquery:
                    return False
            elif key == '$lt':
                if not data < subquery:
                    return False
            elif key == '$ne':
                if isinstance(data, list):
                    if any(d == subquery for d in data):
                        return False
                else:
                    if data == subquery:
                        return False
            elif key == '$regex':
                if re.search(subquery, data) is None:
                    return False
    else:
        if isinstance(data, list):
            return any(filterMatch(query, element) for element in data)
        return query == data
    return True
def filterQuery(query, data, context=None):
    """
    Checks if ``data`` matches ``query``.

    :param query: A mongodb-style query.
    :param data: A dict/list tree representing mongodb-like data.
    :returns: True if query matches the data, False otherwise.
    """
    if isinstance(query, dict):
        if context is None:
            for key, subquery in query.items():
                if key == '$or':
                    if not any(filterQuery(q, data) for q in subquery):
                        return False
                elif key == '$elemMatch':
                    if not any(filterMatch(subquery, d) for d in data):
                        return False
                else:
                    ok = False
                    if isinstance(subquery, dict):
                        if len(subquery) == 0:
                            ok = True
                    if not ok:
                        if key == 'oid':
                            key = '_oid'
                        try:
                            subdata = getNestedItem(data, key)
                        except KeyError:
                            try:
                                if len(set(subquery.keys()) - set(['$nin', '$ne'])) == 0:
                                    ok = True
                                    continue
                            except AttributeError:
                                pass
                            return False
                        if not filterMatch(subquery, subdata):
                            return False
            return True
    return True
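# Illustrative filterQuery() calls on hypothetical data (the field names are
# made up):
#
#     filterQuery({'size': {'$gte': 3}}, {'size': 5})     # -> True
#     filterQuery({'size': {'$gte': 3}}, {'size': 1})     # -> False
#     filterQuery({'missing': {'$ne': 1}}, {'size': 5})   # -> True (absent keys pass $ne/$nin)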
def enhancedFilter(query, it):
    """
    Filters many datasets using a ``query``.

    Creates an :py:class:`EnhancedIterable` from an iterable of mongodb-like
    datasets. The result will only contain datasets which match the given
    ``query``.

    :param query: A mongodb-style query.
    :param it: An iterable over dict/list trees representing mongodb-like data.
    :returns: An :py:class:`EnhancedIterable` with matching datasets only.
    """
    f = lambda x: filterQuery(query, x)
    return EnhancedIterable(ifilter(f, it))
def evaluate_query_expression(expr, data):
    if not isinstance(expr, str) or not expr.startswith("$"):
        return expr
    expr = expr[1:]
    return data.get(expr, None)


REDUCTIONS = {
    "$sum": sum,
}


def aggregate_group(query, it):
    _id = query["_id"]

    def mapper(el):
        return evaluate_query_expression(_id, el)

    acc = defaultdict(list)
    for el in it:
        key = mapper(el)
        if key is None:
            continue
        acc[key].append(el)

    expressions = []
    for key, value in query.items():
        if key == "_id":
            continue
        (reduction, expression), = value.items()
        expressions.append((key, reduction, expression))

    def reducer(key, elements):
        res = {"_id": key}
        for field, reduction, expression in expressions:
            res[field] = REDUCTIONS[reduction](
                evaluate_query_expression(expression, el) for el in elements)
        return res

    return starmap(reducer, acc.items())


def aggregate_unwind(query, it):
    for el in it:
        parts = evaluate_query_expression(query, el)
        if parts is None:
            continue
        try:
            iter(parts)
        except TypeError:
            parts = [parts]
        for part in parts:
            res = el.copy()
            res[query[1:]] = part
            yield res


AGGREGATION_STAGES = {
    "$match": enhancedFilter,
    "$group": aggregate_group,
    "$unwind": aggregate_unwind,
}
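# Illustrative aggregation over hypothetical documents, using the subset of
# MongoDB's pipeline operators implemented above:
#
#     rows = [{'kind': 'a', 'vals': [1, 2]}, {'kind': 'a', 'vals': [3]}]
#     list(aggregate_group({'_id': '$kind', 'total': {'$sum': '$vals'}},
#                          aggregate_unwind('$vals', rows)))
#     # -> [{'_id': 'a', 'total': 6}]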
class KVBaseMetaStorage(MetaStorage):
    """
    Base class for MetaStorage using key-value based backends.
    """
    def __init__(self):
        try:
            self.config = self._get('_config')
        except KeyError:
            self.config = {}

    def storeConfig(self):
        self._put('_config', self.config)

    @property
    def indices(self):
        try:
            return self.config['indices']
        except KeyError:
            return []

    def addIndex(self, index):
        index = str(index)
        for k, v in self._rangeIter('data_', 'data_\xff'):
            try:
                iv = v[index]
            except KeyError:
                continue
            oid = k[5:]
            self._addObjectToIndex(index, iv, oid)
        self.config['indices'] = self.indices + [index]
        self.storeConfig()

    def _addObjectToIndex(self, index, indexValue, oid):
        if isinstance(indexValue, list):
            for val in indexValue:
                self._addObjectToIndex(index, val, oid)
        else:
            indexDbKey = 'by_%s_%s' % (index, normalizeStringIndex(indexValue))
            try:
                oidsInIndex = set(self._get(indexDbKey))
            except KeyError:
                oidsInIndex = set(())
            oidsInIndex.add(oid)
            self._put(indexDbKey, list(oidsInIndex))

    def addObject(self, oid, value):
        try:
            self._get('data_%s' % oid)
            raise ValueError('already in DB!')
        except KeyError:
            # good case: the object is not yet stored
            self.setObject(oid, value)

    def setObject(self, oid, value):
        self._put('data_%s' % oid, value)
        for index in self.indices:
            try:
                iv = getNestedItem(value, index)
            except KeyError:
                continue
            self._addObjectToIndex(index, iv, oid)

    def getObject(self, oid):
        return self._get('data_%s' % oid)

    def getOidsForIndexRange(self, index, start='', stop='\xff'):
        index = str(index)
        oids = set()
        for k, v in self._rangeIter(key_from='by_%s_%s' % (index, normalizeStringIndex(start)),
                                    key_to='by_%s_%s' % (index, normalizeStringIndex(stop))):
            oids |= set(v)
        return oids

    def getAllObjects(self):
        return self._rangeIter(key_from='data_', key_to='data_\xff')

    def getAllOids(self):
        for key, _ in self.getAllObjects():
            yield key[5:]

    def _processOrQuery(self, query):
        oids = set()
        for subQuery in query:
            oids |= self._processAndQuery(subQuery)
        return oids

    def _processAndQuery(self, query):
        if len(query) == 0:
            return set(self.getAllOids())
        if 'oid' in query:
            if isinstance(query['oid'], dict):
                oidMin = query['oid'].get('$gte', '')
                oidMax = query['oid'].get('$lte', '\xff')
            else:
                return set(query['oid'])
        else:
            oidMin = ''
            oidMax = '\xff'
        oids = None
        for index, subQuery in query.items():
            if index == 'oid':
                continue
            elif index == '$or':
                subQueryOids = self._processOrQuery(subQuery)
            elif index not in self.indices:
                raise ValueError('no index for %s!' % index)
            elif isinstance(subQuery, dict):
                if any(op in subQuery for op in ['$gte', '$lte']):
                    subQueryOids = self.getOidsForIndexRange(index,
                                                             subQuery.get('$gte', ''),
                                                             subQuery.get('$lte', '\xff'))
                else:
                    return set(self.getAllOids())
            else:
                try:
                    subQueryOids = set(self._get('by_%s_%s' % (index, normalizeStringIndex(subQuery))))
                except KeyError:
                    subQueryOids = set()
            if oids is None:
                oids = subQueryOids
            else:
                oids &= subQueryOids
            if len(oids) == 0:
                break  # shortcut
        return set((oid for oid in oids if oid >= oidMin and oid <= oidMax))

    def query(self, query, noTimeout=False, projection=None):
        if len(query) == 0:
            query = {'oid': {}}
        if query.keys() != ['oid']:
            oids = self._processAndQuery(query)

            def iterator():
                for oid in oids:
                    yield self.getObject(oid)
            return enhancedFilter(query, iterator())
        oid = query['oid']
        if isinstance(oid, dict):
            try:
                start = oid['$gte']
            except KeyError:
                start = ''
            try:
                end = oid['$lte']
            except KeyError:
                end = '\xff'

            def iterator():
                for key, value in self._rangeIter(key_from='data_%s' % start,
                                                  key_to='data_%s' % end):
                    yield value
            return enhancedFilter(query, iterator())
        else:
            return enhancedFilter(query, [self.getObject(oid)])

    def update(self, query, operation):
        n_modified = 0
        for match in self.query(query):
            new_object = apply_update(match, operation)
            if new_object != match:
                n_modified += 1
                self.setObject(new_object["_oid"], new_object)
        return {"nModified": n_modified}

    def aggregate(self, pipeline):
        current_data = (value for _, value in self.getAllObjects())
        for stage in pipeline:
            assert len(stage) == 1
            (stage_type, stage_config), = stage.items()
            current_data = AGGREGATION_STAGES[stage_type](stage_config, current_data)
        return list(current_data)

    def ensureIndex(self, key):
        if key not in self.indices:
            self.addIndex(key)
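# Illustrative use of the update/aggregate emulation on a key-value backend
# (hypothetical field names; assumes 'sensor' has been added as an index and
# that stored documents carry an '_oid' field, which update() uses to write
# back modified documents):
#
#     storage.update({'sensor': 'vnir'}, {'$set': {'status': 'done'}})
#     # -> {'nModified': <number of changed documents>}
#     storage.aggregate([{'$match': {'sensor': 'vnir'}},
#                        {'$group': {'_id': '$sensor', 'n': {'$sum': 1}}}])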
class LevelDBMetaStorage(KVBaseMetaStorage):
    """
    MetaStorage with LevelDB backend.
    """
    def __init__(self, path):
        import leveldb
        self.db = leveldb.LevelDB(path)
        super(LevelDBMetaStorage, self).__init__()

    def _put(self, key, value):
        self.db.Put(key, pickle.dumps(value))

    def _get(self, key):
        return pickle.loads(self.db.Get(key))

    def _rangeIter(self, key_from, key_to):
        for k, v in self.db.RangeIter(key_from, key_to):
            yield k, pickle.loads(v)
class DictMetaStorage(KVBaseMetaStorage):
    """
    MetaStorage with :py:class:`dict` backend.
    """
    def __init__(self):
        self._data = {}
        super(DictMetaStorage, self).__init__()

    def _put(self, key, value):
        self._data[key] = value

    def _get(self, key):
        return self._data[key]

    def _rangeIter(self, key_from, key_to):
        keys = sorted(filter(lambda k: k >= key_from and k <= key_to, self._data.keys()))
        for k in keys:
            yield k, self._data[k]
class MongoDBMetaStorage(MetaStorage):
    """
    MetaStorage with MongoDB backend.
    """
    def __init__(self, url, dbname='macsServer'):
        import pymongo
        self.db = pymongo.MongoClient(url, tz_aware=True)[dbname].products
        self.db.ensure_index('_oid', unique=True, background=True)
        self._pymongo_version = pymongo.version_tuple

    def addIndex(self, index):
        self.db.create_index(index, background=True)

    def ensureIndex(self, index):
        self.db.ensure_index(index, background=True)

    def addObject(self, oid, value):
        value['_oid'] = oid
        self.db.insert(value)

    def setObject(self, oid, value):
        self.db.remove({'_oid': oid})
        self.addObject(oid, value)

    def getObject(self, oid):
        obj = self.db.find_one({'_oid': oid})
        if obj is None:
            raise KeyError(oid)
        return obj
    def query(self, query, noTimeout=False, projection=None):
        """
        Performs a query.

        :param query: the query in MongoDB syntax
        :param noTimeout: if you have a very slow process, you can set this to
                          disable the server-side cursor timeout
        :note: if ``noTimeout`` is set, the database cursor is only released
               once the iteration has run to the end
        """
        extra_kwargs = {}
        if self._pymongo_version[0] < 3:
            extra_kwargs["timeout"] = not noTimeout
        else:
            extra_kwargs["no_cursor_timeout"] = noTimeout
        if projection is None:
            cursor = self.db.find(query, **extra_kwargs)
        else:
            cursor = self.db.find(query, projection, **extra_kwargs)
        return cursor
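    # Illustrative call (hypothetical connection URL and query); with
    # noTimeout=True the cursor is only released once the iteration has
    # run to the end:
    #
    #     store = MongoDBMetaStorage('mongodb://localhost:27017')
    #     for product in store.query({'sensor': 'vnir'}, noTimeout=True):
    #         ...  # handle the product document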
    def update(self, query, update):
        return self.db.update(query, update, multi=True)

    def aggregate(self, pipeline):
        if self._pymongo_version[0] < 3:
            res = self.db.aggregate(pipeline, cursor={})
        else:
            res = self.db.aggregate(pipeline)
        return res