kafka生产者

This commit is contained in:
罗祥 2019-05-28 13:11:44 +08:00
parent c870ed381d
commit f5b3cd52a8
8 changed files with 305 additions and 0 deletions

View File

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.heibaiying</groupId>
<artifactId>kafka-basis</artifactId>
<version>1.0</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,43 @@
package com.heibaiying.producers;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/*
* Kafka生产者示例异步发送消息
*/
public class ProducerASyn {
public static void main(String[] args) {
String topicName = "Hello-Kafka";
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.200.226:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/*创建生产者*/
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
/*异步发送消息*/
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("进行异常处理");
} else {
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
}
/*关闭生产者*/
producer.close();
}
}

View File

@ -0,0 +1,42 @@
package com.heibaiying.producers;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/*
* Kafka生产者示例同步发送消息
*/
public class ProducerSyn {
public static void main(String[] args) {
String topicName = "Hello-Kafka";
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.200.226:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/*创建生产者*/
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
/*同步发送消息*/
try {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
RecordMetadata metadata = producer.send(record).get();
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/*关闭生产者*/
producer.close();
}
}

View File

@ -0,0 +1,37 @@
package com.heibaiying.producers;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
/*
* Kafka生产者示例异步发送消息
*/
public class ProducerWithPartitioner {
public static void main(String[] args) {
String topicName = "Kafka-Partitioner-Test";
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.200.226:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.heibaiying.producers.utils.CustomPartitioner");
props.put("pass.line", 6);
Producer<Integer, String> producer = new KafkaProducer<>(props);
for (int i = 0; i <= 10; i++) {
/*异步发送消息*/
String score = "score:" + i;
ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score);
producer.send(record, (metadata, exception) ->
System.out.printf("%s, partition=%d, \n", score, metadata.partition()));
}
/*关闭生产者*/
producer.close();
}
}

View File

@ -0,0 +1,34 @@
package com.heibaiying.producers;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/*
* Kafka生产者示例
*/
public class SimpleProducer {
public static void main(String[] args) {
String topicName = "Hello-Kafka";
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.200.226:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/*创建生产者*/
org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
/* 发送消息*/
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i, "world" + i);
producer.send(record);
}
/*关闭生产者*/
producer.close();
}
}

View File

@ -0,0 +1,29 @@
package com.heibaiying.producers.utils;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
private int passLine;
@Override
public void configure(Map<String, ?> configs) {
passLine = (Integer) configs.get("pass.line");
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return (Integer) key >= passLine ? 1 : 0;
}
@Override
public void close() {
System.out.println("分区器关闭");
}
}

View File

@ -0,0 +1,82 @@
#### 1. 启动Kakfa
Kafka的运行依赖于zookeeper需要预先启动可以启动Kafka内置的zookeeper,也可以启动自己安装的。
```shell
# zookeeper启动命令
bin/zkServer.sh start
# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties
```
启动单节点kafka用于测试
```shell
# bin/kafka-server-start.sh config/server.properties
```
#### 2. 创建topic
```shell
# 创建用于测试主题
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 --partitions 1 \
--topic Hello-Kafka
# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
```
#### 3. 启动消费者
启动一个控制台消费者用于观察写入情况,启动命令如下:
```shell
# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning
```
```shell
topic=Hello-Kafka, partition=0, offset=40
topic=Hello-Kafka, partition=0, offset=41
topic=Hello-Kafka, partition=0, offset=42
topic=Hello-Kafka, partition=0, offset=43
topic=Hello-Kafka, partition=0, offset=44
topic=Hello-Kafka, partition=0, offset=45
topic=Hello-Kafka, partition=0, offset=46
topic=Hello-Kafka, partition=0, offset=47
topic=Hello-Kafka, partition=0, offset=48
topic=Hello-Kafka, partition=0, offset=49
```
```shell
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 --partitions 2 \
--topic Kafka-Partitioner-Test
```
```shell
score:6, partition=1,
score:7, partition=1,
score:8, partition=1,
score:9, partition=1,
score:10, partition=1,
score:0, partition=0,
score:1, partition=0,
score:2, partition=0,
score:3, partition=0,
score:4, partition=0,
score:5, partition=0,
分区器关闭
```

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB