资讯专栏INFORMATION COLUMN

Python生成器实现数据处理管道

dreamtecher / 3322人阅读

摘要:使用生成器函数是一个实现管道机制问题,你想以数据管道类似管道的方式迭代处理数据。当这些生成器被连在一起后,每个会将一个多带带的数据元素传递给迭代处理管道的下一阶段。

假设现在有如下业务场景

某生产系统的日志文件如下,并且在持续增加...

[ncms@UPZZGAP02 logs]$ pwd
/home/ncms/ncms/logs
[ncms@UPZZGAP02 logs]$ ll
总用量 797020
-rw-rw-r-- 1 ncms ncms 495465795 11月 30 17:10 ansible.log
-rw-rw-r-- 1 ncms ncms   2251937 11月 30 17:10 celery_beat.log
-rw-rw-r-- 1 ncms ncms     16003 11月 15 10:26 celery_flower.log
-rw-rw-r-- 1 ncms ncms   7042114 11月 30 17:10 celery_worker.log
-rw-r--r-- 1 ncms ncms  24665873 11月 30 17:10 db_error.log
-rw-r--r-- 1 ncms ncms  52428571 11月 28 18:46 db_error.log.1
-rw-r--r-- 1 ncms ncms  52428691 11月 24 06:43 db_error.log.2
-rw-r--r-- 1 ncms ncms  22410652 11月 19 15:16 db_error.log.3
-rw-r--r-- 1 ncms ncms  28064985 11月 30 17:10 db_info.log
-rw-r--r-- 1 ncms ncms  52426630 11月 28 13:29 db_info.log.1
-rw-r--r-- 1 ncms ncms  52427357 11月 24 03:48 db_info.log.2
-rw-r--r-- 1 ncms ncms  24276767 11月 19 15:16 db_info.log.3
-rw-rw-r-- 1 ncms ncms     42490 11月 30 13:06 ncms_access.log
-rw-rw-r-- 1 ncms ncms     24072 10月 30 15:33 ncms_error.log
-rw-rw-r-- 1 ncms ncms   1350318 11月 30 16:38 nginx_access.log
-rw-rw-r-- 1 ncms ncms      1685 11月  7 18:15 nginx_error.log
-rw-rw-r-- 1 ncms ncms     24001 11月 15 10:27 supervisord.log
-rw-rw-r-- 1 ncms ncms    645742 11月 30 16:38 uwsgi.log
[ncms@UPZZGAP02 logs]$ du -sh *
473M    ansible.log
2.2M    celery_beat.log
16K    celery_flower.log
6.8M    celery_worker.log
24M    db_error.log
51M    db_error.log.1
51M    db_error.log.2
22M    db_error.log.3
27M    db_info.log
51M    db_info.log.1
51M    db_info.log.2
24M    db_info.log.3
44K    ncms_access.log
24K    ncms_error.log
1.3M    nginx_access.log
4.0K    nginx_error.log
24K    supervisord.log
632K    uwsgi.log
[ncms@UPZZGAP02 logs]$

其中有应用、数据库、Celery、Nginx、uwsgi、supervisord、Ansible的日志,Ansible.log有473M,未来很定会更大。现在需要使用某些关键字对日志进行查找分析,应该如何做?

最简单粗暴的方式就是使用grep之类的命令,递归查找所有的.log文件,但这样会耗费大量内存,影响机器性能。

可以考虑使用数据管道 (类似 Unix 管道) 的方式迭代处理数据。使用Python生成器函数是一个实现管道机制

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "liao gao xiang"

import os
import fnmatch
import gzip
import bz2
import re

# 问题,你想以数据管道 (类似 Unix 管道) 的方式迭代处理数据。比如,你有个大量的数据
# 需要处理,但是不能将它们一次性放入内存中。可以使用生成器实现数据处理管道
""" 文件格式如下
foo/
access-log-012007.gz
access-log-022007.gz
access-log-032007.gz
...
access-log-012008
bar/
access-log-092007.bz2
...
access-log-022008
"""


def gen_find(filepat, top):
    """
    查找符合Shell正则匹配的目录树下的所有文件名
    :param filepat: shell正则
    :param top: 目录路径
    :return: 文件绝对路径生成器
    """
    for path, _, filenames in os.walk(top):
        for file in fnmatch.filter(filenames, filepat):
            yield os.path.join(path, file)


def gen_opener(filenames):
    """
    每打开一个文件生成就生成一个文件对象,调用下一个迭代前关闭文件
    :param filenames: 多个文件绝对路径组成的可迭代对象
    :return: 文件对象生成器
    """
    for filename in filenames:
        if filename.endswith(".gz"):
            f = gzip.open(filename, "r", encoding="utf-8")
        elif filename.endswith(".bz2"):
            f = bz2.open(filename, "r", encoding="utf-8")
        else:
            f = open(filename, "r", encoding="utf-8")
        yield f
        f.close()


def gen_concatenate(iterators):
    """
    将输入序列拼接成一个很长的行序列。
    :param iterators:
    :return: 返回生成器所产生的所有值
    """
    for it in iterators:
        yield from it


