700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > java向kafka推送数据_Java kafka消息的发送与接收

java向kafka推送数据_Java kafka消息的发送与接收

时间:2024-04-01 09:01:09

相关推荐

java向kafka推送数据_Java kafka消息的发送与接收

一.项目工程结构

二.详细代码

KafkaUtils.java

package com.bijian.study;

import java.util.Arrays;

import java.util.Date;

import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

public class KafkaUtils {

private static Producer producer;

private static Consumer consumer;

private KafkaUtils() {

}

/**

* 生产者,注意kafka生产者不能够从代码上生成主题,只有在服务器上用命令生成

*/

static {

Properties props = new Properties();

props.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);// 服务器ip:端口号,集群用逗号分隔

props.put("acks", "all");

props.put("retries", 0);

props.put("batch.size", 16384);

props.put("linger.ms", 1);

props.put("buffer.memory", 33554432);

props.put("key.serializer","org.mon.serialization.StringSerializer");

props.put("value.serializer","org.mon.serialization.StringSerializer");

producer = new KafkaProducer<>(props);

}

/**

* 消费者

*/

static {

Properties props = new Properties();

props.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS);// 服务器ip:端口号,集群用逗号分隔

props.put("group.id", "test");

props.put("mit", "true");

props.put("mit.interval.ms", "1000");

props.put("session.timeout.ms", "30000");

props.put("key.deserializer","org.mon.serialization.StringDeserializer");

props.put("value.deserializer","org.mon.serialization.StringDeserializer");

consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList(Constants.TOPIC_NAME));

}

/**

* 发送对象消息 至kafka上,调用json转化为json字符串,应为kafka存储的是String

*/

public static void sendMsgToKafka(IpranAlarm msg) {

producer.send(new ProducerRecord(Constants.TOPIC_NAME, String.valueOf(new Date().getTime()), JSON.toJSONString(msg)));

System.out.println("向kafka发送的消息是:" + msg.toString());

}

/**

* 从kafka上接收对象消息,将json字符串转化为对象,便于获取消息的时候可以使用get方法获取。

*/

public static void getMsgFromKafka() {

while (true) {

ConsumerRecords records = KafkaUtils.getKafkaConsumer().poll(100);

if (records.count() > 0) {

for (ConsumerRecord record : records) {

JSONObject jsonAlarmMsg = JSON.parseObject(record.value());

IpranAlarm alarmMsg = JSONObject.toJavaObject(jsonAlarmMsg, IpranAlarm.class);

System.out.println("从kafka接收到的消息是:" + alarmMsg.toString());

}

}

}

}

public static Consumer getKafkaConsumer() {

return consumer;

}

public static void closeKafkaProducer() {

producer.close();

}

public static void closeKafkaConsumer() {

consumer.close();

}

}

Constants.java

package com.bijian.study;

public class Constants {

public static String BOOTSTRAP_SERVERS = "localhost:9092";

public static String TOPIC_NAME = "haha";

}

IpranAlarm.java

package com.bijian.study;

public class IpranAlarm {

private String name;

private int age;

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public int getAge() {

return age;

}

public void setAge(int age) {

this.age = age;

}

@Override

public String toString() {

return "IpranAlarm [name=" + name + ", age=" + age + "]";

}

}

测试类Main.java

package com.bijian.study;

public class Main {

public static void main(String[] args) {

IpranAlarm ipranAlarm = new IpranAlarm();

ipranAlarm.setAge(18);

ipranAlarm.setName("bijian");

KafkaUtils.sendMsgToKafka(ipranAlarm);

ipranAlarm.setAge(15);

ipranAlarm.setName("zhangshan");

KafkaUtils.sendMsgToKafka(ipranAlarm);

KafkaUtils.getMsgFromKafka();

}

}

运行结果如下:

向kafka发送的消息是:IpranAlarm [name=bijian, age=18]

向kafka发送的消息是:IpranAlarm [name=zhangshan, age=15]

从kafka接收到的消息是:IpranAlarm [name=bijian, age=18]

从kafka接收到的消息是:IpranAlarm [name=zhangshan, age=15]

说明:项目工程依赖的jar包在kafka的libs目录下可以找到(这里的fastjson.jar包除外),在这里是E:\study\bigData\kafka_2.12-2.0.0\libs目录。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。