由于篇幅美观性,将flume实现mysql写入的自定义sink单独一篇文章。
引用sink2的配置
#####发送到MySQL#####
# 订阅Sink
agent2.sinks.k2.type = org.flume.sink.mysql.MysqlSink
agent2.sinks.k2.hostname = 10.19.138.232
agent2.sinks.k2.port = 3306
agent2.sinks.k2.databaseName = internet
agent2.sinks.k2.tableName = i_intelligentcustservice
agent2.sinks.k2.username = username
agent2.sinks.k2.password = password
agent2.sinks.k2.batchSize = 20
吐槽下,IDEA使用MAVEN开发简直不要太好用,
比之前的eclipse好用,应用第三方包,直接在pom.xml里配置,自动下载。
新建项目的话需要设置下Maven仓库的位置。
项目POM.XML文件,由于格式在博客里不显示,只能贴图片,见谅。
引用的包具体是:junit、log4j、flume-ng-sdk、flume-ng-core、flume-ng-configuration、mysql-connector-java、fastjson。
javabean文件
package org.flume.sink.mysql;
/**
 * Plain mutable value holder for one customer-service message record.
 *
 * <p>Every property is a nullable {@code String}; no validation or conversion
 * is performed by the accessors.
 */
public class IntelligentBean {

    private String uid;
    private String operchannelname;
    private String msgcontent;
    private String msgdate;
    private String msgtime;
    private String question;
    private String answer;
    private String entranceid;

    /** @return the stored uid, or {@code null} if never set */
    public String getUid() {
        return this.uid;
    }

    /** @param uid value to store as the uid */
    public void setUid(String uid) {
        this.uid = uid;
    }

    /** @return the stored operchannelname, or {@code null} if never set */
    public String getOperchannelname() {
        return this.operchannelname;
    }

    /** @param operchannelname value to store as the operchannelname */
    public void setOperchannelname(String operchannelname) {
        this.operchannelname = operchannelname;
    }

    /** @return the stored msgcontent, or {@code null} if never set */
    public String getMsgcontent() {
        return this.msgcontent;
    }

    /** @param msgcontent value to store as the msgcontent */
    public void setMsgcontent(String msgcontent) {
        this.msgcontent = msgcontent;
    }

    /** @return the stored msgdate, or {@code null} if never set */
    public String getMsgdate() {
        return this.msgdate;
    }

    /** @param msgdate value to store as the msgdate */
    public void setMsgdate(String msgdate) {
        this.msgdate = msgdate;
    }

    /** @return the stored msgtime, or {@code null} if never set */
    public String getMsgtime() {
        return this.msgtime;
    }

    /** @param msgtime value to store as the msgtime */
    public void setMsgtime(String msgtime) {
        this.msgtime = msgtime;
    }

    /** @return the stored question, or {@code null} if never set */
    public String getQuestion() {
        return this.question;
    }

    /** @param question value to store as the question */
    public void setQuestion(String question) {
        this.question = question;
    }

    /** @return the stored answer, or {@code null} if never set */
    public String getAnswer() {
        return this.answer;
    }

    /** @param answer value to store as the answer */
    public void setAnswer(String answer) {
        this.answer = answer;
    }

    /** @return the stored entranceid, or {@code null} if never set */
    public String getEntranceid() {
        return this.entranceid;
    }

    /** @param entranceid value to store as the entranceid */
    public void setEntranceid(String entranceid) {
        this.entranceid = entranceid;
    }
}
主要的MysqlSink类内容,
package org.flume.sink.mysql;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.sql.*;
import java.util.List;
import java.util.Map;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MysqlSink extends AbstractSink implements
Configurable {
private Logger LOG =
LoggerFactory.getLogger(MysqlSink.class);
private String
hostname;
private String
port;
private String
databaseName;
private String
tableName;
private String
username;
private String
password;
private
PreparedStatement preparedStatement;
private Connection
conn;
private int batchSize;
//每次提交的批次大小
public MysqlSink()
{
LOG.info("MysqlSink start...");
}
public void
configure(Context context) {
hostname = context.getString("hostname");
Preconditions.checkNotNull(hostname, "hostname
must be set!!");
port = context.getString("port");
Preconditions.checkNotNull(port, "port must be
set!!");
databaseName =
context.getString("databaseName");
Preconditions.checkNotNull(databaseName,
"databaseName must be set!!");
tableName =
context.getString("tableName");
Preconditions.checkNotNull(tableName, "tableName
must be set!!");
username = context.getString("username");
Preconditions.checkNotNull(username, "user must
be set!!");
password = context.getString("password");
Preconditions.checkNotNull(password, "password
must be set!!");
batchSize = context.getInteger("batchSize",
100);
Preconditions.checkNotNull(batchSize > 0,
"batchSize must be a positive number!!");
}
@Override
public void start()
{
super.start();
try {
//调用Class.forName()方法加载驱动程序
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
// url重连
String url = "jdbc:mysql://" + hostname + ":" + port + "/" +
databaseName + "?autoReconnect=true";
//调用DriverManager对象的getConnection()方法,获得一个Connection对象
try {
conn =
DriverManager.getConnection(url, username, password);
conn.setAutoCommit(false);
//创建一个Statement对象
preparedStatement = conn.prepareStatement("insert into " +
tableName +
"
(uid,entranceid,msgcontent,answer,question,operchannelname,msgtime,msgdate,inserttime)
values (?,?,?,?,?,?,?,?,now())");
} catch (SQLException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void stop()
{
super.stop();
if (preparedStatement != null) {
try
{
preparedStatement.close();
} catch
(SQLException e) {
e.printStackTrace();
}
}
if (conn != null) {
try
{
conn.close();
} catch
(SQLException e) {
e.printStackTrace();
}
}
}
public Status process()
throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction =
channel.getTransaction();
Event event = null;
String content = "";
transaction.begin();
List lists = Lists.newArrayList();
try {
for (int i
= 0; i < batchSize; i++) {
event = channel.take();
if (event != null) {
content = new String(event.getBody());
System.out.println(content+"11111111111111111111111111111111");
IntelligentBean intel=new
IntelligentBean();
if(!"".equals(content)){
JSONObject
jsonContent = JSONObject.parseObject(content);
intel.setUid(jsonContent.getString("uid"));
intel.setAnswer(jsonContent.getString("answer"));
intel.setEntranceid(jsonContent.getString("entranceId"));
intel.setMsgcontent(jsonContent.getString("msgContent"));
intel.setMsgdate(jsonContent.getString("msgDate"));
intel.setMsgtime(jsonContent.getString("msgTime"));
intel.setOperchannelname(jsonContent.getString("operChannelName"));
intel.setQuestion(jsonContent.getString("question"));
lists.add(intel);
}
} else {
result = Status.BACKOFF;
break;
}
}
if (lists
!= null && lists.size() > 0) {
preparedStatement.clearBatch();
for (IntelligentBean irl :
lists) {
preparedStatement.setString(1,
irl.getUid());
preparedStatement.setString(2,
irl.getEntranceid());
preparedStatement.setString(3,
irl.getMsgcontent());
preparedStatement.setString(4,
irl.getAnswer());
preparedStatement.setString(5,
irl.getQuestion());
preparedStatement.setString(6,
irl.getOperchannelname());
preparedStatement.setString(7,
irl.getMsgtime());
preparedStatement.setString(8,
irl.getMsgdate());
//java.util.date 转 java.sql.date
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
conn.commit();
}
transaction.commit();
} catch (Throwable e) {
try
{
transaction.rollback();
} catch
(Exception e2) {
LOG.error("Exception in
rollback. Rollback might not have been" +
"successful.", e2);
}
LOG.error("Failed to commit transaction." +
"Transaction rolled back.", e);
Throwables.propagate(e);
} finally {
transaction.close();
}
return result;
}
}
将写好的MysqlSink程序打包成jar文件,并和mysql-connector-java的jar包、fastjson的jar包
一并放到服务器的$FLUME_HOME/lib下即可。
IDEA打包使用:Maven Projects -> Lifecycle -> package,
选中后点击上面的运行按钮即可生成。
加载中,请稍候......