From d1f25185d1824b9d57cd003db203bd0a2f1929f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E7=A5=A5?= <1366971433@qq.com> Date: Tue, 28 May 2019 15:46:00 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E6=B6=88=E8=B4=B9=E8=80=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../heibaiying/consumers/ConsumerASyn.java | 52 ++++++++++++++ .../consumers/ConsumerASynAndSyn.java | 49 ++++++++++++++ .../consumers/ConsumerASynWithOffsets.java | 54 +++++++++++++++ .../heibaiying/consumers/ConsumerExit.java | 67 +++++++++++++++++++ .../heibaiying/consumers/ConsumerGroup.java | 46 +++++++++++++ .../com/heibaiying/consumers/ConsumerSyn.java | 43 ++++++++++++ .../consumers/RebalanceListener.java | 58 ++++++++++++++++ .../consumers/StandaloneConsumer.java | 56 ++++++++++++++++ .../heibaiying/producers/ProducerASyn.java | 5 +- .../com/heibaiying/producers/ProducerSyn.java | 4 +- .../producers/ProducerWithPartitioner.java | 9 +-- .../heibaiying/producers/SimpleProducer.java | 4 +- .../CustomPartitioner.java | 6 +- 13 files changed, 440 insertions(+), 13 deletions(-) create mode 100644 code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASyn.java create mode 100644 code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASynAndSyn.java create mode 100644 code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASynWithOffsets.java create mode 100644 code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerExit.java create mode 100644 code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerGroup.java create mode 100644 code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerSyn.java create mode 100644 code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/RebalanceListener.java create mode 100644 code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/StandaloneConsumer.java rename code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/{utils => partitioners}/CustomPartitioner.java (88%) diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASyn.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASyn.java new file mode 100644 index 0000000..c0c416b --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASyn.java @@ -0,0 +1,52 @@ +package com.heibaiying.consumers; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; + +/** + * Kafka消费者——异步提交 + */ +public class ConsumerASyn { + + public static void main(String[] args) { + String topic = "Hello-Kafka"; + String group = "group1"; + Properties props = new Properties(); + props.put("bootstrap.servers", "hadoop001:9092"); + props.put("group.id", group); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaConsumer consumer = new KafkaConsumer<>(props); + + consumer.subscribe(Collections.singletonList(topic)); + + try { + while (true) { + ConsumerRecords records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); + for (ConsumerRecord record : records) { + System.out.println(record); + } + /*异步提交并定义回调*/ + consumer.commitAsync(new OffsetCommitCallback() { + @Override + public void onComplete(Map offsets, Exception exception) { + if (exception != null) { + System.out.println("错误处理"); + offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n", + x.topic(), x.partition(), y.offset())); + } + } + }); + } + } finally { + consumer.close(); + } + + } +} diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASynAndSyn.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASynAndSyn.java new file mode 100644 index 0000000..13dda52 --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASynAndSyn.java @@ -0,0 +1,49 @@ +package com.heibaiying.consumers; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Properties; + +/** + * Kafka消费者——同步加异步提交 + */ +public class ConsumerASynAndSyn { + + public static void main(String[] args) { + String topic = "Hello-Kafka"; + String group = "group1"; + Properties props = new Properties(); + props.put("bootstrap.servers", "hadoop001:9092"); + props.put("group.id", group); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaConsumer consumer = new KafkaConsumer<>(props); + + consumer.subscribe(Collections.singletonList(topic)); + + try { + while (true) { + ConsumerRecords records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); + for (ConsumerRecord record : records) { + System.out.println(record); + } + // 异步提交 + consumer.commitAsync(); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + try { + // 因为即将要关闭消费者,所以要用同步提交保证提交成功 + consumer.commitSync(); + } finally { + consumer.close(); + } + } + } +} diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASynWithOffsets.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASynWithOffsets.java new file mode 100644 index 0000000..df6e773 --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerASynWithOffsets.java @@ -0,0 +1,54 @@ +package com.heibaiying.consumers; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Kafka消费者——异步提交特定偏移量 + */ +public class ConsumerASynWithOffsets { + + public static void main(String[] args) { + String topic = "Hello-Kafka"; + String group = "group1"; + Properties props = new Properties(); + props.put("bootstrap.servers", "hadoop001:9092"); + props.put("group.id", group); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaConsumer consumer = new KafkaConsumer<>(props); + + consumer.subscribe(Collections.singletonList(topic)); + + Map offsets = new HashMap<>(); + + try { + while (true) { + ConsumerRecords records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); + for (ConsumerRecord record : records) { + System.out.println(record); + /*记录每个主题的每个分区的偏移量*/ + TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset()+1, "no metaData"); + /*TopicPartition重写过hashCode和equals方法,所以能够保证同一主题和分区的实例不会被重复添加*/ + offsets.put(topicPartition, offsetAndMetadata); + } + /*提交特定偏移量*/ + consumer.commitAsync(offsets, null); + } + } finally { + consumer.close(); + } + + } +} diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerExit.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerExit.java new file mode 100644 index 0000000..ae6ec69 --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerExit.java @@ -0,0 +1,67 @@ +package com.heibaiying.consumers; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Properties; +import java.util.Scanner; + + +/** + * Kafka消费者和消费者组 + */ +public class ConsumerExit { + + public static void main(String[] args) { + String topic = "Hello-Kafka"; + String group = "group1"; + Properties props = new Properties(); + props.put("bootstrap.servers", "hadoop001:9092"); + props.put("group.id", group); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaConsumer consumer = new KafkaConsumer<>(props); + + consumer.subscribe(Collections.singletonList(topic)); + + + /*调用wakeup优雅的退出*/ + final Thread mainThread = Thread.currentThread(); + new Thread(() -> { + Scanner sc = new Scanner(System.in); + while (sc.hasNext()) { + if ("exit".equals(sc.next())) { + consumer.wakeup(); + try { + /*等待主线程完成提交偏移量、关闭消费者等操作*/ + mainThread.join(); + break; + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + }).start(); + + try { + while (true) { + ConsumerRecords records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); + for (ConsumerRecord record : records) { + System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", + record.topic(), record.partition(), record.key(), record.value(), record.offset()); + } + } + } catch (WakeupException e) { + //对于wakeup()调用引起的WakeupException异常可以不必处理 + } finally { + consumer.close(); + System.out.println("consumer关闭"); + } + + } +} diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerGroup.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerGroup.java new file mode 100644 index 0000000..c06bf12 --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerGroup.java @@ -0,0 +1,46 @@ +package com.heibaiying.consumers; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Properties; + + +/** + * Kafka消费者和消费者组 + */ +public class ConsumerGroup { + + public static void main(String[] args) { + String topic = "Hello-Kafka"; + String group = "group1"; + Properties props = new Properties(); + props.put("bootstrap.servers", "hadoop001:9092"); + /*指定分组ID*/ + props.put("group.id", group); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaConsumer consumer = new KafkaConsumer<>(props); + + /*订阅主题(s)*/ + consumer.subscribe(Collections.singletonList(topic)); + + try { + while (true) { + /*轮询获取数据*/ + ConsumerRecords records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); + for (ConsumerRecord record : records) { + System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", + record.topic(), record.partition(), record.key(), record.value(), record.offset()); + } + } + } finally { + consumer.close(); + } + + } +} diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerSyn.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerSyn.java new file mode 100644 index 0000000..d9f8c7c --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/ConsumerSyn.java @@ -0,0 +1,43 @@ +package com.heibaiying.consumers; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Properties; + +/** + * Kafka消费者——同步提交 + */ +public class ConsumerSyn { + + public static void main(String[] args) { + String topic = "Hello-Kafka"; + String group = "group1"; + Properties props = new Properties(); + props.put("bootstrap.servers", "hadoop001:9092"); + props.put("group.id", group); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaConsumer consumer = new KafkaConsumer<>(props); + + consumer.subscribe(Collections.singletonList(topic)); + + try { + while (true) { + ConsumerRecords records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); + for (ConsumerRecord record : records) { + System.out.println(record); + } + /*同步提交*/ + consumer.commitSync(); + } + } finally { + consumer.close(); + } + + } +} diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/RebalanceListener.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/RebalanceListener.java new file mode 100644 index 0000000..fa86350 --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/RebalanceListener.java @@ -0,0 +1,58 @@ +package com.heibaiying.consumers; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.TopicPartition; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.*; + +public class RebalanceListener { + + public static void main(String[] args) { + String topic = "Hello-Kafka"; + String group = "group1"; + Properties props = new Properties(); + props.put("bootstrap.servers", "hadoop001:9092"); + props.put("group.id", group); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaConsumer consumer = new KafkaConsumer<>(props); + + Map offsets = new HashMap<>(); + + consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() { + + /*该方法会在消费者停止读取消息之后,再均衡开始之前就调用*/ + @Override + public void onPartitionsRevoked(Collection partitions) { + System.out.println("再均衡即将触发"); + // 提交当前偏移量 + consumer.commitSync(offsets); + } + + /*该方法会在重新分配分区之后,消费者开始读取消息之前被调用*/ + @Override + public void onPartitionsAssigned(Collection partitions) { + + } + }); + + try { + while (true) { + ConsumerRecords records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); + for (ConsumerRecord record : records) { + System.out.println(record); + TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData"); + /*TopicPartition重写过hashCode和equals方法,所以能够保证同一主题和分区的实例不会被重复添加*/ + offsets.put(topicPartition, offsetAndMetadata); + } + consumer.commitAsync(offsets, null); + } + } finally { + consumer.close(); + } + + } +} diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/StandaloneConsumer.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/StandaloneConsumer.java new file mode 100644 index 0000000..1c91103 --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/consumers/StandaloneConsumer.java @@ -0,0 +1,56 @@ +package com.heibaiying.consumers; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + + +/** + * 独立消费者 + */ +public class StandaloneConsumer { + + public static void main(String[] args) { + + String topic = "Kafka-Partitioner-Test"; + String group = "group1"; + Properties props = new Properties(); + props.put("bootstrap.servers", "hadoop001:9092"); + props.put("group.id", group); + props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + KafkaConsumer consumer = new KafkaConsumer<>(props); + + List partitions = new ArrayList<>(); + List partitionInfos = consumer.partitionsFor(topic); + + /*可以指定读取哪些分区 如这里假设只读取主题的0分区*/ + for (PartitionInfo partition : partitionInfos) { + if (partition.partition()==0){ + partitions.add(new TopicPartition(partition.topic(), partition.partition())); + } + } + + // 为消费者指定分区 + consumer.assign(partitions); + + + while (true) { + ConsumerRecords records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); + for (ConsumerRecord record : records) { + System.out.printf("partition = %s, key = %d, value = %s\n", + record.partition(), record.key(), record.value()); + } + consumer.commitSync(); + } + + } +} diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerASyn.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerASyn.java index 0bebb09..07ddcfe 100644 --- a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerASyn.java +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerASyn.java @@ -3,7 +3,6 @@ package com.heibaiying.producers; import org.apache.kafka.clients.producer.*; import java.util.Properties; -import java.util.concurrent.ExecutionException; /* * Kafka生产者示例——异步发送消息 @@ -15,15 +14,15 @@ public class ProducerASyn { String topicName = "Hello-Kafka"; Properties props = new Properties(); - props.put("bootstrap.servers", "192.168.200.226:9092"); + props.put("bootstrap.servers", "hadoop001:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /*创建生产者*/ Producer producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { - /*异步发送消息*/ ProducerRecord record = new ProducerRecord<>(topicName, "k" + i, "world" + i); + /*异步发送消息,并监听回调*/ producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerSyn.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerSyn.java index 0fa05ad..ef6a696 100644 --- a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerSyn.java +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerSyn.java @@ -18,16 +18,16 @@ public class ProducerSyn { String topicName = "Hello-Kafka"; Properties props = new Properties(); - props.put("bootstrap.servers", "192.168.200.226:9092"); + props.put("bootstrap.servers", "hadoop001:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /*创建生产者*/ Producer producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { - /*同步发送消息*/ try { ProducerRecord record = new ProducerRecord<>(topicName, "k" + i, "world" + i); + /*同步发送消息*/ RecordMetadata metadata = producer.send(record).get(); System.out.printf("topic=%s, partition=%d, offset=%s \n", metadata.topic(), metadata.partition(), metadata.offset()); diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerWithPartitioner.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerWithPartitioner.java index b707323..5f52737 100644 --- a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerWithPartitioner.java +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerWithPartitioner.java @@ -14,24 +14,25 @@ public class ProducerWithPartitioner { String topicName = "Kafka-Partitioner-Test"; Properties props = new Properties(); - props.put("bootstrap.servers", "192.168.200.226:9092"); + props.put("bootstrap.servers", "hadoop001:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("partitioner.class", "com.heibaiying.producers.utils.CustomPartitioner"); + /*传递自定义分区器*/ + props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner"); + /*传递分区器所需的参数*/ props.put("pass.line", 6); Producer producer = new KafkaProducer<>(props); for (int i = 0; i <= 10; i++) { - /*异步发送消息*/ String score = "score:" + i; ProducerRecord record = new ProducerRecord<>(topicName, i, score); + /*异步发送消息*/ producer.send(record, (metadata, exception) -> System.out.printf("%s, partition=%d, \n", score, metadata.partition())); } - /*关闭生产者*/ producer.close(); } } \ No newline at end of file diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/SimpleProducer.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/SimpleProducer.java index e0429ec..8987206 100644 --- a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/SimpleProducer.java +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/SimpleProducer.java @@ -16,15 +16,15 @@ public class SimpleProducer { String topicName = "Hello-Kafka"; Properties props = new Properties(); - props.put("bootstrap.servers", "192.168.200.226:9092"); + props.put("bootstrap.servers", "hadoop001:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /*创建生产者*/ org.apache.kafka.clients.producer.Producer producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { - /* 发送消息*/ ProducerRecord record = new ProducerRecord<>(topicName, "hello" + i, "world" + i); + /* 发送消息*/ producer.send(record); } diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/utils/CustomPartitioner.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/partitioners/CustomPartitioner.java similarity index 88% rename from code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/utils/CustomPartitioner.java rename to code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/partitioners/CustomPartitioner.java index 1842406..9936ae7 100644 --- a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/utils/CustomPartitioner.java +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/partitioners/CustomPartitioner.java @@ -1,11 +1,13 @@ -package com.heibaiying.producers.utils; +package com.heibaiying.producers.partitioners; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; - +/** + * 自定义分区器 + */ public class CustomPartitioner implements Partitioner { private int passLine;