c踩坑
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld
将localhost必须和PLAINTEXT配置的地址保持一致,否则的话会无限警告不能接收数据
生产者;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerDemo {
public static void main(String[] args){
try {
//新建一个类这个类用来配置kafka
Properties properties = new Properties();
//用来配置 kafka的IP地址和端口号
properties.put("bootstrap.servers", "master:9092");
//ack 的状态可以分为三种 1,-1,all 1代表当生产者
properties.put("acks", "1");//重要
//最多重复几次生产操作
properties.put("retries", 3);//重要
//缓存每个分区未发送消息。缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。
properties.put("batch.size", 16384);
//监听间隔
properties.put("linger.ms", 1);
//控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。
properties.put("buffer.memory", 33554432);
//序列化类型。 Kafka消息是以键值对的形式发送到Kafka集群的,其中Key是可选的,Value可以是任意类型。但是在Message被发送到Kafka集群之前,Producer需要把不同类型的消
properties.put("key.serializer", "org.mon.serialization.StringSerializer");
properties.put("value.serializer", "org.mon.serialization.StringSerializer");
//初始化生产者
Producer<String, String> producer = null;
producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 100; i++) {
String msg = "Messagea" + i;
//topic的名字:HelloWorld
producer.send(new ProducerRecord<String, String>("HelloWorld1", msg));
System.out.println("Sent:" + msg);
//Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
之后再启动消费者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld --from-beginning
//消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerDemo {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "master:9092");
properties.put("group.id", "i");
//是否自动提交
properties.put("mit", "false");
properties.put("mit.interval.ms", "1000");
//偏移量自增
properties.put("auto.offset.reset", "earliest");//
//session保存时间
properties.put("session.timeout.ms", "30000");
//序列化
properties.put("key.deserializer", "org.mon.serialization.StringDeserializer");
properties.put("value.deserializer", "org.mon.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
//topictest2
kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
最后启动生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic HelloWorld
但是以上操作都有可能遇到数据重复或者数据丢失,在kafka0.11后引入了幂等和事务完美的解决了这个问题
切忌是0.11版本之后包括0.11,如欲了解详情请看下篇
老规矩:散文欣赏
《不老苍穹》
今后的日子,
我的头发越来越少,
我写给你的句子越来越少,
我剩下的时间越来越少,
但世界永恒,
星辰常在,
苍穹不老。
最后的日子,
我抱起你会很难,
我看清你会很难,
我记起你会很难,
但此心至诚,
百年不变,
千岁未寒。