云函数实践:将日志服务的日志投递到自建 Kafka 的 3 个步骤

上文提到 将K8S日志采集到日志服务,这次介绍将采集的日志投递到自建 Kafka 中,用于 Spark 计算。

核心流程

容器日志 -> 日志服务 -> 使用函数处理,将日志投递至自建 Kafka

本文介绍如何创建云函数,将日志投递至 Kafka 中。

1. 创建云函数 SCF

打开 函数服务列表open in new window,基于模板 CLS 数据转存到 Ckafka 创建函数。

虽然模板是投递 Ckafka,不过 Ckafka和 Kafka 兼容性好,所以投递 Kafka 也没问题。

基于模板创建SCF

1.1 填写基础配置

启用私有网络,函数服务使用的 VPC 和 Kafka 所在 VPC 相同。

如果不同,可以使用 对等连接open in new window 解决。

启用 VPC

1.2 函数代码

默认模板会把日志原始数据当成字符串,把每个字符当成一行 message 进行输出(for record in records:),需要调整代码。

不知道是因为我的日志服务原始数据设置的是 JSON 格式,还是当前 CLS 数据转存到 Ckafka 模板过时了

SCF 函数代码 有 3 处代码修改,详见注释,完整代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
 
import time
import logging
import os
import base64
import json
import gzip
import urllib
from kafka import KafkaProducer
from kafka.errors import KafkaError
from StringIO import StringIO

logger = logging.getLogger('kafka')
logger.setLevel(logging.INFO)
 
class ClsToKafka(object):
    """
    CLS 消息投递 kafka
    """

    def __init__(self, host, **kwargs):
        self.host = host

        self.producer = KafkaProducer(bootstrap_servers = [self.host], 
            retries = 10,
            max_in_flight_requests_per_connection = 1, 
            request_timeout_ms = 30000,
            max_block_ms = 60000,
            **kwargs
        )def send(self, topic, records):
        """
        异步生产 kafka 消息
        """

        global count
        count = 0
        def on_send_success(record_metadata):
            global count
            count = count +1

        def on_send_error(excp):
            logger.error('failed to send message', exc_info = excp)

        s_time = time.time()
        
        try:
            ## 修改 1: 原始消息是 JSON 格式(日志服务采集容器服务输出的日志格式是 JSON),每条消息位于 .records(type: List)
            ## for record in records:
            for record in records['records']:
                key = ""# 当 key 为""或者为"None" 时,要传入 key=None,这样 python kafka 库会随机选取一个 partition 写入消息
                if key == "" or key =="None":
                    key = None
                ## 修改 2:record 是 dict,因为原始数据就是 JSON,需要转成 str,否则调用 self.producer.send 会报错 "assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))"
                value = json.dumps(record)

                # 也可以对消息进行处理后再转存
                #value = deal_message(data)

                self.producer.send(topic, key = key, value = value).add_callback(on_send_success).add_errback(on_send_error)

            # block until all async messages are sent
            self.producer.flush()
        except KafkaError as e:
            return e
        finally:
            if self.producer is not None:
                self.producer.close()e_time = time.time()

        return "{} messages delivered in {}s".format(count, e_time - s_time)

# 这里可以对消息进行处理后返回
def deal_message(message):
    return message

def main_handler(event, context):
    kafka_address = os.getenv("kafka_address")
    kafka_topic_name = os.getenv("kafka_topic_name")
  
    kafka_to_kafka = ClsToKafka(
        kafka_address
        #security_protocol = "SASL_PLAINTEXT",
        #sasl_mechanism = "PLAIN",
        #sasl_plain_username = "ckafka-80o10xxx#lkoxx",
        #sasl_plain_password = "ccllxxxx",
        #api_version=(0, 10, 2)
    )event = json.loads(gzip.GzipFile(fileobj=StringIO(event['clslogs']['data'].decode('base64'))).read())# print("type of event: %s" % type(event))
  
    ## 修改 3:直接使用 event 这个字典,便于从字典中获取每条消息的内容
    ## data = json.dumps(event, indent=4, sort_keys=True)
    ## ret = kafka_to_kafka.send(kafka_topic_name,data) 
    ret = kafka_to_kafka.send(kafka_topic_name,event) 
    logger.info(ret)
    return ret
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

1.4 高级设置:环境设置

云函数需要使用 kafka_addresskafka_topic_name 这 2 个变量,在 环境配置 中配好。

设置环境变量 点击创建后,部署成功。

创建SCF成功

2. 为日志服务的日志主题设置函数处理

在日志服务的 日志主题open in new window 页面找到需要投递消息的主题,在 函数处理 TAB 中 选择刚创建的函数即可。

为日志主题设置函数处理 函数处理创建成功。

函数处理设置成功

3. 查看投递到自建 Kafka 的效果

等待 1 分钟后,查看函数每次调用的日志,可以看到调用已成功。

查看SCF的调用日志

同时可以了解整体调用监控数据。

查看SCF的调用监控

自建的 Kakfa 是使用 Cloudera Management 创建的,在 CM 中看到 Topic 已有数据写入。

查看SCF的调用监控

使用命令行也可以看到数据持续写入。

# ./kafka-console-consumer.sh --bootstrap-server 10.0.0.29:9092 --topic scf_topic --offset latest --partition 0
{"content": "{\"Accept\":\"*/*\",\"Body\":\"\",\"Host\":\"header.dev.xxx.cn\",\"Method\":\"GET\",\"Protocol\":\"HTTP/1.1\",\"Referer\":\"\",\"RemoteAddr\":\"172.16.7.71:37468\",\"RequestURI\":\"/\",\"Type\":\"web\",\"UserAgent\":\"Qcloud-boce\",\"X-Forwarded-For\":\"58.87.66.69\"}", "timestamp": 1618716491203428}
{"content": "{\"Accept\":\"*/*\",\"Body\":\"\",\"Host\":\"header.dev.xxx.cn\",\"Method\":\"GET\",\"Protocol\":\"HTTP/1.1\",\"Referer\":\"\",\"RemoteAddr\":\"172.16.7.71:36864\",\"RequestURI\":\"/\",\"Type\":\"curl\",\"UserAgent\":\"Qcloud-curl\",\"X-Forwarded-For\":\"180.163.9.66\"}", "timestamp": 1618716494178403}
...
Processed a total of 9 messages
1
2
3
4
5

FAQ

自建 Kafka 对外提供服务

如果函数调用有日志有如下报错,则证明 Kafka broker 未设置对外可访问的地址,参照 Won’t Connect to My Apache Kafka Clusteropen in new window 修改 advertised.listeners 配置即可。

DNS lookup failed for hadoop-29.com:9092, exception was [Errno -2] Name or service not known. Is your advertised.listeners (called advertised.host.name before Kafka 9) correct and resolvable?

DNS lookup failed for hadoop-29.com:9092 (0)
1
2
3

reference