def gen_grep(pattern, lines):
    """
    使用正则匹配行
    :param pattern: 正则匹配
    :param lines: 多行
    :return: 结果生成器
    """
    pat = re.compile(pattern)
    for n, line in enumerate(lines, start=1):
        if pat.search(line):
            yield n, line


if __name__ == "__main__":
    filenames = gen_find("*.log", "/home/ncms/ncms/logs")
    files = gen_opener(filenames)
    lines = gen_concatenate(files)
    user_action = gen_grep("(?i)liaogaoxiang_kd", lines)

    for n, line in user_action:
        print(line)

查询包含用户 liaogaoxiang_kd 的所有记录,数据结果如下:

[views:post]:2018-11-07 18:13:09.841490 -users- liaogaoxiang_kd登录成功!

[views:get]:2018-11-07 18:16:04.681519 -users- liaogaoxiang_kd访问了用户信息列表

[views:post]:2018-11-07 18:16:23.866700 -users- liaogaoxiang_kd编辑了用户的信息

[views:get]:2018-11-07 18:16:23.878949 -users- liaogaoxiang_kd访问了用户信息列表

[views:get]:2018-11-07 18:16:25.641090 -users- liaogaoxiang_kd访问了用户信息列表

[views:post]:2018-11-07 18:16:42.671377 -users- liaogaoxiang_kd编辑了用户的信息

[views:get]:2018-11-07 18:16:42.719873 -users- liaogaoxiang_kd访问了用户信息列表

[views:post]:2018-11-08 11:17:42.627693 -users- liaogaoxiang_kd登录成功!

如需查询其它错误信息,只需替换gen_grep("(?i)liaogaoxiang_kd", lines)中的关键字即可!以管道方式处理数据可以用来解决各类其他问题,包括解析,读取实时数据,定时 轮询等。

为了理解上述代码,重点是要明白 yield 语句作为数据的生产者而 for 循环语句 作为数据的消费者。当这些生成器被连在一起后,每个 yield 会将一个多带带的数据元 素传递给迭代处理管道的下一阶段。这种方式一个非常好的特点是每个生成器函数很小并且都是独立的。这样的话就 很容易编写和维护。很多时候,这些函数如果比较通用的话可以在其他场景重复使用。并且最终将这些组件组合起来的代码看上去非常简单,也很容易理解。

使用这种方式的内存效率很高。上述代码即便是在一个超大型文件目录中 也能工作的很好。事实上,由于使用了迭代方式处理,代码运行过程中只需要很小很小的内存。 在调用 gen_concatenate() 函数的时候你可能会有些不太明白。这个函数的目的是将输入序列拼接成一个很长的行序列。 itertools.chain() 函数同样有类似的功能, 但是它需要将所有可迭代对象最为参数传入。在上面这个例子中,你可能会写类似这样 的语句 lines = itertools.chain(*files) ,这将导致 gen_opener() 生成器被提前全部消费掉。但由于 gen_opener() 生成器每次生成一个打开过的文件,等到下一个迭 代步骤时文件就关闭了,因此 chain() 在这里不能这样使用。上面的方案可以避免这 种情况。

gen_concatenate() 函数中出现过 yield from 语句,它将 yield 操作代理到父 生成器上去。语句 yield from it 简单的返回生成器 it 所产生的所有值。

程序员交流群,干货分享,加我拉你入群。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/42721.html

相关文章

  • Python成器实现数据处理管道

    摘要:使用生成器函数是一个实现管道机制问题,你想以数据管道类似管道的方式迭代处理数据。当这些生成器被连在一起后,每个会将一个单独的数据元素传递给迭代处理管道的下一阶段。 假设现在有如下业务场景 某生产系统的日志文件如下,并且在持续增加... [ncms@UPZZGAP02 logs]$ pwd /home/ncms/ncms/logs [ncms@UPZZGAP02 logs]$ ll 总用...

    SwordFly 评论0 收藏0
  • Python成器实现数据处理管道

    摘要:使用生成器函数是一个实现管道机制问题,你想以数据管道类似管道的方式迭代处理数据。当这些生成器被连在一起后,每个会将一个单独的数据元素传递给迭代处理管道的下一阶段。 假设现在有如下业务场景 某生产系统的日志文件如下,并且在持续增加... [ncms@UPZZGAP02 logs]$ pwd /home/ncms/ncms/logs [ncms@UPZZGAP02 logs]$ ll 总用...

    Lin_R 评论0 收藏0
  • python迭代器与成器小结

    摘要:迭代器要说生成器,必须首先说迭代器区分与讲到迭代器,就需要区别几个概念看着都差不多,其实不然。比如常见就是与分离实现的本身是可迭代对象,但不是迭代器,类似与但是又不同。 2016.3.10关于例子解释的补充更新 源自我的博客 例子 老规矩,先上一个代码: def add(s, x): return s + x def gen(): for i in range(...

    hellowoody 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<