
Integrating Kafka with Spring Boot: sending and consuming messages

Posted: 2022-01-11 17:18:19


Below is a working example of integrating Kafka with Spring Boot using the spring-kafka library, for anyone who needs it.

Start the Kafka server

cd into Kafka's bin directory (ZooKeeper must already be running) and start the broker:

./kafka-server-start.sh /Users/hz/programs/kafka_2.12-2.2.1/config/server.properties &

Create the topic:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-kafka

Produce messages from the console:

./kafka-console-producer.sh --broker-list localhost:9092 --topic test-kafka

Consume messages from the console:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-kafka --from-beginning

Integrating Spring Boot with Kafka

KafkaProducerConfig.java

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
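As a side note, because application.properties (shown later) already sets the spring.kafka.* producer properties, Spring Boot's Kafka auto-configuration can provide an equivalent KafkaTemplate on its own, making this configuration class optional. A minimal sketch of relying on the auto-configured template; the class name AutoConfiguredSender is purely illustrative:

// Minimal sketch, assuming the spring.kafka.* properties below are present:
// Spring Boot auto-configures a KafkaTemplate, so it can simply be injected.
// The class name AutoConfiguredSender is illustrative, not part of the original example.
@Component
@AllArgsConstructor
public class AutoConfiguredSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void send(String data) {
        kafkaTemplate.send("test-kafka", data);
    }
}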

KafkaConsumerConfig.java

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-test-kafka");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
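Since auto-commit is disabled above, the listener container commits offsets itself after handing records to the listener. If you would rather commit offsets explicitly from listener code, here is a hedged sketch of an extra factory bean that could be added to KafkaConsumerConfig; the bean name manualAckListenerContainerFactory is illustrative, and it assumes a spring-kafka version (2.3 or later) where AckMode is defined on ContainerProperties:

// Illustrative extra bean for KafkaConsumerConfig: listeners using this factory
// must call Acknowledgment.acknowledge() themselves to commit offsets.
// Assumes spring-kafka 2.3+ where AckMode lives on ContainerProperties.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> manualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}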

KafkaProducer.java

@Component
@AllArgsConstructor
@Slf4j
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void data(String data) {
        try {
            kafkaTemplate.send("test-kafka", "test payload ==> " + data);
        } catch (Exception e) {
            log.error("failed to send message to Kafka", e);
        }
    }
}
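The try/catch above only catches errors thrown while handing the record to the producer; the actual send is asynchronous. A minimal sketch of reacting to the broker's acknowledgement, assuming a spring-kafka 2.x version where KafkaTemplate.send() returns a ListenableFuture (3.x returns a CompletableFuture instead); the class name CallbackSender is purely for illustration:

// Illustrative sender reacting to the asynchronous send result.
// Assumes spring-kafka 2.x, where KafkaTemplate.send() returns a ListenableFuture.
@Component
@AllArgsConstructor
@Slf4j
public class CallbackSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void send(String data) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test-kafka", data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // the broker acknowledged the record; its offset is available here
                log.info("sent to offset {}", result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                // the send failed after any configured retries
                log.error("send failed", ex);
            }
        });
    }
}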

KafkaConsumer.java

@Component
@AllArgsConstructor
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group}", errorHandler = "consumerAwareErrorHandler")
    public void data(ConsumerRecord<String, String> consumerRecord) throws Exception {
        String value = consumerRecord.value();
        if (value == null) {
            log.error("received an empty Kafka message");
            return;
        }
        log.info("offset {}, value {}", consumerRecord.offset(), value);
        // simulate a processing failure so the error handler is invoked
        throw new Exception("simulated consumer exception, the errorHandler will run");
    }

    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            log.error("consumer error, payload: {}", message.getPayload());
            return null;
        };
    }
}
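Pairing with the manual-ack factory sketched earlier, here is a hedged sketch of a listener variant that commits its own offsets. The method and factory names are illustrative; spring-kafka injects the Acknowledgment parameter when a manual ack mode is active:

// Illustrative listener using the manualAckListenerContainerFactory sketched above.
@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group}",
        containerFactory = "manualAckListenerContainerFactory")
public void dataWithManualAck(ConsumerRecord<String, String> record, Acknowledgment ack) {
    log.info("offset {}, value {}", record.offset(), record.value());
    // commit the offset only after the record has been processed successfully
    ack.acknowledge();
}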

TestKafkaController.java

@RestController
@AllArgsConstructor
public class TestKafkaController {

    private final KafkaProducer kafkaProducer;

    @GetMapping("/testKafka")
    public String testKafka(String str) {
        kafkaProducer.data(str);
        return "success";
    }
}

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.batch-size=16
spring.kafka.producer.buffer-memory=33554432
# a value greater than 0 makes the client resend records whose send failed
spring.kafka.producer.retries=0
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.enable-auto-commit=false
# topic and consumer group used by the listener
kafka.topic=test-kafka
kafka.group=group-test-kafka

Start the project and test it: http://127.0.0.1:8686/testKafka
