# -*- coding: utf-8 -*-
from processor import Processor
class FirstProcessor(Processor):
    """Example pipeline stage: announces its pre_check/process calls.

    NOTE(review): the base class ``Processor`` is project-local and not
    visible in this file; it is assumed to accept ``args`` (the mapping
    parsed from the pipeline config's <args> element) — confirm.
    """

    def __init__(self, args):
        # Delegate initialization to the base class.
        Processor.__init__(self, args)

    def pre_check(self):
        """Pre-flight check; '1' is truthy, so Pipeline.process will run this stage."""
        # Single-argument print(...) is valid in both Python 2 and Python 3;
        # the original bare `print '...'` statement is Python-2-only syntax.
        print('firstProcessor pre_check')
        return '1'

    def process(self):
        """Do this stage's work (here: just log that it ran)."""
        print('firstProcessor processor')
        return '1'

    def close(self):
        """Release resources; this processor holds none."""
        pass
 
# -*- coding: utf-8 -*-
from scrapy import Selector
from importlib import  import_module
import networkx
import logging
class Parser(object):
    """Base class for config parsers: loads an XML file into a scrapy Selector."""

    @staticmethod
    def _get_selector(config):
        """Read the XML file at *config* and return an XML-mode Selector over it."""
        with open(config) as handle:
            content = handle.read().strip()
        # Selector(response, text, type): no response object, raw text, XML mode.
        return Selector(None, content, 'xml')
class PipelineConfigParser(Parser):
    """Turns a pipeline XML config into an ordered chain of processor instances.

    The config declares <processor> elements with constructor args and
    dependencies; dependencies form a DAG and a topological sort (via
    networkx) yields the execution order.
    """

    def __init__(self):
        pass

    @staticmethod
    def _load_object(path):
        """Import the module part of the dotted *path* and return the named attribute.

        Raises ValueError when *path* has no dot, NameError when the module
        lacks the attribute.
        """
        try:
            split_at = path.rindex('.')
        except ValueError:
            raise ValueError("Error loading object '%s': not a full path" % path)
        module, name = path[:split_at], path[split_at + 1:]
        mod = import_module(module)
        try:
            return getattr(mod, name)
        except AttributeError:
            raise NameError("Module '%s' doesn't define any object named '%s'" % (module, name))

    @staticmethod
    def _get_processor_chain(processor_list, orders=None):
        """Collect the 'processor' attribute of each graph node, in *orders* order.

        *processor_list* is the node graph; when *orders* is None every node
        is taken in the graph's own iteration order.
        """
        if orders is None:
            orders = [node for node in processor_list]
        # .node is the networkx 1.x node-attribute accessor
        # (NOTE(review): removed in networkx >= 2.4, where it is .nodes).
        return [processor_list.node[name]['processor'] for name in orders]

    @staticmethod
    def _parse_class(root):
        """Instantiate the class named in *root*'s @class with its parsed args."""
        class_path = root.xpath('@class').extract()[0]
        ctor_args = PipelineConfigParser._parse_args(root)
        instance = PipelineConfigParser._load_object(class_path)(ctor_args)
        logging.info('loaded ' + class_path)
        return instance

    @staticmethod
    def _parse_args(root):
        """Build a {name: text} dict from *root*'s <args>/<arg> children."""
        return {
            arg.xpath('@name').extract()[0]: arg.xpath('./text()').extract()[0]
            for arg in root.xpath('./args/arg')
        }

    # Parse the config file and generate a pipeline (processor chain).
    @staticmethod
    def parse(config):
        """Entry point: parse the XML file at *config* into a processor chain."""
        selector = PipelineConfigParser._get_selector(config)
        return PipelineConfigParser._parse_processors(selector)

    @staticmethod
    def _parse_processors(selector):
        """Build the dependency graph of processors and return them topologically sorted."""
        dependency_graph = networkx.DiGraph()
        for processor in selector.xpath('//processor'):
            # The node's name in the graph.
            node_name = processor.xpath('@name').extract()[0]
            # Add the node, instantiating its processor class on the way.
            dependency_graph.add_node(
                node_name,
                processor=PipelineConfigParser._parse_class(processor))
            # Wire up dependency edges: dependency -> dependent.
            for dependency in processor.xpath('./dependencies/dependency'):
                upstream = dependency.xpath('./text()').extract()[0]
                dependency_graph.add_edge(upstream, node_name)
        ordering = networkx.topological_sort(dependency_graph)
        return PipelineConfigParser._get_processor_chain(dependency_graph, ordering)

    def close(self):
        pass
The essential part of the code above is the use of topological sorting, implemented with the networkx library.
 
from config_parser import PipelineConfigParser
class Pipeline(object):
    """A named chain of processors built from an XML config file."""

    def __init__(self, name, config):
        """Parse *config* and keep the resulting processor chain.

        :param name: human-readable pipeline name.
        :param config: path to the pipeline XML config file.
        """
        self.name = name
        self.processors = PipelineConfigParser.parse(config)

    def process(self):
        """Run each processor in chain order.

        A processor runs only when its pre_check() returns a truthy value.
        A failure in one processor is reported and the chain continues —
        best-effort semantics, preserved from the original.
        """
        for processor in self.processors:
            if processor.pre_check():
                try:
                    processor.process()
                except Exception as e:  # was `except Exception, e`: Python-2-only syntax
                    # Report and keep going with the next processor.
                    print(e)