700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > Java实现Kafka消费者和生产者

Java实现Kafka消费者和生产者

时间:2023-07-24 08:54:13

相关推荐

Java实现Kafka消费者和生产者

今天来记录一下使用Java实现Kafka的消费者和生产者。

1.所用到的依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency>

2.properties配置文件

server.port=81#============== kafka ===================# 指定kafka server的地址,集群配多个,中间,逗号隔开spring.kafka.bootstrap-servers=192.168.144.150:9092#=============== provider =======================# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。spring.kafka.producer.retries=0# 每次批量发送消息的数量,produce积累到一定数据,一次发送spring.kafka.producer.batch-size=16384# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据spring.kafka.producer.buffer-memory=33554432#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。#可以设置的值为:all, -1, 0, 1spring.kafka.producer.acks=1# 指定消息key和消息体的编解码方式spring.kafka.producer.key-serializer=org.mon.serialization.StringSerializerspring.kafka.producer.value-serializer=org.mon.serialization.StringSerializer#=============== consumer =======================# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名spring.kafka.consumer.group-id=test-consumer-group# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallestspring.kafka.consumer.auto-offset-reset=earliest# mit:true --> 设置自动提交offsetspring.kafka.consumer.enable-auto-commit=true#如果'mit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。spring.kafka.consumer.auto-commit-interval=100# 指定消息key和消息体的编解码方式spring.kafka.consumer.key-deserializer=org.mon.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.mon.serialization.StringDeserializer

3.消费者代码

package com.sumer;import org.springframework.kafka.annotation.KafkaListener;import org.ponent;@Componentpublic class ConsumerListener {@KafkaListener(topics = "test")public void onMessage(String message){//insertIntoDb(buffer);//这里为插入数据库代码System.out.println("接收到:"+message);}}

就问你是不是很简单?就这么简单 !

4.生产者代码

package com.example.kafka2.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class KafkaController {@Autowiredprivate KafkaTemplate kafkaTemplate;@GetMapping("/message/send")public boolean send(@RequestParam String message){kafkaTemplate.send("test",message);return true;}}

就问你是不是很简单?就这么简单 !

搞定!

有一种情况是kafka还没有创建对应的topic,可以看一下这篇文章。

/lp840312696/article/details/109719213

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。