diff --git a/code/Kafka/kafka-basis/pom.xml b/code/Kafka/kafka-basis/pom.xml new file mode 100644 index 0000000..d46efa3 --- /dev/null +++ b/code/Kafka/kafka-basis/pom.xml @@ -0,0 +1,38 @@ + + + 4.0.0 + + com.heibaiying + kafka-basis + 1.0 + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + + + + + org.apache.kafka + kafka-clients + 2.2.0 + + + org.slf4j + slf4j-nop + 1.7.25 + + + + + \ No newline at end of file diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerASyn.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerASyn.java new file mode 100644 index 0000000..0bebb09 --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerASyn.java @@ -0,0 +1,43 @@ +package com.heibaiying.producers; + +import org.apache.kafka.clients.producer.*; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +/* + * Kafka生产者示例——异步发送消息 + */ +public class ProducerASyn { + + public static void main(String[] args) { + + String topicName = "Hello-Kafka"; + + Properties props = new Properties(); + props.put("bootstrap.servers", "192.168.200.226:9092"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + /*创建生产者*/ + Producer producer = new KafkaProducer<>(props); + + for (int i = 0; i < 10; i++) { + /*异步发送消息*/ + ProducerRecord record = new ProducerRecord<>(topicName, "k" + i, "world" + i); + producer.send(record, new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception != null) { + System.out.println("进行异常处理"); + } else { + System.out.printf("topic=%s, partition=%d, offset=%s \n", + metadata.topic(), metadata.partition(), metadata.offset()); + } + } + }); + } + + /*关闭生产者*/ + producer.close(); + } +} \ No newline at end of file diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerSyn.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerSyn.java new file mode 100644 index 0000000..0fa05ad --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerSyn.java @@ -0,0 +1,42 @@ +package com.heibaiying.producers; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +/* + * Kafka生产者示例——同步发送消息 + */ +public class ProducerSyn { + + public static void main(String[] args) { + + String topicName = "Hello-Kafka"; + + Properties props = new Properties(); + props.put("bootstrap.servers", "192.168.200.226:9092"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + /*创建生产者*/ + Producer producer = new KafkaProducer<>(props); + + for (int i = 0; i < 10; i++) { + /*同步发送消息*/ + try { + ProducerRecord record = new ProducerRecord<>(topicName, "k" + i, "world" + i); + RecordMetadata metadata = producer.send(record).get(); + System.out.printf("topic=%s, partition=%d, offset=%s \n", + metadata.topic(), metadata.partition(), metadata.offset()); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + } + + /*关闭生产者*/ + producer.close(); + } +} \ No newline at end of file diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerWithPartitioner.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerWithPartitioner.java new file mode 100644 index 0000000..b707323 --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/ProducerWithPartitioner.java @@ -0,0 +1,37 @@ +package com.heibaiying.producers; + +import org.apache.kafka.clients.producer.*; + +import java.util.Properties; + +/* + * Kafka生产者示例——异步发送消息 + */ +public class ProducerWithPartitioner { + + public static void main(String[] args) { + + String topicName = "Kafka-Partitioner-Test"; + + Properties props = new Properties(); + props.put("bootstrap.servers", "192.168.200.226:9092"); + props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + props.put("partitioner.class", "com.heibaiying.producers.utils.CustomPartitioner"); + props.put("pass.line", 6); + + Producer producer = new KafkaProducer<>(props); + + for (int i = 0; i <= 10; i++) { + /*异步发送消息*/ + String score = "score:" + i; + ProducerRecord record = new ProducerRecord<>(topicName, i, score); + producer.send(record, (metadata, exception) -> + System.out.printf("%s, partition=%d, \n", score, metadata.partition())); + } + + /*关闭生产者*/ + producer.close(); + } +} \ No newline at end of file diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/SimpleProducer.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/SimpleProducer.java new file mode 100644 index 0000000..e0429ec --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/SimpleProducer.java @@ -0,0 +1,34 @@ +package com.heibaiying.producers; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.Properties; + +/* + * Kafka生产者示例 + */ + +public class SimpleProducer { + + public static void main(String[] args) { + + String topicName = "Hello-Kafka"; + + Properties props = new Properties(); + props.put("bootstrap.servers", "192.168.200.226:9092"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + /*创建生产者*/ + org.apache.kafka.clients.producer.Producer producer = new KafkaProducer<>(props); + + for (int i = 0; i < 10; i++) { + /* 发送消息*/ + ProducerRecord record = new ProducerRecord<>(topicName, "hello" + i, "world" + i); + producer.send(record); + } + + /*关闭生产者*/ + producer.close(); + } +} \ No newline at end of file diff --git a/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/utils/CustomPartitioner.java b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/utils/CustomPartitioner.java new file mode 100644 index 0000000..1842406 --- /dev/null +++ b/code/Kafka/kafka-basis/src/main/java/com/heibaiying/producers/utils/CustomPartitioner.java @@ -0,0 +1,29 @@ +package com.heibaiying.producers.utils; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; + +import java.util.Map; + + +public class CustomPartitioner implements Partitioner { + + private int passLine; + + @Override + public void configure(Map configs) { + passLine = (Integer) configs.get("pass.line"); + } + + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + return (Integer) key >= passLine ? 1 : 0; + } + + @Override + public void close() { + System.out.println("分区器关闭"); + } + + +} diff --git a/notes/Kafka生产者详解.md b/notes/Kafka生产者详解.md index e69de29..f5cfd19 100644 --- a/notes/Kafka生产者详解.md +++ b/notes/Kafka生产者详解.md @@ -0,0 +1,82 @@ +#### 1. 启动Kakfa + +Kafka的运行依赖于zookeeper,需要预先启动,可以启动Kafka内置的zookeeper,也可以启动自己安装的。 + +```shell +# zookeeper启动命令 +bin/zkServer.sh start + +# 内置zookeeper启动命令 +bin/zookeeper-server-start.sh config/zookeeper.properties +``` + +启动单节点kafka用于测试: + +```shell +# bin/kafka-server-start.sh config/server.properties +``` + +#### 2. 创建topic + +```shell +# 创建用于测试主题 +bin/kafka-topics.sh --create \ + --bootstrap-server hadoop001:9092 \ + --replication-factor 1 --partitions 1 \ + --topic Hello-Kafka + +# 查看所有主题 + bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092 +``` + +#### 3. 启动消费者 + + 启动一个控制台消费者用于观察写入情况,启动命令如下: + +```shell +# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning +``` + + + +```shell +topic=Hello-Kafka, partition=0, offset=40 +topic=Hello-Kafka, partition=0, offset=41 +topic=Hello-Kafka, partition=0, offset=42 +topic=Hello-Kafka, partition=0, offset=43 +topic=Hello-Kafka, partition=0, offset=44 +topic=Hello-Kafka, partition=0, offset=45 +topic=Hello-Kafka, partition=0, offset=46 +topic=Hello-Kafka, partition=0, offset=47 +topic=Hello-Kafka, partition=0, offset=48 +topic=Hello-Kafka, partition=0, offset=49 +``` + + + + + +```shell + bin/kafka-topics.sh --create \ + --bootstrap-server hadoop001:9092 \ + --replication-factor 1 --partitions 2 \ + --topic Kafka-Partitioner-Test +``` + + + +```shell +score:6, partition=1, +score:7, partition=1, +score:8, partition=1, +score:9, partition=1, +score:10, partition=1, +score:0, partition=0, +score:1, partition=0, +score:2, partition=0, +score:3, partition=0, +score:4, partition=0, +score:5, partition=0, +分区器关闭 +``` + diff --git a/pictures/kafka-simple-producer.png b/pictures/kafka-simple-producer.png new file mode 100644 index 0000000..0ff4531 Binary files /dev/null and b/pictures/kafka-simple-producer.png differ