Flume with Kerberos authentication: dual sinks writing to Kafka and MySQL

Tags: flume, sink, kafka, kerberos, mysql
Category: bigdata
First, it has been a long time since I updated this blog; the reason is pure laziness. Lately I have been working on interface development, and my development skills are beginner level. This time I needed to build an HTTP POST interface that accepts JSON parameters and can both send the data to a Kafka cluster and write it into a MySQL database. I originally planned to write a Java program myself, but after asking a colleague I learned that Flume can do exactly this.
This post covers Flume's HTTP source configuration with two sinks, one sending data to Kafka and one to MySQL.
### 1. Configure /etc/hosts, adding the Kafka and ZooKeeper addresses
## (ports: kafka TCP 6667, zookeeper TCP 2181, x3/x4 (KDC) TCP/UDP 88)
## kafka
11.11.11.51 c6h551 h551
## zookeeper
11.11.11.201 c6x1 x1
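Kerberos and Kafka are both sensitive to hostname resolution, so it can be worth confirming these entries resolve before going further. A tiny sketch (my own, not from the original post):

// Hypothetical sanity check that the /etc/hosts entries above resolve.
import java.net.InetAddress;

public class ResolveCheck {
    public static void main(String[] args) throws Exception {
        for (String host : new String[] {"c6h551", "c6x1"}) {
            System.out.println(host + " -> " + InetAddress.getByName(host).getHostAddress());
        }
    }
}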
### 2. Set environment variables
export FLUME_HOME=/home/dams/flume
export FLUME_CONF_DIR=$FLUME_HOME/conf
export JAVA_HOME=/home/jdk1.8.0_241
export KAFKA_HOME=/home/dams/kafka_2.11-1.1.0
export PATH=$JAVA_HOME/bin:$FLUME_HOME/bin:$KAFKA_HOME/bin:$PATH
### 3. Configure flume-env.sh: add
export JAVA_OPTS="-Djava.security.auth.login.config=/home/dams/flume/conf/jaas.conf -Djava.security.krb5.conf=/home/dams/flume/conf/krb5.conf"
#### Configuration file $FLUME_HOME/conf/agent1.conf
agent1.sources = r1
agent1.channels = c1 c2
agent1.sinks = k1 k2
agent1.sources.r1.selector.type = replicating
# Configure the source
agent1.sources.r1.type = http
agent1.sources.r1.port = 5555
agent1.sources.r1.handler = org.apache.flume.source.http.JSONHandler
agent1.sources.r1.channels = c1 c2
# Configure the memory channels
agent1.sinks.k1.channel = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 2000
agent1.channels.c1.transactionCapacity = 20
agent1.sinks.k2.channel = c2
agent1.channels.c2.type = memory
agent1.channels.c2.capacity = 2000
agent1.channels.c2.transactionCapacity = 20
##### Send to Kafka #####
# Kafka sink k1
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.kafka.topic = cmol_intelligent_service
agent1.sinks.k1.kafka.bootstrap.servers = 11.11.11.51:6667
# (use a comma-separated list for multiple brokers)
agent1.sinks.k1.flumeBatchSize = 20
agent1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
agent1.sinks.k1.kafka.producer.sasl.mechanism = GSSAPI
agent1.sinks.k1.kafka.producer.sasl.kerberos.service.name = ocdp
##### Send to MySQL #####
# MySQL sink k2
agent1.sinks.k2.type = org.flume.sink.mysql.MysqlSink
agent1.sinks.k2.hostname = 10.19.138.232
agent1.sinks.k2.port = 3306
agent1.sinks.k2.databaseName = internet
agent1.sinks.k2.tableName = i_intelligentcustservice
agent1.sinks.k2.username = username
agent1.sinks.k2.password = password
agent1.sinks.k2.batchSize = 20
(Architecture diagram: the replicating selector copies each event into both memory channels, so the Kafka sink and the MySQL sink each receive the full stream.)
### 4. Configure Kerberos authentication for access with the Kafka command-line clients
--## Install the Kerberos packages
root@zxzt04:[/root]yum install krb5*
--## Replace the contents of /etc/krb5.conf
root@zxzt04:[/home/dams/flume/conf]cp krb5.conf /etc/
root@zxzt04:[/root]vi /etc/krb5.conf
# Other applications require this directory to perform krb5 configuration.
includedir /etc/krb5.conf.d/
[libdefaults]
[domain_realm]
[logging]
[realms]
--## Obtain a ticket from the keytab
su - dams
kinit -kt /home/dams/flume/conf/cmol.keytab cmol
dams@zxzt04:[/home/dams/flume/conf]klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: cmol@BDX.XX.XXXX
Valid starting       Expires              Service principal
08/10/2020 23:38:14  08/11/2020 23:38:12  krbtgt/BDX.XX.XXXX@BDX.XX.XXXX
--## Configure the kafka-jaas.conf file (provided by the maintainers of the target cluster)
Add the following to kafka-console-consumer.sh and kafka-console-producer.sh:
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/dams/flume/conf/kafka-jaas.conf"
root@zxzt04:[/root]cat /home/dams/flume/conf/kafka-jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
principal="cmol@BDX.XX.XXXX"
useKeyTab=true
renewTicket=true
keyTab="/home/dams/flume/conf/cmol.keytab"
serviceName="ocdp"
client=true;
};
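Before involving Kafka at all, it can be worth confirming that this JAAS entry and keytab can actually log in. A small sketch of such a check (my own, not from the original post), using the JDK's LoginContext against the KafkaClient entry above:

// Hypothetical check that the KafkaClient JAAS entry and keytab are usable.
import javax.security.auth.login.LoginContext;

public class JaasCheck {
    public static void main(String[] args) throws Exception {
        System.setProperty("java.security.auth.login.config",
                "/home/dams/flume/conf/kafka-jaas.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

        // Krb5LoginModule with useKeyTab=true and doNotPrompt=true needs no
        // callback handler, so the entry name alone is enough.
        LoginContext lc = new LoginContext("KafkaClient");
        lc.login();
        System.out.println("Kerberos login OK: " + lc.getSubject().getPrincipals());
        lc.logout();
    }
}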
--## Create the sec.config file
dams@zxzt04:[/home/dams/flume/conf]cat sec.config
security.protocol=SASL_PLAINTEXT
--## Try accessing the cluster
dams@zxzt04:[/home/dams/kafka_2.11-1.1.0/bin]./kafka-topics.sh --list --zookeeper 134.80.160.201:2181 | grep cmol
cmol_intelligent_service
-- Test the producer
./kafka-console-producer.sh --broker-list 11.11.11.51:6667 --producer.config /home/dams/flume/conf/sec.config --topic cmol_intelligent_service
-- Test the consumer
./kafka-console-consumer.sh --bootstrap-server 11.11.11.51:6667 --consumer.config /home/dams/flume/conf/sec.config --topic cmol_intelligent_service
------------------------ The Kerberos setup above is for the Kafka command-line clients.
------------------------ The configuration below is what Flume itself uses.
### 5. Configure Kerberos authentication on the Flume side
#### Set the Kerberos options in JAVA_OPTS in flume-env.sh
export JAVA_OPTS="-Xms1024m -Xmx4096m -Djava.security.auth.login.config=/home/dams/flume/conf/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Dcom.sun.management.jmxremote"
Flume reuses the same /home/dams/flume/conf/kafka-jaas.conf shown in step 4.
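With these options in place, Flume's Kafka sink authenticates over SASL_PLAINTEXT/GSSAPI. The same producer settings can be exercised outside Flume; a minimal standalone sketch (my own, not from the original post), assuming the kafka-clients 1.1.0 jar on the classpath:

// Hypothetical standalone producer that mirrors the Kerberos settings the
// Flume Kafka sink uses, to verify authentication independently of Flume.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KerberosProducerCheck {
    public static void main(String[] args) throws Exception {
        // Same system properties that flume-env.sh sets via JAVA_OPTS
        System.setProperty("java.security.auth.login.config",
                "/home/dams/flume/conf/kafka-jaas.conf");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "11.11.11.51:6667");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "ocdp");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // get() blocks until the broker acks, so auth failures surface here
            producer.send(new ProducerRecord<>("cmol_intelligent_service", "ping")).get();
            System.out.println("Kerberos-authenticated send OK");
        }
    }
}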
### 6. Start Flume (-c points at the conf directory so flume-env.sh is read)
flume-ng agent -c ../conf -f ../conf/agent1.conf -n agent1
-- Logs are generated according to the logs settings in log4j.properties
Create a start script:
dams@zxzt04:[/home/dams/flume]cat start.sh
nohup flume-ng agent -c ../conf -f ../conf/agent1.conf -n agent1 &
-- Start Flume
sh start.sh
-- Stop Flume
Just kill the process directly (kill -9 <pid>).
### 7. Test with curl
Note that the data is carried entirely in the request body, and double quotes inside the JSON string passed as "body" must be escaped with \.
curl -X POST -d '[{ "headers" : {"namenode" : "namenode.example.com", "datanode" : "random_datanode.example.com"}, "body" : "really_random_body"}]' http://10.19.138.235:5555
curl -X POST --data '{"dmac": "00:0C:29:EA:39:70", "alert_type": "alarm", "risk": 2, "trojan_name": "Trojan.qq3344", "smac": "00:0C:29:EA:39:66", "sub_alert_type": "trojan", "sport": 11, "id": "153189767146", "desc": "NoSecure 1.2 \u6728\u9a6c\u53d8\u79cd4\u8fde\u63a5\u64cd\u4f5c", "sip": "62.4.07.18", "dip": "139.82.31.91", "rule_id": 123451, "trojan_type": 4, "time": "2018-07-18 15:07:51", "dport": 61621, "detector_id": "170301020011", "os": "Windows", "trojan_id": 50030}' http://10.19.138.235:5555
curl -X POST -d '[{"body" : "{\"sun\":1234,\"agent\":22}"}]' http://10.19.138.235:5555
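Hand-escaping those inner quotes gets error-prone quickly. A small sketch (my own, not from the original post) that builds the JSONHandler payload with fastjson, which already appears in this setup, so the escaping is done for you:

// Hypothetical helper that builds the [{"headers": ..., "body": "..."}]
// array the Flume JSONHandler expects; fastjson escapes the inner quotes.
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class PayloadBuilder {
    public static void main(String[] args) {
        JSONObject data = new JSONObject();      // the inner business JSON
        data.put("sun", 1234);
        data.put("agent", 22);

        JSONObject event = new JSONObject();     // one Flume event
        event.put("body", data.toJSONString());  // body is a plain string

        JSONArray payload = new JSONArray();     // JSONHandler wants an array
        payload.add(event);

        // e.g. [{"body":"{\"sun\":1234,\"agent\":22}"}] (key order may vary)
        System.out.println(payload.toJSONString());
    }
}

The printed string can then be POSTed as-is to http://10.19.138.235:5555.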
### 8. Custom MySQL sink
Writes events into a MySQL table.
MySQL create-table statement:
create table i_intelligentcustservice
(
    id              bigint unsigned auto_increment comment 'primary key id' primary key,
    uid             varchar(64)  comment 'user id',
    entranceid      varchar(64)  comment 'entrance id',
    msgcontent      varchar(256) comment 'message sent by the user',
    answer          varchar(512) comment 'system answer',
    question        varchar(256) comment 'standard question',
    operchannelname varchar(64)  comment 'operating channel name',
    msgtime         varchar(64)  comment 'message send time',
    msgdate         varchar(16)  comment 'message send date',
    inserttime      datetime     comment 'insert time'
) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci
  comment 'local retention table for intelligent-answer interaction logs relayed in real time to the provincial company';
Package the finished MysqlSink program into a jar and put it, together with mysql-connector-xx.jar and fastjson-xx.jar, under $FLUME_HOME/lib on the server.
### 9. Test with Chinese content: add a Content-Type header specifying UTF-8 encoding
curl -X POST -H "Content-Type:application/xml;charset=utf-8" -d '[{"body" : "{\"uid\":\"G535SHI5VF6EABLQL4ZTYP45IXJ62SWHFPBIHQY\",\"operChannelName\":\"APP\",\"msgContent\":\"添加几个\",\"msgDate\":\"20200803\",\"msgTime\":\"1596412239640\",\"question\":\"成员管理\",\"answer\":\"亲,诚邀您为移娃点赞,点击右侧大拇指!\",\"entranceId\":\"87cc5b1e99884cc7a07741bdd8b05e0b\"}"}]' http://10.19.138.235:5555
mysql> SELECT * FROM i_intelligentcustservice;
+----+-----------------------------------------+----------------------------------+------------+---------------------------------------+----------+-----------------+---------------+----------+---------------------+
| id | uid                                     | entranceid                       | msgcontent | answer                                | question | operchannelname | msgtime       | msgdate  | inserttime          |
+----+-----------------------------------------+----------------------------------+------------+---------------------------------------+----------+-----------------+---------------+----------+---------------------+
|  1 | G535SHI5VF6EABLQL4ZTYP45IXJ62SWHFPBIHQY | 87cc5b1e99884cc7a07741bdd8b05e0b | 添加几个   | 亲,诚邀您为移娃点赞,点击右侧大拇指!  | 成员管理 | APP             | 1596412239640 | 20200803 | 2020-08-12 18:18:31 |
+----+-----------------------------------------+----------------------------------+------------+---------------------------------------+----------+-----------------+---------------+----------+---------------------+
1 row in set (0.00 sec)
./kafka-console-consumer.sh --bootstrap-server 11.11.11.11:6667 --consumer.config /home/dams/flume/conf/sec.config --topic cmol_intelligent_service
{"uid":"G535SHI5VF6EABLQL4ZTYP45IXJ62SWHFPBIHQY","operChannelName":"APP","msgContent":"添加几个","msgDate":"20200803","msgTime":"1596412239640","question":"成员管理","answer":"亲,诚邀您为移娃点赞,点击右侧大拇指!","entranceId":"87cc5b1e99884cc7a07741bdd8b05e0b"}
The test succeeded.
### 10. The custom MysqlSink implementation is covered in the next post.
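In the meantime, here is a minimal sketch of what such a sink could look like. This is my own illustration, not the implementation from the next post: it assumes only the property names used in agent1.conf above (hostname, port, databaseName, tableName, username, password, batchSize) and inserts the raw event body; a real sink would parse the JSON fields (e.g. with fastjson) and bind each column.

// Minimal sketch of a custom Flume MySQL sink (illustrative only; the real
// MysqlSink in the next post may differ). Requires flume-ng-core and
// mysql-connector on the classpath.
package org.flume.sink.mysql;

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MysqlSink extends AbstractSink implements Configurable {
    private String url, user, password, table;
    private int batchSize;
    private Connection conn;

    @Override
    public void configure(Context ctx) {
        // Property names match the agent1.conf keys shown above.
        String host = ctx.getString("hostname");
        int port = ctx.getInteger("port", 3306);
        String db = ctx.getString("databaseName");
        table = ctx.getString("tableName");
        user = ctx.getString("username");
        password = ctx.getString("password");
        batchSize = ctx.getInteger("batchSize", 20);
        url = "jdbc:mysql://" + host + ":" + port + "/" + db
                + "?useUnicode=true&characterEncoding=utf8";
    }

    @Override
    public synchronized void start() {
        try {
            conn = DriverManager.getConnection(url, user, password);
        } catch (Exception e) {
            throw new RuntimeException("cannot open MySQL connection", e);
        }
        super.start();
    }

    @Override
    public synchronized void stop() {
        try { if (conn != null) conn.close(); } catch (Exception ignored) { }
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        tx.begin();
        // Illustration: stores the raw body in msgcontent; a real sink would
        // parse the JSON and bind every column of i_intelligentcustservice.
        String sql = "INSERT INTO " + table + " (msgcontent, inserttime) VALUES (?, NOW())";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            int count = 0;
            while (count < batchSize) {
                Event event = channel.take();
                if (event == null) break;  // channel drained
                ps.setString(1, new String(event.getBody(), StandardCharsets.UTF_8));
                ps.addBatch();
                count++;
            }
            if (count > 0) ps.executeBatch();
            tx.commit();
            return count == 0 ? Status.BACKOFF : Status.READY;
        } catch (Exception e) {
            tx.rollback();
            throw new EventDeliveryException("MySQL insert failed", e);
        } finally {
            tx.close();
        }
    }
}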
### Error 1
dams@zxzt04:[/home/dams/kafka_2.11-1.1.0/bin]./kafka-console-producer.sh --broker-list 11.11.11.51:6667 --producer.config /home/dams/flume/conf/sec.config --topic cmol_intelligent_service
[2020-08-11 00:18:28,951] ERROR Error when sending message to topic cmol_intelligent_service with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
[2020-08-11 00:18:35,300] ERROR [Producer clientId=console-producer] Connection to node -1 failed authentication due to: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Receive timed out)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state. (org.apache.kafka.clients.NetworkClient)
[2020-08-11 00:18:35,303] WARN [Principal=cmol@BDX.SD.CMCC]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)
[2020-08-11 00:19:35,371] ERROR [Producer clientId=console-producer] Connection to node -1 failed authentication due to: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Receive timed out)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state. (org.apache.kafka.clients.NetworkClient)
[2020-08-11 00:19:35,374] WARN [Principal=cmol@BDX.SD.CMCC]: TGT renewal thread has been interrupted and will exit. (org.apache.kafka.common.security.kerberos.KerberosLogin)
https://github.com/steveloughran/kerberos_and_hadoop/blob/master/sections/errors.md
(see this reference for the error message)
This means the UDP socket awaiting a response from the KDC eventually gave up. Possible causes:
- The hostname of the KDC is wrong
- The IP address of the KDC is wrong
- There's nothing at the far end listening for requests
- A firewall on either client or server is blocking UDP packets
Kerberos waits ~90 seconds before timing out, which is a long time to notice there's a problem. Switch to TCP; at the very least, it will fail faster.
Test UDP reachability to the KDC: nc -vuz 11.11.11.203 88
Fix: add udp_preference_limit = 1 (disable UDP) under [libdefaults] in /etc/krb5.conf; after that, both the Kafka command-line clients and Flume could reach the remote Kafka cluster:
[libdefaults]
udp_preference_limit = 1
The udp_preference_limit parameter selects between the TCP and UDP protocols. To always use TCP, set it to 1, i.e. udp_preference_limit = 1. If the parameter is not set, the Java Kerberos library only switches to TCP when a Kerberos ticket request over UDP fails and the KDC returns the KRB_ERR_RESPONSE_TOO_BIG error code.