本文共 9717 字,大约阅读时间需要 32 分钟。
创建topic
bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic 18BD12-1
查看topic
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
查看topic的结构
bin/kafka-topics.sh --describe --zookeeper node01:2181,node02:2181,node03:2181 --topic 18BD12-1
模拟生产者生产数据
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic 18BD12
修改分区数量
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --alter --topic 18BD12 --partitions 4
模拟消费者消费数据
bin/kafka-console-consumer.sh --from-beginning --topic 18BD12 --zookeeper node01:2181,node02:2181,node03:2181
添加配置
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1
删除配置
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages
删除topic
kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName
生产者
package com.czxy.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Produces string records to a Kafka cluster.
 *
 * <p>Sends ten messages to topic {@code 18BD12_1} using the default
 * (round-robin) partitioning strategy, then closes the producer.
 */
public class Producer1 {
    public static void main(String[] args) {
        // 1. Configure the Kafka cluster connection.
        Properties props = new Properties();
        // Kafka broker addresses.
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Acknowledgement mode: "all" waits for the full ISR before a send is considered done.
        props.put("acks", "all");
        // Retry count on transient send failure.
        props.put("retries", 0);
        // Batch size in bytes for grouping records per partition.
        props.put("batch.size", 16384);
        // Linger time (ms) to wait for more records before sending a batch.
        props.put("linger.ms", 1);
        // Total memory (bytes) available for buffering unsent records.
        props.put("buffer.memory", 33554432);
        // Key/value serializers.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Custom partitioner (disabled):
        // props.put("partitioner.class", "Producer.ProducerPartition");

        // 2. Create the producer. Generic type arguments restored — the original
        //    paste had lost them (raw types).
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // 3. Send records; close the producer even if a send throws.
        try {
            for (int i = 0; i < 10; i++) {
                // 1) No partition and no key: records are distributed round-robin.
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>("18BD12_1", "JAV2AAPi" + i);
                // 2) Key but no partition: partition = hash(key) % partition count.
                // ProducerRecord<String, String> producerRecord =
                //         new ProducerRecord<>("18BD12_1", "key", "JAV2AAPi" + i);
                // 3) Explicit partition: all records go to that partition.
                // ProducerRecord<String, String> producerRecord =
                //         new ProducerRecord<>("18BD12_1", 1, "", "JAV2AAPi" + i);
                kafkaProducer.send(producerRecord);
            }
        } finally {
            // Flushes buffered records and releases client resources.
            kafkaProducer.close();
        }
    }
}
消费者
自动提交offset
package com.czxy.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Kafka consumer that reads topic {@code 18BD12_1} with automatic offset
 * commits (offsets are committed every second by the client).
 */
public class Consumer1 {
    public static void main(String[] args) {
        // 1. Consumer configuration.
        Properties props = new Properties();
        // Kafka broker addresses.
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Consumer group id.
        props.put("group.id", "test");
        // Enable automatic offset commits...
        props.put("enable.auto.commit", "true");
        // ...at this interval (ms).
        props.put("auto.commit.interval.ms", "1000");
        // Key/value deserializers.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. Create the consumer. Generic type arguments restored — the original
        //    paste had lost them (raw types).
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // 3. Subscribe to the topic.
        kafkaConsumer.subscribe(Arrays.asList("18BD12_1"));

        // 4. Poll forever, printing each record's value.
        while (true) {
            // Fetch whatever is available, waiting up to 1000 ms.
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : consumerRecords) {
                String value = record.value();
                System.out.println(value);
            }
        }
    }
}
手动提交offset
package com.czxy.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Kafka consumer that reads topic {@code 18BD12_1} and commits offsets
 * manually (asynchronously) after each poll batch has been processed.
 */
public class Consumer2 {
    public static void main(String[] args) {
        // 1. Consumer configuration.
        Properties props = new Properties();
        // Kafka broker addresses.
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Consumer group id.
        props.put("group.id", "test1");
        // Disable auto-commit: offsets are committed manually below.
        props.put("enable.auto.commit", "false");
        // Auto-commit interval is irrelevant when auto-commit is off:
        // props.put("auto.commit.interval.ms", "1000");
        // Key/value deserializers.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. Create the consumer. Generic type arguments restored — the original
        //    paste had lost them (raw types). Also removed an unused
        //    java.awt.event.KeyAdapter import.
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // 3. Subscribe to the topic.
        kafkaConsumer.subscribe(Arrays.asList("18BD12_1"));

        // 4. Poll forever; commit offsets once the batch has been handled.
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println(record.value());
            }
            // Manual, non-blocking offset commit for everything returned by poll().
            kafkaConsumer.commitAsync();
        }
    }
}
以分区为单位进行读取
package com.czxy.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/**
 * Kafka consumer that processes each poll batch partition by partition,
 * committing offsets after every partition's records are handled.
 */
public class ConsumerPartition {
    public static void main(String[] args) {
        // 1. Consumer configuration.
        Properties props = new Properties();
        // Kafka broker addresses.
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Consumer group id.
        props.put("group.id", "test1");
        // Disable auto-commit: offsets are committed manually below.
        props.put("enable.auto.commit", "false");
        // Auto-commit interval is irrelevant when auto-commit is off:
        // props.put("auto.commit.interval.ms", "1000");
        // Key/value deserializers.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. Create the consumer. Generic type arguments restored — the original
        //    paste had lost them (raw types).
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // 3. Subscribe to the topic.
        kafkaConsumer.subscribe(Arrays.asList("18BD12_1"));

        while (true) {
            // 4. Fetch a batch and walk it one partition at a time.
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            // Partitions represented in this batch.
            Set<TopicPartition> partitions = consumerRecords.partitions();
            for (TopicPartition partition : partitions) {
                // Records belonging to this single partition.
                List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
                // 5. Commit after each partition is processed. NOTE(review):
                // commitAsync() with no arguments commits offsets for ALL
                // partitions returned by the last poll, not just this one —
                // a truly per-partition commit would pass an explicit offset map.
                kafkaConsumer.commitAsync();
            }
        }
    }
}
消费指定的分区
package Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/**
 * Kafka consumer pinned to explicit partitions (0 and 1) of topic
 * {@code 18BD12} via {@code assign()}, bypassing group rebalancing.
 * Offsets are committed synchronously after each partition's records
 * are processed.
 */
public class ConsumerMyPartition {
    public static void main(String[] args) {
        // 1. Consumer configuration.
        Properties props = new Properties();
        // Kafka broker addresses.
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        // Consumer group id.
        props.put("group.id", "test1");
        // Disable auto-commit: offsets are committed manually below.
        props.put("enable.auto.commit", "false");
        // Auto-commit interval is irrelevant when auto-commit is off:
        // props.put("auto.commit.interval.ms", "1000");
        // Key/value deserializers.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. Create the consumer. Generic type arguments restored — the original
        //    paste had lost them (raw types).
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // 3. Assign specific partitions instead of subscribing to the topic.
        TopicPartition topicPartition0 = new TopicPartition("18BD12", 0);
        TopicPartition topicPartition2 = new TopicPartition("18BD12", 1);
        kafkaConsumer.assign(Arrays.asList(topicPartition0, topicPartition2));

        while (true) {
            // 4. Fetch a batch and walk it one partition at a time.
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            // Partitions represented in this batch.
            Set<TopicPartition> partitions = consumerRecords.partitions();
            for (TopicPartition partition : partitions) {
                // Records belonging to this single partition.
                List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value() + " " + record.partition());
                }
                // Blocking offset commit for everything returned by the last poll().
                kafkaConsumer.commitSync();
            }
        }
    }
}
转载地址:http://iakzi.baihongyu.com/