加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

管道模型pipeline的python实现

(2016-11-24 23:57:16)
标签:

管道

pipeline

python

实现

代码

分类: 设计模式
       管道模型就像管道一样一节一节,然后水通过这一节节管道流到另一头。每一节管道都有各自的功能,对水进行一次次的加工。同时可以在节点之间快速增加一个管道,而不用更改原有的逻辑。
       管道模型的关键部分有两方面:
       (1)编写配置文件定义各个节点,同时设定他们的依赖关系,然后通过拓扑排序重新梳理节点调用的顺序。
       (2)在pipeline主程序中通过importlib动态调用每个节点processor,从而让数据流动起来
      实现的完整demo见:http://download.csdn.net/download/benben044/9692922
      代码实现说明如下:

1、代码目录结构:
管道模型pipeline的python实现

2、编写3个processor文件,其中基类如下:
class Processor(object):
    def __init__(self, args):
        self.args = args

    def pre_check(self):
        raise BaseException("this function is forbidden")

    def process(self):
        raise BaseException("this function is forbidden")

    def close(self):
        pass
其他两个processor分别重载pre_check和process方法,比如FirstProcessor代码如下:

# -*- coding: utf-8 -*-

from processor import Processor

class FirstProcessor(Processor):

    def __init__(self, args):
        Processor.__init__(self, args)
        pass

    def pre_check(self):
        print 'firstProcessor pre_check'
        return '1'

    def process(self):
        print 'firstProcessor processor'
        return '1'

    def close(self):
        pass

3、编写pipeline管道的配置文件,定义各processor的依赖关系
管道模型pipeline的python实现
如果需要添加新的processor,直接在这里配置即可,另外在processor中还可以定义参数。

4、编写解析xml的工具,代码如下:
# -*- coding: utf-8 -*-
from scrapy import Selector
from importlib import  import_module
import networkx
import logging

class Parser(object):
    @staticmethod
    def _get_selector(config):
        with open(config) as f:
            xml = f.read().strip()
            sel = Selector(None, xml, 'xml')
            return sel


class PipelineConfigParser(Parser):

    def __init__(self):
        pass

    @staticmethod
    def _load_object(path):
        try:
            dot = path.rindex('.')
        except ValueError:
            raise ValueError("Error loading object '%s': not a full path" % path)

        module, name = path[:dot], path[dot + 1:]
        mod = import_module(module)

        try:
            obj = getattr(mod, name)
        except AttributeError:
            raise NameError("Module '%s' doesn't define any object named '%s'" % (module, name))

        return obj

    @staticmethod
    def _get_processor_chain(processor_list, orders=None):
        processor_chain = []

        if orders is None:
            orders = []
            for order in processor_list:
                orders.append(order)
        for order in orders:
            processor_chain.append(processor_list.node[order]['processor'])

        return processor_chain

    @staticmethod
    def _parse_class(root):
        class_name = root.xpath('@class').extract()[0]
        args = PipelineConfigParser._parse_args(root)
        mwcls = PipelineConfigParser._load_object(class_name)
        instance = mwcls(args)
        logging.info('loaded ' + class_name)
        return instance

    @staticmethod
    def _parse_args(root):
        args = {}
        for arg in root.xpath('./args/arg'):
            name = arg.xpath('@name').extract()[0]
            value = arg.xpath('./text()').extract()[0]
            args.update({name: value})
        return args

    # parse the config file and generate a pipeline
    @staticmethod
    def parse(config):
        selector = PipelineConfigParser._get_selector(config)
        processors = PipelineConfigParser._parse_processors(selector)
        return processors

    @staticmethod
    def _parse_processors(selector):
        processor_graph = networkx.DiGraph()
        for processor in selector.xpath('//processor'):
            node_name = processor.xpath('@name').extract()[0]   #节点的名字

            #在图中添加节点
            processor_graph.add_node(node_name, processor=PipelineConfigParser._parse_class(processor))

            #处理依赖关系
            for dependency in processor.xpath('./dependencies/dependency'):
                name = dependency.xpath('./text()').extract()[0]
                processor_graph.add_edge(name, node_name)

        node_list = networkx.topological_sort(processor_graph)
        return PipelineConfigParser._get_processor_chain(processor_graph, node_list)

    def close(self):
        pass

上面的精华部分为标红的拓扑排序的使用,借助networkx工具。

5、定义pipeline实现如下:
from config_parser import PipelineConfigParser

class Pipeline(object):
    def __init__(self, name, config):
        self.name = name
        processors = PipelineConfigParser.parse(config)
        self.processors = processors

    def process(self):
        for processor in self.processors:
            if processor.pre_check():
                try:
                    processor.process()
                except Exception,e:
                    print e

6、测试代码如下:
from pipeline.pipeline import Pipeline

pipeline = Pipeline('test', 'my_pipeline_config.xml')
pipeline.process()
      

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有