加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

flume_实现自定义MysqlSink,写入mysql表

(2020-08-12 23:56:44)
标签:

flume

sink

mysql

maven

分类: bigdata



出于篇幅和美观的考虑,将flume写入mysql的自定义sink单独写成一篇文章。


引用sink2的配置
##### Write events to MySQL through the custom MysqlSink #####
# Sink k2 of agent2: fully-qualified class name of the custom sink
agent2.sinks.k2.type = org.flume.sink.mysql.MysqlSink
# MySQL host and port; MysqlSink builds the JDBC URL from hostname, port and databaseName
agent2.sinks.k2.hostname = 10.19.138.232
agent2.sinks.k2.port = 3306
# Target database and table for the inserts
agent2.sinks.k2.databaseName = internet
agent2.sinks.k2.tableName = i_intelligentcustservice
# Database credentials (placeholders)
agent2.sinks.k2.username = username
agent2.sinks.k2.password = password
# Maximum number of events taken from the channel per process() call; MysqlSink defaults to 100 when omitted
agent2.sinks.k2.batchSize = 20



吐槽下,IDEA使用Maven开发简直不要太好用,比之前的eclipse好用:引用第三方包时,直接在pom.xml里配置,就会自动下载。
新建项目的话,需要先设置一下maven仓库的位置。
flume_实现自定义MysqlSink,写入mysql表



项目POM.XML文件,由于格式在博客里不显示,只能贴图片,见谅。

flume_实现自定义MysqlSink,写入mysql表

引用的包具体是:junit、log4j、flume-ng-sdk、flume-ng-core、flume-ng-configuration、mysql-connector-java、fastjson



javabean文件

package org.flume.sink.mysql;

/**
 * Mutable data holder for one message record.
 *
 * <p>Every property is a plain {@code String} with a getter and a setter; no validation or
 * conversion is performed. All properties are {@code null} until set.
 */
public class IntelligentBean {

    private String uid;
    private String entranceid;
    private String operchannelname;
    private String msgcontent;
    private String question;
    private String answer;
    private String msgdate;
    private String msgtime;

    // ---- uid ----

    /** @return the stored uid, or {@code null} if none was set */
    public String getUid() {
        return this.uid;
    }

    /** @param uid the uid to store */
    public void setUid(String uid) {
        this.uid = uid;
    }

    // ---- entranceid ----

    /** @return the stored entrance id, or {@code null} if none was set */
    public String getEntranceid() {
        return this.entranceid;
    }

    /** @param entranceid the entrance id to store */
    public void setEntranceid(String entranceid) {
        this.entranceid = entranceid;
    }

    // ---- operchannelname ----

    /** @return the stored operation channel name, or {@code null} if none was set */
    public String getOperchannelname() {
        return this.operchannelname;
    }

    /** @param operchannelname the operation channel name to store */
    public void setOperchannelname(String operchannelname) {
        this.operchannelname = operchannelname;
    }

    // ---- msgcontent ----

    /** @return the stored message content, or {@code null} if none was set */
    public String getMsgcontent() {
        return this.msgcontent;
    }

    /** @param msgcontent the message content to store */
    public void setMsgcontent(String msgcontent) {
        this.msgcontent = msgcontent;
    }

    // ---- question ----

    /** @return the stored question, or {@code null} if none was set */
    public String getQuestion() {
        return this.question;
    }

    /** @param question the question to store */
    public void setQuestion(String question) {
        this.question = question;
    }

    // ---- answer ----

    /** @return the stored answer, or {@code null} if none was set */
    public String getAnswer() {
        return this.answer;
    }

    /** @param answer the answer to store */
    public void setAnswer(String answer) {
        this.answer = answer;
    }

    // ---- msgdate ----

    /** @return the stored message date string, or {@code null} if none was set */
    public String getMsgdate() {
        return this.msgdate;
    }

    /** @param msgdate the message date string to store */
    public void setMsgdate(String msgdate) {
        this.msgdate = msgdate;
    }

    // ---- msgtime ----

    /** @return the stored message time string, or {@code null} if none was set */
    public String getMsgtime() {
        return this.msgtime;
    }

    /** @param msgtime the message time string to store */
    public void setMsgtime(String msgtime) {
        this.msgtime = msgtime;
    }
}



