700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)

KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)

时间:2020-07-24 15:52:52

相关推荐

KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)

文章目录

一、基础集成1. 技术选型2. 导入依赖3. kafka配置4. auto-offset-reset 简述5. 新增一个订单类6. 生产者(异步)7. 消费者8. kafka配置类9.单元测试9. 效果图10. 源码地址11.微服务专栏
一、基础集成
1. 技术选型
2. 导入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

3. kafka配置

properties版本

spring.application.name=springboot-kafkaserver.port=8080# kafka 配置spring.kafka.bootstrap-servers=node1:9092# producer 配置spring.kafka.producer.key-serializer=org.mon.serialization.IntegerSerializerspring.kafka.producer.value-serializer=org.mon.serialization.StringSerializer# 生产者每个批次最多方多少条记录spring.kafka.producer.batch-size=16384# 生产者一端总的可用缓冲区大小,此处设置为32M * 1024 * 1024spring.kafka.producer.buffer-memory=33544432# consumer 配置spring.kafka.consumer.key-deserializer=org.mon.serialization.IntegerDeserializerspring.kafka.consumer.value-deserializer=org.mon.serialization.StringDeserializerspring.kafka.consumer.group-id=springboot-consumer-02# earliest - 如果找不到当前消费者的有效偏移量,则自动重置向到最开始spring.kafka.consumer.auto-offset-reset=earliest# 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量spring.kafka.consumer.enable-auto-commit=true# 消费者偏移量自动提交时间间隔spring.kafka.consumer.auto-commit-interval=1000

yml版本项目内部配置

server:port: 8002spring:application:# 应用名称name: ly-kafkaprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: :8848config:# 配置中心地址server-addr: :8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

nacos-config 服务端配置

在这里插入代码片

4. auto-offset-reset 简述

关于

auto.offset.reset 配置有3个值可以设置,分别如下:

earliest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset时,从头开始消费;

latest:当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据;

none: topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常;

默认建议用 earliest, 设置该参数后 kafka出错后重启,找到未消费的offset可以继续消费。

而 latest 这个设置容易丢失消息,假如 kafka 出现问题,还有数据往topic中写,这个时候重启kafka,这个设置会从最新的offset开始消费, 中间出问题的哪些就不管了。

none 这个设置没有用过,兼容性太差,经常出问题。

5. 新增一个订单类

模拟业务系统中,用户每下一笔订单,就发送一个消息,供其他服务消费

package com.gblfy.kafka.entity;import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor;import java.time.LocalDateTime;@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class Order {/*** 订单id*/private long orderId;/*** 订单号*/private String orderNum;/*** 订单创建时间*/private LocalDateTime createTime;}

6. 生产者(异步)

package com.gblfy.lykafka.provider;import com.alibaba.fastjson.JSONObject;import mon.constant.KafkaTopicConstants;import mon.entity.Order;import org.apache.kafka.clients.producer.RecordMetadata;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Service;import org.springframework.util.concurrent.ListenableFuture;import org.springframework.util.concurrent.ListenableFutureCallback;import java.time.LocalDateTime;/*** Kafka生产者** @author gblfy* @date -09-28*/@Servicepublic class KafkaProvider {private final static Logger log = LoggerFactory.getLogger(KafkaProvider.class);@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {// 构建一个订单类Order order = Order.builder().orderId(orderId).orderNum(orderNum).createTime(createTime).build();// 发送消息,订单类的 json 作为消息体ListenableFuture<SendResult<String, String>> future =kafkaTemplate.send(KafkaTopicConstants.KAFKA_MSG_TOPIC, JSONObject.toJSONString(order));// 监听回调future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {@Overridepublic void onFailure(Throwable e) {log.info("发送消息失败: {}", e.getMessage());}@Overridepublic void onSuccess(SendResult<String, String> result) {RecordMetadata metadata = result.getRecordMetadata();log.info("发送的主题:{} ,发送的分区:{} ,发送的偏移量:{} ",metadata.topic(), metadata.partition(), metadata.offset());}});}}

7. 消费者

package com.gblfy.lykafka.controller;import com.gblfy.lykafka.provider.KafkaProvider;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;@RestController@RequestMapping("/kafka")public class KafkaProviderController {@Autowiredprivate KafkaProvider kafkaProvider;@GetMapping("/sendMQ")public String sendMQContent() {kafkaProvider.sendMessage(0001, "10", LocalDateTime.now());return "OK";}}

通过 @KafkaListener注解,我们可以指定需要监听的 topic 以及 groupId, 注意,这里的 topics 是个数组,意味着我们可以指定多个 topic,如:@KafkaListener(topics = {“topic-springboot-01”, “topic-springboot-02”}, groupId = “group_id”)。

注意:消息发布者的 TOPIC 需要保持与消费者监听的 TOPIC 一致,否者消费不到消息。

8. kafka配置类

package mon.constant;public class KafkaTopicConstants {//kafka发送消息主题public static final String KAFKA_MSG_TOPIC = "topic-springboot-01";// kafka消费者组需要和yml文件中的 kafka.consumer.group-id的值保持一致public static final String KAFKA_MSG_TOPIC_GROUP = "springboot-consumer-02";}

9.单元测试

新建单元测试,功能测试消息发布,以及消费。

package com.gblfy.kafka;import com.gblfy.kafka.controller.KafkaProvider;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import java.time.LocalDateTime;import java.util.UUID;import java.util.concurrent.TimeUnit;@SpringBootTestclass KafkaSpringbootApplicationTests {@Autowiredprivate KafkaProvider kafkaProvider;@Testpublic void sendMessage() throws InterruptedException {// 发送 1000 个消息for (int i = 0; i < 1000; i++) {long orderId = i+1;String orderNum = UUID.randomUUID().toString();kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());}TimeUnit.MINUTES.sleep(1);}}

9. 效果图

10. 源码地址

/gb_90/kafka-parent

11.微服务专栏

/gb_90/micro-service-parent

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。