Kafka consumer

罗祥 2019-05-28 15:46:00 +08:00
parent f5b3cd52a8
commit d1f25185d1
13 changed files with 440 additions and 13 deletions


@@ -0,0 +1,52 @@
package com.heibaiying.consumers;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* Kafka consumer: asynchronous offset commit
*/
public class ConsumerASyn {
public static void main(String[] args) {
String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
/* asynchronous commit with a callback */
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.out.println("错误处理");
offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n",
x.topic(), x.partition(), y.offset()));
}
}
});
}
} finally {
consumer.close();
}
}
}


@@ -0,0 +1,49 @@
package com.heibaiying.consumers;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
/**
* Kafka consumer: asynchronous commits with a final synchronous commit
*/
public class ConsumerASynAndSyn {
public static void main(String[] args) {
String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
// asynchronous commit
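// commitAsync does not retry failed commits and returns immediately, so the poll loop is never blocked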
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// the consumer is about to be closed, so use a synchronous commit to make sure the final offsets are committed successfully
consumer.commitSync();
} finally {
consumer.close();
}
}
}
}


@@ -0,0 +1,54 @@
package com.heibaiying.consumers;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Kafka consumer: asynchronously commit specific offsets
*/
public class ConsumerASynWithOffsets {
public static void main(String[] args) {
String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
/* record the offset of each partition of each topic */
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
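/* the committed offset must be the offset of the next message to consume, hence record.offset() + 1 */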
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset()+1, "no metaData");
/* TopicPartition overrides hashCode and equals, so the same topic-partition is never added twice */
offsets.put(topicPartition, offsetAndMetadata);
}
/* commit the specific offsets */
consumer.commitAsync(offsets, null);
}
} finally {
consumer.close();
}
}
}


@@ -0,0 +1,67 @@
package com.heibaiying.consumers;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.Scanner;
/**
* Kafka consumer: graceful shutdown with wakeup()
*/
public class ConsumerExit {
public static void main(String[] args) {
String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
/* call wakeup() from another thread to exit gracefully */
final Thread mainThread = Thread.currentThread();
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
if ("exit".equals(sc.next())) {
consumer.wakeup();
try {
/* wait for the main thread to finish committing offsets and closing the consumer */
mainThread.join();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
record.topic(), record.partition(), record.key(), record.value(), record.offset());
}
}
} catch (WakeupException e) {
// the WakeupException caused by the wakeup() call can be safely ignored
} finally {
consumer.close();
System.out.println("consumer关闭");
}
}
}


@@ -0,0 +1,46 @@
package com.heibaiying.consumers;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
/**
* Kafka consumer and consumer group
*/
public class ConsumerGroup {
public static void main(String[] args) {
String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
/* specify the consumer group id */
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* subscribe to the topic(s) */
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
/* poll for data */
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
record.topic(), record.partition(), record.key(), record.value(), record.offset());
}
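/* no manual commit here: with the default enable.auto.commit=true, offsets are committed automatically in the background */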
}
} finally {
consumer.close();
}
}
}


@@ -0,0 +1,43 @@
package com.heibaiying.consumers;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
/**
* Kafka consumer: synchronous offset commit
*/
public class ConsumerSyn {
public static void main(String[] args) {
String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
/* synchronous commit */
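/* commitSync blocks until the commit succeeds, retrying on recoverable errors and throwing only on unrecoverable failures */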
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}


@@ -0,0 +1,58 @@
package com.heibaiying.consumers;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
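/**
* Kafka consumer with a rebalance listener that commits the current offsets before partitions are reassigned
*/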
public class RebalanceListener {
public static void main(String[] args) {
String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
/* called after the consumer has stopped reading messages and before the rebalance starts */
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("再均衡即将触发");
// commit the current offsets
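// committing before the rebalance lets the partition's next owner continue from these offsets instead of re-processing records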
consumer.commitSync(offsets);
}
/* called after the partitions have been reassigned and before the consumer starts reading messages */
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
});
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData");
/* TopicPartition overrides hashCode and equals, so the same topic-partition is never added twice */
offsets.put(topicPartition, offsetAndMetadata);
}
consumer.commitAsync(offsets, null);
}
} finally {
consumer.close();
}
}
}


@@ -0,0 +1,56 @@
package com.heibaiying.consumers;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* Standalone consumer
*/
public class StandaloneConsumer {
public static void main(String[] args) {
String topic = "Kafka-Partitioner-Test";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
/* you can specify which partitions to read; here we read only partition 0 of the topic */
for (PartitionInfo partition : partitionInfos) {
if (partition.partition() == 0) {
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
}
// assign the partitions to the consumer
consumer.assign(partitions);
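// assign() bypasses group management: the consumer reads only the given partitions and never takes part in a rebalance; subscribe() and assign() cannot be used on the same consumer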
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<Integer, String> record : records) {
System.out.printf("partition = %s, key = %d, value = %s\n",
record.partition(), record.key(), record.value());
}
consumer.commitSync();
}
}
}


@@ -3,7 +3,6 @@ package com.heibaiying.producers;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/*
* Kafka producer example: send messages asynchronously
@@ -15,15 +14,15 @@ public class ProducerASyn {
String topicName = "Hello-Kafka";
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.200.226:9092");
props.put("bootstrap.servers", "hadoop001:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/* create the producer */
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
/* send the message asynchronously */
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
/* send the message asynchronously and register a callback */
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {


@@ -18,16 +18,16 @@ public class ProducerSyn {
String topicName = "Hello-Kafka";
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.200.226:9092");
props.put("bootstrap.servers", "hadoop001:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/* create the producer */
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
/* send the message synchronously */
try {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
/* send the message synchronously */
RecordMetadata metadata = producer.send(record).get();
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());


@@ -14,24 +14,25 @@ public class ProducerWithPartitioner {
String topicName = "Kafka-Partitioner-Test";
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.200.226:9092");
props.put("bootstrap.servers", "hadoop001:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("partitioner.class", "com.heibaiying.producers.utils.CustomPartitioner");
/* pass in the custom partitioner */
props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner");
/* pass the parameter required by the partitioner */
props.put("pass.line", 6);
Producer<Integer, String> producer = new KafkaProducer<>(props);
for (int i = 0; i <= 10; i++) {
/* send the message asynchronously */
String score = "score:" + i;
ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score);
/* send the message asynchronously */
producer.send(record, (metadata, exception) ->
System.out.printf("%s, partition=%d, \n", score, metadata.partition()));
}
/* close the producer */
producer.close();
}
}


@@ -16,15 +16,15 @@ public class SimpleProducer {
String topicName = "Hello-Kafka";
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.200.226:9092");
props.put("bootstrap.servers", "hadoop001:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/* create the producer */
org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
/* send the message */
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i, "world" + i);
/* send the message */
producer.send(record);
}


@@ -1,11 +1,13 @@
package com.heibaiying.producers.utils;
package com.heibaiying.producers.partitioners;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* Custom partitioner
*/
public class CustomPartitioner implements Partitioner {
private int passLine;