
Flume: Kerberos authentication with dual sinks writing to Kafka and MySQL

(2020-08-12 23:33:58)
Tags: flume, sink, kafka, kerberos, mysql

Category: bigdata


First off, it has been a long time since I updated this blog; the reason is simple laziness. Lately I have been working on interface development (my development skills are beginner level). The task was to build an HTTP POST interface that accepts JSON parameters and can both send the data to a Kafka server and write it into a MySQL database. I originally planned to write a Java program myself, but after consulting a colleague I learned that Flume can do exactly this.

This article covers Flume's HTTP source configuration with two sinks: one sends the data to Kafka, the other writes it to MySQL.
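For reference, the built-in JSONHandler used by Flume's HTTP source expects the POST body to be a JSON array of events, each with optional headers and a string body; the curl tests later in this post follow this format:

[
  { "headers" : { "h1" : "v1" },
    "body" : "the event payload, as a string" }
]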



#### 1. Configure /etc/hosts, adding the Kafka and ZooKeeper addresses
## (kafka: TCP 6667    zookeeper: TCP 2181    x3/x4 (Kerberos KDC): TCP/UDP 88)
##kafka
11.11.11.51   c6h551 h551
##zookeeper
11.11.11.201  c6x1   x1
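A quick reachability check (hypothetical commands, assuming nc is available on this host) against the ports listed above:

nc -vz c6h551 6667    # Kafka broker
nc -vz c6x1 2181      # ZooKeeper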


#### 2. Set the environment variables


export FLUME_HOME=/home/dams/flume
export FLUME_CONF_DIR=$FLUME_HOME/conf
export JAVA_HOME=/home/jdk1.8.0_241
export KAFKA_HOME=/home/dams/kafka_2.11-1.1.0
export PATH=$JAVA_HOME/bin:$FLUME_HOME/bin:$KAFKA_HOME/bin:$PATH
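After appending these lines to the shell profile (assumed here to be ~/.bash_profile of the dams user), reload it and verify Flume is on the PATH:

source ~/.bash_profile
flume-ng version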


#### 3. Configure flume-env.sh, adding:
export JAVA_OPTS="-Djava.security.auth.login.config=/home/dams/flume/conf/jaas.conf -Djava.security.krb5.conf=/home/dams/flume/conf/krb5.conf"

#### Agent configuration file: $FLUME_HOME/conf/agent1.conf
agent1.sources = r1
agent1.channels = c1 c2
agent1.sinks = k1 k2
agent1.sources.r1.selector.type = replicating

# Configure the HTTP source
agent1.sources.r1.type = http
agent1.sources.r1.port = 5555
agent1.sources.r1.handler = org.apache.flume.source.http.JSONHandler
 
agent1.sources.r1.channels = c1 c2
 
# Set up the memory channels
agent1.sinks.k1.channel = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 2000
agent1.channels.c1.transactionCapacity = 20
 
agent1.sinks.k2.channel = c2
agent1.channels.c2.type = memory
agent1.channels.c2.capacity = 2000
agent1.channels.c2.transactionCapacity = 20
 
##### Send to Kafka #####
# k1 sink
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.kafka.topic = cmol_intelligent_service
# multiple broker addresses can be comma-separated
agent1.sinks.k1.kafka.bootstrap.servers = 11.11.11.51:6667
agent1.sinks.k1.flumeBatchSize = 20
agent1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
agent1.sinks.k1.kafka.producer.sasl.mechanism = GSSAPI
agent1.sinks.k1.kafka.producer.sasl.kerberos.service.name = ocdp
 
##### Send to MySQL #####
# k2 sink
agent1.sinks.k2.type = org.flume.sink.mysql.MysqlSink
agent1.sinks.k2.hostname = 10.19.138.232
agent1.sinks.k2.port = 3306
agent1.sinks.k2.databaseName = internet
agent1.sinks.k2.tableName = i_intelligentcustservice
agent1.sinks.k2.username = username
agent1.sinks.k2.password = password
agent1.sinks.k2.batchSize = 20
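One design note, as a sketch rather than part of the original setup: with the replicating selector every event is copied to both channels, so a stalled MySQL sink will eventually fill c2 and block the HTTP source for Kafka as well. If losing the MySQL copy during an outage is acceptable, the replicating selector can mark that channel as optional:

agent1.sources.r1.selector.optional = c2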

Architecture diagram (image): HTTP source r1 -> replicating selector -> memory channels c1/c2 -> Kafka sink k1 and MySQL sink k2.


#### 4. Configure Kerberos authentication, for access with the Kafka client
--## Install the Kerberos packages
root@zxzt04:[/root]yum install krb5*

--## Replace the contents of /etc/krb5.conf
root@zxzt04:[/home/dams/flume/conf]cp krb5.conf  /etc/

root@zxzt04:[/root]vi /etc/krb5.conf

# Other applications require this directory to perform krb5 configuration.
includedir /etc/krb5.conf.d/


[libdefaults]
  renew_lifetime = 7d
  forwardable = true
  default_realm = BDX.XX.XXXX
  ticket_lifetime = 24h
  dns_lookup_realm = false
  dns_lookup_kdc = false
  default_ccache_name = /tmp/krb5cc_%{uid}
  kdc_timeout = 30000
  udp_preference_limit = 1  ## force TCP to the KDC; UDP is the default, and disabling it fixes Error 1 below

[domain_realm]
  XXXX

[logging]
  XXXX

[realms]
  BDX.XX.XXXX = {
    ...
    kdc = c6x03      ## KDC address
    max_life = 24h
    max_renewable_life = 7d
  }
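It is also worth confirming the KDC is reachable over TCP port 88 before going further (a hypothetical check; the UDP variant appears under Error 1 at the end of this post):

nc -vz c6x03 88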

--## Obtain a ticket from the keytab
su - dams
kinit -kt /home/dams/flume/conf/cmol.keytab cmol

dams@zxzt04:[/home/dams/flume/conf]klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: cmol@BDX.XX.XXXX

Valid starting       Expires              Service principal
08/10/2020 23:38:14  08/11/2020 23:38:12  krbtgt/BDX.XX.XXXX@BDX.XX.XXXX
        renew until 08/17/2020 23:38:12
root@zxzt04:[/home/dams/flume/conf]

--## Configure the kafka-jaas.conf file (provided by the maintainers of the target cluster)
Add the following line to kafka-console-consumer.sh and kafka-console-producer.sh:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/home/dams/flume/conf/kafka-jaas.conf"

root@zxzt04:[/root]cat /home/dams/flume/conf/kafka-jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
principal="cmol@BDX.XX.XXXX"
useKeyTab=true
renewTicket=true
keyTab="/home/dams/flume/conf/cmol.keytab"
serviceName="ocdp"
client=true;
};
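To confirm the keytab really contains the cmol principal named in the JAAS file, klist can read it directly:

klist -kt /home/dams/flume/conf/cmol.keytab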



--## Create the security config file
dams@zxzt04:[/home/dams/flume/conf]cat sec.config 
security.protocol=SASL_PLAINTEXT
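Depending on the client version, the console tools may also need the SASL mechanism and the non-default Kerberos service name in this file; this is an assumption on my part, since in this setup serviceName=ocdp is already supplied through the JAAS file:

sasl.mechanism=GSSAPI
sasl.kerberos.service.name=ocdp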


--## Try accessing the cluster
dams@zxzt04:[/home/dams/kafka_2.11-1.1.0/bin]./kafka-topics.sh --list --zookeeper 134.80.160.201:2181|grep cmol
cmol_intelligent_service

-- Test the producer
./kafka-console-producer.sh --broker-list 11.11.11.51:6667   --producer.config /home/dams/flume/conf/sec.config --topic  cmol_intelligent_service
-- Test the consumer
./kafka-console-consumer.sh  --bootstrap-server 11.11.11.51:6667  --consumer.config  /home/dams/flume/conf/sec.config --topic   cmol_intelligent_service

------------------------ The Kerberos setup above is for use by the Kafka command-line clients.

------------------------ The configuration below is for Flume's own access.
#### 5. Configure Kerberos authentication on the Flume side
#### Set JAVA_OPTS for Kerberos in flume-env.sh

export JAVA_OPTS="-Xms1024m -Xmx4096m -Djava.security.auth.login.config=/home/dams/flume/conf/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Dcom.sun.management.jmxremote"

(Flume reuses the same /home/dams/flume/conf/kafka-jaas.conf shown in step 4.)

#### 6. Start Flume; the -c option points at the conf directory so flume-env.sh is read
flume-ng agent -c ../conf -f ../conf/agent1.conf -n agent1

-- Logs are generated according to the logs settings in log4j.properties

Create a start script:
dams@zxzt04:[/home/dams/flume]cat start.sh
nohup flume-ng agent -c ../conf -f ../conf/agent1.conf -n agent1 &

-- Start Flume
sh start.sh
-- Stop Flume
Just kill -9 the process (see the stop script sketch below).
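A matching stop script in the same spirit (a hypothetical helper; it assumes the string agent1 appears on the agent's Java command line):

dams@zxzt04:[/home/dams/flume]cat stop.sh
#!/bin/sh
# find the running flume-ng agent's PID and kill it
PID=$(ps -ef | grep 'flume.*agent1' | grep -v grep | awk '{print $2}')
[ -n "$PID" ] && kill -9 $PID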


#### 7. Test with curl. Note that the data goes entirely in the request body, and double quotes inside the JSON data must be escaped with a backslash.

curl -X POST  -d '[{ "headers" :{"namenode" : "namenode.example.com","datanode" : "random_datanode.example.com"},"body" : "really_random_body"}]' http://10.19.138.235:5555

curl  -X POST --data '{"dmac": "00:0C:29:EA:39:70", "alert_type": "alarm", "risk": 2, "trojan_name": "Trojan.qq3344", "smac": "00:0C:29:EA:39:66", "sub_alert_type": "trojan", "sport": 11, "id": "153189767146", "desc": "NoSecure 1.2 \u6728\u9a6c\u53d8\u79cd4\u8fde\u63a5\u64cd\u4f5c", "sip": "62.4.07.18", "dip": "139.82.31.91", "rule_id": 123451, "trojan_type": 4, "time": "2018-07-18 15:07:51", "dport": 61621, "detector_id": "170301020011", "os": "Windows", "trojan_id": 50030}' http://10.19.138.235:5555


curl -X POST -d '[{"body" : "{\"sun\":1234,\"agent\":22}"}]' http://10.19.138.235:5555

#### 8. Custom MySQL sink: writing to a MySQL table

MySQL table creation statement:
create table i_intelligentcustservice
(
id          bigint unsigned auto_increment comment '主键id' primary key,
uid         varchar(64)  comment '用户id',
entranceid  varchar(64) comment '入口id',
msgcontent  varchar(256) comment '用户发送的消息',
answer      varchar(512) comment '系统回答',
question    varchar(256) comment '标准问题',
operchannelname  varchar(64) comment '运营渠道名称',
msgtime     varchar(64) comment '消息发送时间',
msgdate     varchar(16)  comment '消息发送日期',
inserttime  datetime  comment '插入时间'
) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci comment '智能应答交互日志实时回传至省公司的需求-本地留存表';


Package the finished MysqlSink program into a jar file and place it, together with mysql-connector.xx.jar and fastjson.xx.jar, under $FLUME_HOME/lib on the server.
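The author's actual MysqlSink is covered in the next post (step 10); what follows is only a minimal sketch of what such a sink can look like. The package and class name come from the agent config above; the fastjson parsing and the column mapping are assumptions based on the test in step 9, not the author's code.

package org.flume.sink.mysql;

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class MysqlSink extends AbstractSink implements Configurable {

    private String url, user, password, table;
    private int batchSize;
    private Connection conn;

    @Override
    public void configure(Context context) {
        // reads the agent1.sinks.k2.* properties from agent1.conf
        String host = context.getString("hostname");
        int port = context.getInteger("port", 3306);
        String db = context.getString("databaseName");
        table = context.getString("tableName");
        user = context.getString("username");
        password = context.getString("password");
        batchSize = context.getInteger("batchSize", 20);
        url = "jdbc:mysql://" + host + ":" + port + "/" + db
            + "?useUnicode=true&characterEncoding=utf8";
    }

    @Override
    public synchronized void start() {
        super.start();
        try {
            conn = DriverManager.getConnection(url, user, password);
        } catch (Exception e) {
            throw new RuntimeException("cannot connect to MySQL at " + url, e);
        }
    }

    @Override
    public synchronized void stop() {
        try { if (conn != null) conn.close(); } catch (Exception ignored) { }
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        tx.begin();
        try (PreparedStatement ps = conn.prepareStatement(
                "insert into " + table
              + " (uid, entranceid, msgcontent, answer, question,"
              + "  operchannelname, msgtime, msgdate, inserttime)"
              + " values (?,?,?,?,?,?,?,?,now())")) {
            int count = 0;
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) break;           // channel drained
                // the HTTP source delivers the JSON string posted as the event body
                JSONObject j = JSON.parseObject(
                        new String(event.getBody(), StandardCharsets.UTF_8));
                ps.setString(1, j.getString("uid"));
                ps.setString(2, j.getString("entranceId"));
                ps.setString(3, j.getString("msgContent"));
                ps.setString(4, j.getString("answer"));
                ps.setString(5, j.getString("question"));
                ps.setString(6, j.getString("operChannelName"));
                ps.setString(7, j.getString("msgTime"));
                ps.setString(8, j.getString("msgDate"));
                ps.addBatch();
                count++;
            }
            if (count > 0) ps.executeBatch();
            tx.commit();
            return count == 0 ? Status.BACKOFF : Status.READY;
        } catch (Exception e) {
            tx.rollback();
            throw new EventDeliveryException("failed to write batch to MySQL", e);
        } finally {
            tx.close();
        }
    }
}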


#### 9. Test; for Chinese content, add a header configuring UTF-8 encoding
curl -X POST -H "Content-Type:application/xml;charset=utf-8" -d '[{"body" : "{\"uid\":\"G535SHI5VF6EABLQL4ZTYP45IXJ62SWHFPBIHQY\",\"operChannelName\":\"APP\",\"msgContent\":\"添加几个\",\"msgDate\":\"20200803\",\"msgTime\":\"1596412239640\",\"question\":\"成员管理\",\"answer\":\"亲,诚邀您为移娃点赞,点击右侧大拇指!\",\"entranceId\":\"87cc5b1e99884cc7a07741bdd8b05e0b\"}"}]' http://10.19.138.235:5555

mysql> SELECT * FROM i_intelligentcustservice;
+----+-----------------------------------------+----------------------------------+-----------------------------+--------------------------------------------------------------------------------+-----------------------+-----------------+---------------+----------+---------------------+
| id | uid                                     | entranceid                       | msgcontent                  | answer                                                                         | question              | operchannelname | msgtime       | msgdate  | inserttime          |
+----+-----------------------------------------+----------------------------------+-----------------------------+--------------------------------------------------------------------------------+-----------------------+-----------------+---------------+----------+---------------------+
|  1 | G535SHI5VF6EABLQL4ZTYP45IXJ62SWHFPBIHQY | 87cc5b1e99884cc7a07741bdd8b05e0b | 添加几个 | 亲,诚邀您为移娃点赞,点击右侧大拇指! | 亲成员管理 | APP | 1596412239640 | 20200803 | 2020-08-12 18:18:31 |
+----+-----------------------------------------+----------------------------------+-----------------------------+--------------------------------------------------------------------------------+-----------------------+-----------------+---------------+----------+---------------------+
1 row in set (0.00 sec)

./kafka-console-consumer.sh  --bootstrap-server 11.11.11.11:6667  --consumer.config  /home/dams/flume/conf/sec.config --topic   cmol_intelligent_service
{"uid":"G535SHI5VF6EABLQL4ZTYP45IXJ62SWHFPBIHQY","operChannelName":"APP","msgContent":"添加几个","msgDate":"20200803","msgTime":"1596412239640","question":"成员管理","answer":"亲,诚邀您为移娃点赞,点击右侧大拇指!","entranceId":"87cc5b1e99884cc7a07741bdd8b05e0b"}"}


Test successful.

#### 10. The custom MysqlSink is covered in the next post



### Error 1

dams@zxzt04:[/home/dams/kafka_2.11-1.1.0/bin]./kafka-console-producer.sh --broker-list 11.11.11.51:6667   --producer.config /home/dams/flume/conf/sec.config --topic  cmol_intelligent_service

[2020-08-11 00:18:28,951] ERROR Error when sending message to topic cmol_intelligent_service with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
>[2020-08-11 00:18:35,300] ERROR [Producer clientId=console-producer] Connection to node -1 failed authentication due to: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Receive timed out)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state. (org.apache.kafka.clients.NetworkClient)
[2020-08-11 00:18:35,303] WARN [Principal=cmol@BDX.SD.CMCC]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)
[2020-08-11 00:19:35,371] ERROR [Producer clientId=console-producer] Connection to node -1 failed authentication due to: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Receive timed out)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state. (org.apache.kafka.clients.NetworkClient)
[2020-08-11 00:19:35,374] WARN [Principal=cmol@BDX.SD.CMCC]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)

https://github.com/steveloughran/kerberos_and_hadoop/blob/master/sections/errors.md (reference for these error messages)


This means the UDP socket awaiting a response from the KDC eventually gave up. Possible causes:

- The hostname of the KDC is wrong
- The IP address of the KDC is wrong
- There's nothing at the far end listening for requests
- A firewall on either client or server is blocking UDP packets

Kerberos waits ~90 seconds before timing out, which is a long time to notice there's a problem.

Switch to TCP; at the very least, it will fail faster.
[libdefaults]
  udp_preference_limit = 1
  
Test UDP: nc -vuz 11.11.11.203 88
Fix: add udp_preference_limit = 1 under [libdefaults] in /etc/krb5.conf (disables UDP); after that, both the Kafka client and Flume can reach the remote Kafka service.

The udp_preference_limit parameter controls whether TCP or UDP is used. To always use TCP, set udp_preference_limit = 1. If the parameter is not set, the Java Kerberos library only falls back to TCP when a Kerberos ticket request over UDP fails and the KDC returns the KRB_ERR_RESPONSE_TOO_BIG error code.
