Loading...
Image

python推送数据到kafka的kerberos服务验证(ubuntu)

首页 / 新闻资讯 / 正文

1. 安装必要的依赖包

apt-get install krb5-kdc libkrb5-dev python3-six -y --fix-missing pip3 install gssapi==1.6.6 kafka-python==2.0.1 -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com

2. 获取必要的配置文件krb5.conf、kafka.keytab、jaas.conf放到/etc目录下

3. 配置/etc/hosts地址映射到kafka数据节点

echo "127.0.0.1 kafka-point-01\n127.0.0.1 kafka-point-02\n127.0.0.1 kafka-point-03" >> /etc/hosts

4. 生成kerberos认证票据

kinit -kt /etc/kafka.keytab kafka/bigdata-test-01@TDH

5. 初始化kerberos环境变量

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf"

6. 代码示例

#!/usr/bin/env python # -*- coding:utf-8 -*- import os import time import json from kafka import KafkaProducer   def get_producer():     # 配置地址映射,127.0.0.1为示例     with open('/etc/hosts', 'r') as hosts:         if 'bigdata' not in hosts.read():             os.system('echo "127.0.0.1 kafka-point-01\n127.0.0.1 kafka-point-02\n127.0.0.1 kafka-point-03" >> /etc/hosts')      # kafka鉴权文件软链接到/etc目录     cur_dir = os.path.dirname(os.path.abspath(__file__))  # 自定义该目录     os.system('ln -fs %s/krb5.conf /etc/krb5.conf' % cur_dir)     os.system('ln -fs %s/kafka.keytab /etc/kafka.keytab' % cur_dir)     os.system('ln -fs %s/jaas.conf /etc/jaas.conf' % cur_dir)      # 生成kafka认证密钥,配置系统环境变量     os.system('kinit -kt /etc/kafka.keytab kafka/bigdata-test-01@TDH')     os.environ['KAFKA_OPTS'] = '-Djava.security.auth.login.config=/etc/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf'      producer = KafkaProducer(**{         'bootstrap_servers': ['127.0.0.1:9092', '127.0.0.1:9092', '127.0.0.1:9092'],         'security_protocol': 'SASL_PLAINTEXT',  # 安全协议         'sasl_mechanism': 'GSSAPI',  # SASL机制         'compression_type': 'gzip',  # 压缩方式,可选配置         'api_version': (0, 10, 2),  # API版本         'max_block_ms': 3000,  # 发送请求最大阻塞时间         'value_serializer': lambda value: json.dumps(value).encode(),  # 数据序列化方法         'sasl_kerberos_service_name': 'kafka'  # kerberos服务名称     })      # 等待0.5秒后检测是否连接成功了     time.sleep(0.5)     if not producer.bootstrap_connected():         raise Exception('Connect kafka failed')     return producer  # 因为底层socket的特性,多个进程或者同一进程下的多个线程无法共享一个producer,需要通过消息队列做生产者消费者模型 producer = get_producer() print(producer.send('kafka_topic', {'test': 'test_data'}, partition=0).get(timeout=1)) 

7. 注意的点

pip安装的依赖包是kafka-python,不是kafka,这两个都有,不要混淆了。

因为底层socket的特性,多个进程或者同一进程下的多个线程无法共享一个producer,多线程多进程场景下可以使用redis作为消息中间件实现生产者消费者模型。

使用get_producer获取一次认证后的连接就可以长期使用,票据过期不会导致后续的推送失败。