主要的MysqlSink类内容如下:

package org.flume.sink.mysql;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.sql.*;
import java.util.List;
import java.util.Map;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MysqlSink extends AbstractSink implements Configurable {

    private Logger LOG = LoggerFactory.getLogger(MysqlSink.class);
    private String hostname;
    private String port;
    private String databaseName;
    private String tableName;
    private String username;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize; //每次提交的批次大小

    public MysqlSink() {
        LOG.info("MysqlSink start...");
    }

   
    public void configure(Context context) {
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "hostname must be set!!");
        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set!!");
        databaseName = context.getString("databaseName");
        Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
        tableName = context.getString("tableName");
        Preconditions.checkNotNull(tableName, "tableName must be set!!");
        username = context.getString("username");
        Preconditions.checkNotNull(username, "user must be set!!");
        password = context.getString("password");
        Preconditions.checkNotNull(password, "password must be set!!");
        batchSize = context.getInteger("batchSize", 100);
        Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!");
    }

    @Override
    public void start() {
        super.start();
        try {
            //调用Class.forName()方法加载驱动程序
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
       // url重连
      String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName + "?autoReconnect=true";
        //调用DriverManager对象的getConnection()方法,获得一个Connection对象

        try {
            conn = DriverManager.getConnection(url, username, password);
            conn.setAutoCommit(false);
            //创建一个Statement对象
            preparedStatement = conn.prepareStatement("insert into " + tableName +
                    " (uid,entranceid,msgcontent,answer,question,operchannelname,msgtime,msgdate,inserttime) values (?,?,?,?,?,?,?,?,now())");
        } catch (SQLException e) {
            e.printStackTrace();
            System.exit(1);
        }

    }

    @Override
    public void stop() {
        super.stop();
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event = null;
        String content = "";

        transaction.begin();
        List lists = Lists.newArrayList();

        try {
            for (int i = 0; i < batchSize; i++) {
                event = channel.take();
                if (event != null) {
                    content = new String(event.getBody());
                    System.out.println(content+"11111111111111111111111111111111");
                    IntelligentBean intel=new IntelligentBean();
                    if(!"".equals(content)){
                        JSONObject jsonContent = JSONObject.parseObject(content);
                        intel.setUid(jsonContent.getString("uid"));
                        intel.setAnswer(jsonContent.getString("answer"));
                        intel.setEntranceid(jsonContent.getString("entranceId"));
                        intel.setMsgcontent(jsonContent.getString("msgContent"));
                        intel.setMsgdate(jsonContent.getString("msgDate"));
                        intel.setMsgtime(jsonContent.getString("msgTime"));
                        intel.setOperchannelname(jsonContent.getString("operChannelName"));
                        intel.setQuestion(jsonContent.getString("question"));
                        lists.add(intel);
                        }
                } else {
                    result = Status.BACKOFF;
                    break;
                }
            }
            if (lists != null && lists.size() > 0) {
                preparedStatement.clearBatch();
                for (IntelligentBean irl : lists) {
                    preparedStatement.setString(1, irl.getUid());
                    preparedStatement.setString(2, irl.getEntranceid());
                    preparedStatement.setString(3, irl.getMsgcontent());
                    preparedStatement.setString(4, irl.getAnswer());
                    preparedStatement.setString(5, irl.getQuestion());
                    preparedStatement.setString(6, irl.getOperchannelname());
                    preparedStatement.setString(7, irl.getMsgtime());
                    preparedStatement.setString(8, irl.getMsgdate());
                    //java.util.date 转 java.sql.date
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();

                conn.commit();
            }
            transaction.commit();
        } catch (Throwable e) {
            try {
                transaction.rollback();
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been" +
                        "successful.", e2);
            }
            LOG.error("Failed to commit transaction." +
                    "Transaction rolled back.", e);
            Throwables.propagate(e);
        } finally {
            transaction.close();
        }
        return result;
    }

}


将写好的MysqlSink程序打包成jar文件,连同mysql-connector-java的jar包和fastjson的jar包一并放到服务器的$FLUME_HOME/lib下即可。


IDEA打包方法:在Maven Projects -> Lifecycle中选中package,点击上方的运行按钮即可生成jar文件。

flume_实现自定义MysqlSink,写入mysql表





0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有