Accessing Kafka from Java with SASL_SSL username/password authentication

(2020-07-30 20:54:54)
This post shows how to access Kafka from Java over SASL_SSL, authenticating with a username and password (SCRAM).

Producer Java Sample (Java producer):

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Properties props = new Properties();
props.put("bootstrap.servers", "*******:9092,*******:9092");
props.put("acks", "all"); // wait for all in-sync replicas to acknowledge
props.put("retries", 3);
props.put("batch.size", 106384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// SASL_SSL with SCRAM username/password authentication
props.put("security.protocol", "SASL_SSL");
props.put("ssl.truststore.location", "D:/client_truststore.jks");
props.put("ssl.truststore.password", "WSO2_sp440");
props.put("sasl.mechanism", "SCRAM-SHA-512");
// Note: the semicolon at the end of the JAAS config string must not be omitted
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf_crm' password='xxxxxxx';");

// Disable server hostname verification (empty algorithm); needed when the broker
// certificate's CN/SAN does not match the hostname the client connects to
props.put("ssl.endpoint.identification.algorithm", "");

long sys = System.currentTimeMillis();

String contractId = "CRM_ContractID"; // used as the record key
String payload = "payload";
Producer<String, String> producer = new KafkaProducer<>(props);

// Synchronous mode: the producer blocks until the Kafka broker returns a response

try {
    // ProducerRecord(topic, key, value); the second argument is the key
    Future<RecordMetadata> future = producer.send(new ProducerRecord<>("CRM_Contract", contractId, payload));
    future.get(); // blocks until acknowledged; omit if you do not care whether the send succeeded
} catch (Exception e) {
    // Connection and "no leader" errors can be resolved by retries; an oversized
    // message is not retried and the exception is thrown immediately
    e.printStackTrace();
} finally {
    producer.close();
}

// Asynchronous mode: the producer does not wait for a response; a background thread
// batches messages (up to batch.size) and submits them to the Kafka broker. A callback
// is needed to learn whether each message reached the broker; log the exception on failure.

try {
    producer.send(new ProducerRecord<>("CRM_Contract", contractId, payload), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
            } else {
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
            }
        }
    });
} catch (Exception e) {
    e.printStackTrace();
}
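Both samples warn that the trailing semicolon in sasl.jaas.config must not be forgotten — a missing semicolon fails only at runtime with a JAAS parse error. One way to make that mistake impossible is to assemble the string in a small helper. The class and method names below are hypothetical (not part of the Kafka client API), just a sketch:

```java
// Hypothetical helper (not part of the Kafka API): builds the sasl.jaas.config
// value so the mandatory trailing semicolon cannot be forgotten.
public class JaasConfigBuilder {

    public static String scramJaasConfig(String username, String password) {
        // The JAAS entry must end with a semicolon; appending it here guarantees that.
        return String.format(
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username='%s' password='%s';",
                username, password);
    }

    public static void main(String[] args) {
        System.out.println(scramJaasConfig("kaf_crm", "xxxxxxx"));
    }
}
```

It would then be used as `props.put("sasl.jaas.config", JaasConfigBuilder.scramJaasConfig("kaf_crm", "xxxxxxx"));` in place of the hand-written string.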



Consumer Java Sample (Java consumer):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;

Properties props = new Properties();
props.put("bootstrap.servers", "*******:9092");

props.put("group.id", "wso2_sp");
props.put("enable.auto.commit", "false"); // offsets are committed manually below
props.put("auto.commit.interval.ms", "1000"); // ignored while auto commit is disabled
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Same SASL_SSL + SCRAM settings as the producer
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "G:\\client_truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "WSO2_sp440");
// Note: the semicolon at the end of the JAAS config string must not be omitted
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf_xxx' password='xxxxx';");

props.put("ssl.endpoint.identification.algorithm", "");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "file_poc";
consumer.subscribe(Arrays.asList(topic));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                record.partition(), record.offset(), record.key(), record.value());
    }
    consumer.commitSync(); // manual commit after the batch has been processed
}
