# -*- coding: utf-8 -*-
# Filename: productBuilder.py
"""
product builder for macsProcessor
=================================
This is part of the macsProcessor suite.
Copyright (C) 2014 Tobias Kölling
"""
import argparse
import os
import yaml
import runmacs.processor.product as product
import runmacs.processor.metaStorage as metaStorage
from runmacs.processor.productaccessor import ProductAccessor
from runmacs.processor.productregistry import get_default_registry
from runmacs.processor.collection import find_collections_in_folder, collection_from_dict
from runmacs.processor.querybuilder import processQuery
from runmacs.processor.buildhelpers import *
import logging
import pymongo
logger = logging.getLogger(__name__)
batchSize = 1
class ProductBuilder(object):
"""
The product builder is responsible for discovering new products.
:param db: MetaStorage to be used as product database
:param registry: ProductRegistry containing all desired products
:param accessor: ProductAccessor to get available products from the database
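    :param collections: collections whose file sources should be
        considered when crawling (defaults to an empty list)

    Typical usage (a minimal sketch; the connection string and database
    collection name below are placeholders, not real deployment values)::

        # connection string and collection name are illustrative
        db = metaStorage.MongoDBMetaStorage('mongodb://localhost:27017/', 'products')
        builder = ProductBuilder(db, registry=get_default_registry())
        builder.run()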
"""
def __init__(self, db, registry, collections=None, accessor=None):
self.db = db
self.registry = registry
        if accessor is None:
self.accessor = ProductAccessor(db)
else:
self.accessor = accessor
if collections is None:
self.collections = []
else:
self.collections = collections
self.needToScan = True
self.needToCrawl = []
self.rescanAll = False
self.rescanProduct = None
self.logger = logging.getLogger(__name__ + '.ProductBuilder')
self.db.ensureIndex('_creationRev')
self.db.ensureIndex('_oid')
self.db.ensureIndex('collections')
self.db.ensureIndex('componentOids')
self.db.ensureIndex('is_primary')
self.db.ensureIndex('filename')
self.db.ensureIndex('date') # sorting by date is common, so this index is generally sensible
for index in product.Product.requiredIndices:
self.db.ensureIndex(index)
try:
self.dbState = self.db.getObject('__dbState__')
except KeyError:
self.dbState = {'inProcessRev': 0,
'completeRev': -1}
self.incInProcessRev()
def incInProcessRev(self):
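        """
        Bump the in-process revision counter and persist it, so that
        products created during this run can be distinguished from older
        ones via their ``_creationRev`` field.
        """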
self.inProcessRev = self.dbState['inProcessRev'] + 1
self.dbState['inProcessRev'] = self.inProcessRev
self.db.setObject('__dbState__', self.dbState)
def buildProduct(self, prototype, extraQuery, primaryComponent=None):
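        """
        Recursively try to complete ``prototype``.

        Asks the prototype what it needs next (``whatIsNeeded``), queries
        the accessor for matching candidate components and recurses with
        each partially filled prototype.  Once nothing more is needed, the
        finished product spec is stored in the database, tagged with the
        intersection of its components' collections.

        :param prototype: (partially built) product to complete
        :param extraQuery: additional constraints merged into each query
        :param primaryComponent: optional component name to resolve first
        """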
if prototype.building is False:
needed = None
elif primaryComponent is None:
needed = prototype.whatIsNeeded()
else:
needed = prototype.whatIsNeeded(primaryComponent)
if len(prototype.components) > 0:
component_collections = [set(component.collections)
for component in prototype.components.values()]
collections = component_collections[0].intersection(*component_collections[1:])
else:
collections = None
if needed is None:
#product built completely -> store it
            if prototype.building:
raise RuntimeError('prototype does not need anything but is still building')
p = prototype #for clarity, we do have a product now
print "FOUND PRODUCT:", p
oid = p.hash
try:
objectInDb = self.db.getObject(oid)
except KeyError:
try:
ps = p.productSpec
except ValueError:
logger.exception('could not get product spec, skipping')
return
ps['_creationRev'] = self.inProcessRev
ps['_oid'] = oid
                if collections is not None:
ps['collections'] = list(collections)
self.db.addObject(oid, ps)
self.needToScan = True
print "stored new product (%s)"%p.productClassName()
return
query = needed['query']
sort = None
limit = None
if isinstance(query, tuple):
query, queryModifiers = query
            if 'sortBy' in queryModifiers:
sort = queryModifiers['sortBy']
            if 'limit' in queryModifiers:
limit = queryModifiers['limit']
representation = needed['representation']
neededComponentName = needed['componentName']
for k, v in extraQuery.items():
query[k] = v
if collections is not None:
query["collections"] = {"$in": list(collections)}
foundSomething = False
subExtraQuery = extraQuery.copy()
subExtraQuery['_creationRev'] = {'$gte': -1}
with self.accessor.query(query, limit=limit, sort=sort, batchSize=batchSize) as cursor:
            if representation == 'single':
for prod in cursor:
foundSomething = True
for sub in QueryResolutionIterator(prototype, neededComponentName, prod, self.accessor):
self.buildProduct(sub, subExtraQuery)
elif representation == 'list':
products = list(cursor)
if len(products) > 0:
foundSomething = True
for sub in QueryResolutionIterator(prototype, neededComponentName, products, self.accessor):
self.buildProduct(sub, subExtraQuery)
else:
raise ValueError('unknown representation "%s"!'%representation)
#if not foundSomething:
# self.logger.debug('found nothing for query "%s" from %s', query, prototype.productClassName())
def run_crawl_request(self, crawl):
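        """
        Import new primary products for a single :class:`CrawlRequest`.

        Walks the file sources of all known collections at the crawl's
        place, skips files that were already imported, and stores a new
        product (marked ``is_primary``) for each remaining file.  Files
        already present in the database are merely tagged with the
        collection id (and marked primary if needed).
        """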
products = []
pathPrefixLen = len(crawl.folder+os.sep)
for collection in self.collections:
alreadyImported = get_already_imported_files_of(crawl.base_product,
self.registry,
self.accessor,
collection=collection.cid)
for filesource in collection.files:
                if filesource.place != crawl.place:
continue
path = os.path.join(crawl.folder, filesource.filename)
pathSuffix = path[pathPrefixLen:]
                if pathSuffix in alreadyImported:
print "haz", pathSuffix
continue
try:
p = build_product_from_filename(pathSuffix,
crawl.place,
crawl.base_product,
self.registry,
self.accessor)
except ValueError:
#print "no Product for %s"%path
continue
oid = p.hash
try:
objectInDb = self.db.getObject(oid)
except KeyError:
ps = p.productSpec
ps['_creationRev'] = self.inProcessRev
ps['_oid'] = oid
ps['collections'] = [collection.cid]
ps['is_primary'] = True
self.db.addObject(oid, ps)
else:
if collection.cid not in objectInDb.get('collections', []):
self.db.update({'_oid': oid},
{"$addToSet": {"collections": collection.cid}})
if not objectInDb.get('is_primary', False):
self.db.update({'_oid': oid},
{"$set": {"is_primary": True}})
def propagate_collections(self, collections):
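        """
        Propagate collection membership from components to derived products.

        A derived product should belong to every collection that *all* of
        its components belong to.  Returns ``True`` if any product was
        updated, so callers can iterate this to a fixed point.
        """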
did_change = False
for collection in collections:
in_collection = [res["_oid"]
for res
in self.db.query(query={"collections": collection},
projection={"_oid": 1})]
missing_query = {"componentOids": {"$elemMatch": {"$elemMatch": {"$in": in_collection}}},
"collections": {"$nin": [collection]},
"is_primary": {"$ne": True}}
maybe_missing_collection = self.db.query(missing_query)
for candidate in maybe_missing_collection:
component_oids = [oid for name, oid in candidate["componentOids"]]
component_collections = [set(res.get("collections",[]))
for res
in self.db.query({"_oid": {"$in": component_oids}})]
desired_collections = component_collections[0].intersection(
*component_collections[1:])
additional_collections = desired_collections - set(candidate.get("collections", []))
if len(additional_collections) > 0:
for additional_collection in additional_collections:
self.db.update({"_oid": candidate["_oid"]},
{"$addToSet":
{"collections": additional_collection}})
did_change = True
return did_change
def run(self):
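        """
        Main loop: execute pending crawl requests, propagate collection
        membership to a fixed point, then repeatedly scan for derivable
        products until a full pass creates nothing new.
        """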
for crawl in self.needToCrawl:
self.run_crawl_request(crawl)
collection_ids = set([collection.cid for collection in self.collections])
while self.propagate_collections(collection_ids):
pass
        derivedProducts = list(self.registry.find_derived_products())
while self.needToScan:
if self.rescanAll:
minQueryRev = -1
self.rescanAll = False
else:
minQueryRev = self.dbState['completeRev']
                # not very elegant, but needed as long as products do not
                # query for every possible input on their first query
                #minQueryRev = -1
self.incInProcessRev()
self.needToScan = False
            for dp in derivedProducts:
prototype = dp.build()
_minQueryRev = minQueryRev
                if self.rescanProduct is not None and self.rescanProduct.lower() in dp.productClassName().lower():
minQueryRev = -1
try:
componentNames = prototype.componentNames
except AttributeError:
self.buildProduct(prototype, {'_creationRev': {'$gte': minQueryRev}})
else:
for primaryComponent in componentNames:
self.buildProduct(prototype, {'_creationRev': {'$gte': minQueryRev}}, primaryComponent)
minQueryRev = _minQueryRev
self.dbState['completeRev'] = self.inProcessRev
self.db.setObject('__dbState__', self.dbState)
self.rescanProduct = None
class QueryResolutionIterator(object):
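    """
    Iterate over the prototypes that result from adding ``component`` to
    ``prototype``.

    ``tryToAdd`` is treated as a generator-based coroutine: whenever it
    yields a request flagged with ``needMoreData``, the request's query is
    run against the accessor and the first result is sent back in via
    ``send`` (or a ``DataNotFoundError`` is thrown in if nothing matches).
    All other yielded values are passed through to the caller.
    """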
def __init__(self, prototype, componentName, component, accessor):
self.prototype = prototype
self.componentName = componentName
self.component = component
self.accessor = accessor
def __iter__(self):
it = iter(self.prototype.tryToAdd(self.componentName, self.component))
needMoreData = False
while True:
try:
if needMoreData:
needMoreData = False
try:
nextData = iter(self.accessor.query(query, limit=limit, sort=sort, batchSize=1)).next()
except StopIteration:
res = it.throw(DataNotFoundError('no matching data found'))
else:
res = it.send(nextData)
else:
res = it.next()
if getattr(res, 'needMoreData', False):
needMoreData = True
query = res.query
sort = None
limit = 1
                    if isinstance(query, tuple):
query, queryModifiers = query
                        if 'sortBy' in queryModifiers:
sort = queryModifiers['sortBy']
else:
yield res
            except StopIteration:
break
except:
print "error during processing of component %s (%s) for prototype %s"%(self.componentName, self.component.hash, self.prototype)
raise
class CrawlRequest(object):
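    """
    A request to crawl ``folder`` at ``place`` for files that should be
    imported as instances of ``base_product``.
    """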
def __init__(self, folder, base_product, place):
self.folder = folder
self.base_product = base_product
self.place = place
def get_already_imported_files_of(baseclass, registry, accessor, collection=None):
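    """
    Return the set of filenames already covered by imported products.

    Considers every registered subclass of ``baseclass`` that implements
    ``getCoveredFilenames``, optionally restricted to one collection.
    """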
fns = set()
query = {}
if collection is not None:
query["collections"] = collection
for c in registry.find_subclasses(baseclass):
if hasattr(c, 'getCoveredFilenames'):
query["productClass"] = c.productClassName()
for p in accessor.query(query):
fns |= set(p.getCoveredFilenames())
return fns
def build_product_from_filename(filename, place, baseclass, registry, accessor):
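    """
    Try all registered subclasses of ``baseclass`` until one accepts
    ``filename`` via ``fromFilename``; raise ``ValueError`` if none does.
    """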
for c in registry.find_subclasses(baseclass):
if c.updateProduct != baseclass.updateProduct:
#separate normal and update file calls
continue
try:
return c.fromFilename(filename, place, accessor)
except ValueError:
continue
raise ValueError('product class not found')
if __name__ == '__main__':
from runmacs.processor.config import config
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser(description='generates macs products')
    parser.add_argument('--crawl', dest='crawl', action='store_true')
    parser.add_argument('--skip-crawl', dest='crawl', action='store_false')
    parser.set_defaults(crawl=True)
    parser.add_argument('--crawl-dir', dest='crawlDir', type=str, default='.')
    parser.add_argument('--rescan', dest='rescan', action='store_true')
    parser.add_argument('--no-rescan', dest='rescan', action='store_false')
    parser.set_defaults(rescan=False)
    # should we generate derived products?
    parser.add_argument('--derrive', dest='derrive', action='store_true')
    parser.add_argument('--no-derrive', dest='derrive', action='store_false')
    parser.set_defaults(derrive=True)
    parser.add_argument('--production', dest='production', action='store_true')
    parser.add_argument('--devel', dest='production', action='store_false')
    parser.set_defaults(production=False)
    parser.add_argument('--collection', default=None, type=int, help="collection id to crawl")
    parser.add_argument('--rescan-product', type=str)
    parser.add_argument('--ldb', dest='ldb', default=False, action='store_true')
args = parser.parse_args()
if args.production:
if config['database']['type'] == 'mongodb':
db = metaStorage.MongoDBMetaStorage(config['database']['connection'], config['database']['collection'])
else:
raise RuntimeError('unknown database type "%s"!'%config['database']['type'])
#from raven.handlers.logging import SentryHandler
#handler = SentryHandler('http://4d1399ca040f413d892f2da03a418771:a39aaab9d0754bce910aba7eb06baf27@blesshuhn.meteo.physik.uni-muenchen.de:8001/3')
#from raven.conf import setup_logging
#setup_logging(handler)
crawlDirs = config['data']
else:
if args.ldb:
db = metaStorage.LevelDBMetaStorage('products.ldb')
else:
db = metaStorage.MongoDBMetaStorage('mongodb://blesshuhn:27017/', 'macsServer_devel')
crawlDirs = {}
for crawlPlace, params in crawlDirs.items():
if params['method'] != 'local' and args.crawl:
logger.error('cannot crawl "%s" source method "%s", skipping it!', crawlPlace, params['method'])
continue
        crawlRequests = []
if args.crawl:
            crawlRequests.append(CrawlRequest(params['folder'],
                                              product.ImportedFileProduct,
                                              crawlPlace))
if 'updatesFolder' in params:
                crawlRequests.append(CrawlRequest(params['updatesFolder'],
                                                  product.UpdateFileProduct,
                                                  crawlPlace))
if args.collection is None:
collections = find_collections_in_folder(os.path.join(params['folder'],
'..',
'collections'))
else:
with open(os.path.join(params['folder'], '..', 'collections', 'collection_{}.yaml'.format(args.collection))) as collfile:
                collections = [collection_from_dict(args.collection, yaml.safe_load(collfile))]
pb = ProductBuilder(db,
registry=get_default_registry(),
collections=collections)
        pb.needToCrawl = crawlRequests
pb.needToScan = args.derrive
pb.rescanAll = args.rescan
pb.rescanProduct = args.rescan_product
logger.info('productBuilder start for crawl place "%s"!', crawlPlace)
pb.run()