# -*- coding: utf-8 -*-
from processor import Processor
class FirstProcessor(Processor):
    """A minimal concrete Processor used as one stage of a pipeline.

    NOTE(review): ``Processor`` is project-local; this assumes its
    constructor accepts the parsed ``args`` mapping -- confirm against
    processor.py.
    """

    def __init__(self, args):
        # Delegate all argument handling to the base class.
        Processor.__init__(self, args)

    def pre_check(self):
        """Return a truthy value so the pipeline will run process()."""
        # print() form works identically on Python 2 and 3.
        print('firstProcessor pre_check')
        return '1'

    def process(self):
        """Do this stage's work; here it only reports that it ran."""
        print('firstProcessor processor')
        return '1'

    def close(self):
        """Release resources; this processor holds none."""
        pass
# -*- coding: utf-8 -*-
from scrapy import Selector
from importlib import import_module
import networkx
import logging
class Parser(object):
    """Base parser: loads an XML config file into a scrapy Selector."""

    @staticmethod
    def _get_selector(config):
        """Read the XML document at *config* and wrap it in an XML Selector."""
        with open(config) as handle:
            document = handle.read().strip()
        return Selector(None, document, 'xml')
class PipelineConfigParser(Parser):
    """Parses an XML pipeline config into an ordered chain of processors.

    Each ``<processor>`` element names a class to instantiate; declared
    dependencies are modelled as edges in a networkx DiGraph and resolved
    with a topological sort, so every processor appears after the
    processors it depends on.
    """

    def __init__(self):
        pass

    @staticmethod
    def _load_object(path):
        """Import and return the object named by dotted *path*.

        Raises:
            ValueError: *path* contains no dot, so it is not a full path.
            NameError: the module does not define the named attribute.
        """
        try:
            dot = path.rindex('.')
        except ValueError:
            raise ValueError("Error loading object '%s': not a full path" % path)
        module, name = path[:dot], path[dot + 1:]
        mod = import_module(module)
        try:
            obj = getattr(mod, name)
        except AttributeError:
            raise NameError("Module '%s' doesn't define any object named '%s'" %
                            (module, name))
        return obj

    @staticmethod
    def _get_processor_chain(processor_list, orders=None):
        """Return the processor instances stored on graph *processor_list*.

        *orders* is the node order to follow; when None, the graph's own
        (arbitrary) iteration order is used.
        """
        if orders is None:
            orders = list(processor_list)
        # NOTE(review): Graph.node is the networkx 1.x node-attribute API;
        # networkx >= 2.4 renamed it to Graph.nodes -- confirm the pinned
        # networkx version before upgrading.
        return [processor_list.node[order]['processor'] for order in orders]

    @staticmethod
    def _parse_class(root):
        """Instantiate the processor class named by *root*'s @class attribute."""
        class_name = root.xpath('@class').extract()[0]
        args = PipelineConfigParser._parse_args(root)
        mwcls = PipelineConfigParser._load_object(class_name)
        instance = mwcls(args)
        # Lazy %-style args avoid building the message when INFO is disabled.
        logging.info('loaded %s', class_name)
        return instance

    @staticmethod
    def _parse_args(root):
        """Collect ``./args/arg`` child elements into a {name: text} dict."""
        args = {}
        for arg in root.xpath('./args/arg'):
            name = arg.xpath('@name').extract()[0]
            value = arg.xpath('./text()').extract()[0]
            args[name] = value
        return args

    # Parse the config file and generate a pipeline (the ordered chain).
    @staticmethod
    def parse(config):
        """Entry point: parse *config* and return the ordered processor chain."""
        selector = PipelineConfigParser._get_selector(config)
        return PipelineConfigParser._parse_processors(selector)

    @staticmethod
    def _parse_processors(selector):
        """Build the dependency graph from ``<processor>`` elements and
        return the processors in topological order."""
        processor_graph = networkx.DiGraph()
        for processor in selector.xpath('//processor'):
            # The node's name in the graph.
            node_name = processor.xpath('@name').extract()[0]
            # Add the node with its instantiated processor attached.
            processor_graph.add_node(
                node_name,
                processor=PipelineConfigParser._parse_class(processor))
            # Record dependency edges: dependency -> dependent, so the
            # topological sort yields dependencies first.
            for dependency in processor.xpath('./dependencies/dependency'):
                name = dependency.xpath('./text()').extract()[0]
                processor_graph.add_edge(name, node_name)
        node_list = networkx.topological_sort(processor_graph)
        return PipelineConfigParser._get_processor_chain(processor_graph,
                                                         node_list)

    def close(self):
        pass
The essential part of the code above is the use of topological sorting, implemented with the networkx library.
from config_parser import PipelineConfigParser
class Pipeline(object):
    """Runs a named chain of processors parsed from an XML config file."""

    def __init__(self, name, config):
        self.name = name
        # Build the topologically ordered processor chain from the config.
        self.processors = PipelineConfigParser.parse(config)

    def process(self):
        """Run each processor whose pre_check() returns a truthy value.

        Execution is best-effort: a processor that raises is reported and
        skipped so the remaining stages still run.
        """
        for processor in self.processors:
            if processor.pre_check():
                try:
                    processor.process()
                except Exception as e:
                    # Deliberate broad catch: one bad stage must not abort
                    # the whole pipeline.
                    print(e)