700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > kafka java api 生产者 producer 与消费者consumer

kafka java api 生产者 producer 与消费者consumer

时间:2020-06-21 14:06:28

相关推荐

kafka java api 生产者 producer 与消费者consumer

c踩坑

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld

将localhost必须和PLAINTEXT配置的地址保持一致,否则的话会无限警告不能接收数据

生产者;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerDemo {

public static void main(String[] args){

try {

//新建一个类这个类用来配置kafka

Properties properties = new Properties();

//用来配置 kafka的IP地址和端口号

properties.put("bootstrap.servers", "master:9092");

//ack 的状态可以分为三种 1,-1,all 1代表当生产者

properties.put("acks", "1");//重要

//最多重复几次生产操作

properties.put("retries", 3);//重要

//缓存每个分区未发送消息。缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。

properties.put("batch.size", 16384);

//监听间隔

properties.put("linger.ms", 1);

//控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。

properties.put("buffer.memory", 33554432);

//序列化类型。 Kafka消息是以键值对的形式发送到Kafka集群的,其中Key是可选的,Value可以是任意类型。但是在Message被发送到Kafka集群之前,Producer需要把不同类型的消

properties.put("key.serializer", "org.mon.serialization.StringSerializer");

properties.put("value.serializer", "org.mon.serialization.StringSerializer");

//初始化生产者

Producer<String, String> producer = null;

producer = new KafkaProducer<String, String>(properties);

for (int i = 0; i < 100; i++) {

String msg = "Messagea" + i;

//topic的名字:HelloWorld

producer.send(new ProducerRecord<String, String>("HelloWorld1", msg));

System.out.println("Sent:" + msg);

//Thread.sleep(1000);

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

之后再启动消费者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld --from-beginning

//消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;

import java.util.Properties;

public class ConsumerDemo {

public static void main(String[] args) {

Properties properties = new Properties();

properties.put("bootstrap.servers", "master:9092");

properties.put("group.id", "i");

//是否自动提交

properties.put("mit", "false");

properties.put("mit.interval.ms", "1000");

//偏移量自增

properties.put("auto.offset.reset", "earliest");//

//session保存时间

properties.put("session.timeout.ms", "30000");

//序列化

properties.put("key.deserializer", "org.mon.serialization.StringDeserializer");

properties.put("value.deserializer", "org.mon.serialization.StringDeserializer");

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);

//topictest2

kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));

while (true) {

ConsumerRecords<String, String> records = kafkaConsumer.poll(100);

for (ConsumerRecord<String, String> record : records)

System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

}

}

}

最后启动生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic HelloWorld

但是以上操作都有可能遇到数据重复或者数据丢失,在kafka0.11后引入了幂等和事务完美的解决了这个问题

切忌是0.11版本之后包括0.11,如欲了解详情请看下篇

老规矩:散文欣赏

《不老苍穹》

今后的日子,

我的头发越来越少,

我写给你的句子越来越少,

我剩下的时间越来越少,

但世界永恒,

星辰常在,

苍穹不老。

最后的日子,

我抱起你会很难,

我看清你会很难,

我记起你会很难,

但此心至诚,

百年不变,

千岁未寒。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。