解决问题
使用日志服务进行数据处理与传递的过程中,你是否遇到如下监测场景不能很好的解决:
- 特定数据上传到日志服务中需要检查数据内的异常情况,而没有现成监控工具?
- 需要检索数据里面的关键字,但数据没有建立索引,无法使用日志服务的告警功能?
- 数据监测要求实时性(<5秒,例如Web访问500错误),而特定功能都有一定延迟(1分钟以上)?
- 存在多个域的多个日志库(例如每个Region的错误文件对应的日志库),数据量不大,但监控逻辑类似,每个目标都要监控与配置,比较繁琐?
如果是的,您可以考虑使用日志服务Python消费组进行跨域实时数据监控,本文主要介绍如何使用消费组实时监控多个域中的多个日志库中的异常数据,并进行下一步告警动作。可以很好解决以上问题,并利用消费组的特点,达到自动平衡、负载均衡和高可用性。

基本概念
协同消费库(Consumer Library)是对日志服务中日志进行消费的高级模式,提供了消费组(ConsumerGroup)的概念对消费端进行抽象和管理,和直接使用SDK进行数据读取的区别在于,用户无需关心日志服务的实现细节,只需要专注于业务逻辑,另外,消费者之间的负载均衡、failover等用户也都无需关心。
消费组(Consumer Group) - 一个消费组由多个消费者构成,同一个消费组下面的消费者共同消费一个logstore中的数据,消费者之间不会重复消费数据。
消费者(Consumer) - 消费组的构成单元,实际承担消费任务,同一个消费组下面的消费者名称必须不同。
在日志服务中,一个logstore下面会有多个shard,协同消费库的功能就是将shard分配给一个消费组下面的消费者,分配方式遵循以下原则:
- 每个shard只会分配到一个消费者。
- 一个消费者可以同时拥有多个shard。
新的消费者加入一个消费组,这个消费组下面的shard从属关系会调整,以达到消费负载均衡的目的,但是上面的分配原则不会变,分配过程对用户透明。
协同消费库的另一个功能是保存checkpoint,方便程序故障恢复时能接着从断点继续消费,从而保证数据不会被重复消费。
使用消费组进行实时分发
这里我们描述用Python使用消费组进行编程,实时跨域监测多个域的多个日志库,全文或特定字段检查
注意:本篇文章的相关代码可能会更新,最新版本在这里可以找到:Github样例.
安装
环境
- 建议程序运行在靠近源日志库同Region下的ECS上,并使用局域网服务入口,这样好处是网络速度最快,其次是读取没有外网费用产生。
- 强烈推荐PyPy3来运行本程序,而不是使用标准CPython解释器。
- 日志服务的Python SDK可以如下安装:
pypy3 -m pip install aliyun-log-python-sdk -U更多SLS Python SDK的使用手册,可以参考这里
程序配置
如下展示如何配置程序:
- 配置程序日志文件,以便后续测试或者诊断可能的问题(跳过,具体参考样例)。
- 基本的日志服务连接与消费组的配置选项。
- 目标Logstore的一些连接信息
请仔细阅读代码中相关注释并根据需要调整选项:
#encoding: utf8 def get_option(): ########################## # 基本选项 ########################## # 从环境变量中加载SLS参数与选项,endpoint、project、logstore可以多个并配对 endpoints = os.environ.get('SLS_ENDPOINTS', '').split(";") # ;分隔 projects = os.environ.get('SLS_PROJECTS', '').split(";") # ;分隔 logstores = os.environ.get('SLS_LOGSTORES', '').split(";") # ;分隔,同一个Project下的用,分隔 accessKeyId = os.environ.get('SLS_AK_ID', '') accessKey = os.environ.get('SLS_AK_KEY', '') consumer_group = os.environ.get('SLS_CG', '') # 消费的起点。这个参数在第一次跑程序的时候有效,后续再次运行将从上一次消费的保存点继续。 # 可以使”begin“,”end“,或者特定的ISO时间格式。 cursor_start_time = "2018-12-26 0:0:0" # 一般不要修改消费者名,尤其是需要并发跑时 consumer_name = "{0}-{1}".format(consumer_group, current_process().pid) # 设定共享执行器 exeuctor = ThreadPoolExecutor(max_workers=2) # 构建多个消费组(每个logstore一个) options = [] for i in range(len(endpoints)): endpoint = endpoints[i].strip() project = projects[i].strip() if not endpoint or not project: logger.error("project: {0} or endpoint {1} is empty, skip".format(project, endpoint)) continue logstore_list = logstores[i].split(",") for logstore in logstore_list: logstore = logstore.strip() if not logstore: logger.error("logstore for project: {0} or endpoint {1} is empty, skip".format(project, endpoint)) continue option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group, consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR, cursor_start_time=cursor_start_time, shared_executor=exeuctor) options.append(option) # 设定检测目标字段与目标值,例如这里是检测status字段是否有500等错误 keywords = {'status': r'5
