一.项目工程结构
二.详细代码
KafkaUtils.java
package com.bijian.study;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
/**
 * Static helper that owns one shared Kafka producer and one shared Kafka
 * consumer, both using String keys and String (JSON) values.
 *
 * <p>Both clients are created when this class is initialized, so merely
 * touching the class connects to {@link Constants#BOOTSTRAP_SERVERS} and
 * subscribes the consumer to {@link Constants#TOPIC_NAME}.
 */
public class KafkaUtils {

    /** Fully-qualified class names of Kafka's built-in String (de)serializers. */
    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    private static final Producer<String, String> producer = createProducer();
    private static final Consumer<String, String> consumer = createConsumer();

    private KafkaUtils() {
    }

    /**
     * Builds the shared producer.
     *
     * <p>Note from the original author: the producer is not expected to create
     * the topic from code; create it on the server with the Kafka command-line
     * tools beforehand.
     */
    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        // Broker address as host:port; separate multiple brokers with commas.
        props.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // These must be loadable class names; a truncated package name makes
        // the KafkaProducer constructor fail.
        props.put("key.serializer", STRING_SERIALIZER);
        props.put("value.serializer", STRING_SERIALIZER);
        return new KafkaProducer<>(props);
    }

    /**
     * Builds the shared consumer and subscribes it to {@link Constants#TOPIC_NAME}.
     */
    private static Consumer<String, String> createConsumer() {
        Properties props = new Properties();
        // Broker address as host:port; separate multiple brokers with commas.
        props.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);
        props.put("group.id", "test");
        // Use the real Kafka config keys; unknown keys are not applied.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        Consumer<String, String> created = new KafkaConsumer<>(props);
        created.subscribe(Arrays.asList(Constants.TOPIC_NAME));
        return created;
    }

    /**
     * Sends an alarm to Kafka. The object is serialized to a JSON string
     * because the producer is configured with String values; the record key is
     * the current time in milliseconds.
     *
     * @param msg the alarm to publish
     */
    public static void sendMsgToKafka(IpranAlarm msg) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
                Constants.TOPIC_NAME,
                String.valueOf(System.currentTimeMillis()),
                JSON.toJSONString(msg));
        // send() is asynchronous; report a failed delivery instead of dropping it silently.
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.err.println("向kafka发送消息失败:" + exception);
            }
        });
        System.out.println("向kafka发送的消息是:" + msg.toString());
    }

    /**
     * Polls Kafka forever, converting each record's JSON value back into an
     * {@link IpranAlarm} and printing it. This method never returns normally.
     */
    public static void getMsgFromKafka() {
        while (true) {
            ConsumerRecords<String, String> records =
                    KafkaUtils.getKafkaConsumer().poll(java.time.Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                JSONObject jsonAlarmMsg = JSON.parseObject(record.value());
                IpranAlarm alarmMsg = JSONObject.toJavaObject(jsonAlarmMsg, IpranAlarm.class);
                System.out.println("从kafka接收到的消息是:" + alarmMsg.toString());
            }
        }
    }

    /**
     * @return the shared consumer created at class initialization
     */
    public static Consumer<String, String> getKafkaConsumer() {
        return consumer;
    }

    /** Closes the shared producer. */
    public static void closeKafkaProducer() {
        producer.close();
    }

    /** Closes the shared consumer. */
    public static void closeKafkaConsumer() {
        consumer.close();
    }
}
Constants.java
package com.bijian.study;
/**
 * Connection settings shared by the Kafka producer and consumer of this example.
 */
public final class Constants {

    /** Kafka broker address list in {@code host:port} form. */
    public static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /** Name of the topic the example publishes to and subscribes to. */
    public static final String TOPIC_NAME = "haha";

    // Constant holder only; not meant to be instantiated.
    private Constants() {
    }
}
IpranAlarm.java
package com.bijian.study;
/**
 * Simple mutable bean carried through Kafka as JSON in this example.
 * It holds a name and an age and exposes them through getters and setters.
 */
public class IpranAlarm {

    private int age;
    private String name;

    /** @return the current name, or {@code null} if none was set */
    public String getName() {
        return this.name;
    }

    /** @param name the new name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the current age */
    public int getAge() {
        return this.age;
    }

    /** @param age the new age */
    public void setAge(int age) {
        this.age = age;
    }

    /**
     * Renders the bean as {@code IpranAlarm [name=<name>, age=<age>]}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("IpranAlarm [name=");
        text.append(this.name);
        text.append(", age=");
        text.append(this.age);
        text.append("]");
        return text.toString();
    }
}
测试类Main.java
package com.bijian.study;
/**
 * Demo entry point: publishes two alarms to Kafka and then starts consuming.
 */
public class Main {

    /**
     * Sends two sample alarms through {@link KafkaUtils#sendMsgToKafka} and
     * then hands control to {@link KafkaUtils#getMsgFromKafka()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        KafkaUtils.sendMsgToKafka(alarmOf("bijian", 18));
        KafkaUtils.sendMsgToKafka(alarmOf("zhangshan", 15));
        KafkaUtils.getMsgFromKafka();
    }

    /** Creates an alarm populated with the given name and age. */
    private static IpranAlarm alarmOf(String name, int age) {
        IpranAlarm alarm = new IpranAlarm();
        alarm.setAge(age);
        alarm.setName(name);
        return alarm;
    }
}
运行结果如下:
向kafka发送的消息是:IpranAlarm [name=bijian, age=18]
向kafka发送的消息是:IpranAlarm [name=zhangshan, age=15]
从kafka接收到的消息是:IpranAlarm [name=bijian, age=18]
从kafka接收到的消息是:IpranAlarm [name=zhangshan, age=15]
说明:项目依赖的jar包(fastjson.jar除外)都可以在kafka安装目录的libs目录下找到,本例中该目录为E:\study\bigData\kafka_2.12-2.0.0\libs。