700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > 使用java创建kafka的生产者和消费者

使用java创建kafka的生产者和消费者

时间:2023-05-19 19:27:55

相关推荐

使用java创建kafka的生产者和消费者

创建一个Kafka的主题,连接到zk集群,副本因子3,分区3,主题名是test111

[root@h5 kafka]# bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3

查看Kafka的主题详情

[root@h5 kafka]# bin/kafka-topics.sh --describe --zookeeper h5:2181 --topic test111

查看Kafka所有的主题

[root@h5 kafka]# bin/kafka-topics.sh --list --zookeeper h5:2181

删除Kafka指定的主题

[root@h5 kafka]# bin/kafka-topics.sh --delete --zookeeper h5:2181,h6:2181,h7:2181 --topic test111

如删除时提示

Topic guowang1 is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

请修改Kafka/config/server.properties新增delete.topic.enable=true

kafka_2.10-0.8.2.0.jar

kafka-clients-0.8.2.0.jar

metrics-core-2.2.0.jar

scala-library-2.10.4.jar

zkclient-0.3.jar

zookeeper-3.4.6.jar

1、生产者

1 package storm.test.kafka; 2 3 import java.util.Properties; 4 5 import kafka.javaapi.producer.Producer; 6 import kafka.producer.KeyedMessage; 7 import kafka.producer.ProducerConfig; 8 import kafka.serializer.StringEncoder; 9 10 public class TestProducer {11 12 public static void main(String[] args) throws Exception {13 Properties prop = new Properties();14 prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");15 prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");16 prop.put("serializer.class", StringEncoder.class.getName());17 Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(prop));18 int i = 0;19 while(true){20 producer.send(new KeyedMessage<String, String>("test111", "msg:"+i++));21 Thread.sleep(1000);22 }23 }24 25 }

2、消费者

1 package storm.test.kafka; 2 3 import java.util.HashMap; 4 import java.util.List; 5 import java.util.Map; 6 import java.util.Properties; 7 8 import kafka.consumer.Consumer; 9 import kafka.consumer.ConsumerConfig;10 import kafka.consumer.ConsumerIterator;11 import kafka.consumer.KafkaStream;12 import kafka.javaapi.consumer.ConsumerConnector;13 import kafka.serializer.StringEncoder;14 15 public class TestConsumer {16 17 static final String topic = "test111";18 19 public static void main(String[] args) {20 Properties prop = new Properties();21 prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");22 prop.put("serializer.class", StringEncoder.class.getName());23 prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");24 prop.put("group.id", "group1");25 ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));26 Map<String, Integer> topicCountMap = new HashMap<String, Integer>();27 topicCountMap.put(topic, 1);28 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);29 final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);30 ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();31 while (iterator.hasNext()) {32 String msg = new String(iterator.next().message());33 System.out.println("收到消息:"+msg);34 }35 }36 37 }

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。