博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
快速入门kafka④ 常用命令及API使用
阅读量:3951 次
发布时间:2019-05-24

本文共 9717 字,大约阅读时间需要 32 分钟。

常用命令:

创建topic

bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic 18BD12-1
查看topic
bin/kafka-topics.sh  --list --zookeeper node01:2181,node02:2181,node03:2181
查看topic的结构
bin/kafka-topics.sh --describe --zookeeper node01:2181,node02:2181,node03:2181 --topic   18BD12-1
模拟生产者生产数据
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic 18BD12
修改分区数量
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --alter --topic 18BD12 --partitions 4
模拟消费者消费数数据
bin/ kafka-console-consumer.sh --from-beginning --topic 18BD12  --zookeeper node01:2181,node02:2181,node03:2181
添加配置
 bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1
删除配置
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages
删除topic
kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName

JavaAPI使用:

生产者

package com.czxy.producer;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/** * 用于生产数据到kafka集群 */public class Producer1 {    public static void main(String[] args) {        // 1. 配置kafka集群环境        Properties props = new Properties();        // kafka 服务器地址        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");        // 消息确认机制        props.put("acks", "all");        //重试机制        props.put("retries", 0);        // 批量发送的大小        props.put("batch.size", 16384);        // 消息延迟        props.put("linger.ms", 1);        // 批量的缓冲区大小        props.put("buffer.memory", 33554432);        // kafka key value 缓冲区        props.put("key.serializer",                "org.apache.kafka.common.serialization.StringSerializer");        props.put("value.serializer",                "org.apache.kafka.common.serialization.StringSerializer");        // 自定义分区//        props.put("partitioner.class", "Producer.ProducerPartition");         // 2. 实例一个生产者对象        KafkaProducer
kafkaProducer = new KafkaProducer
(props); // 3. 通过生产者将数据发送到kafka集群 for (int i = 0; i < 10; i++) { //1、没有指定分区编号,没有指定key,时采用轮询方式存户数据 ProducerRecord producerRecord = new ProducerRecord("18BD12_1", "JAV2AAPi" + i); //2、没有指定分区编号,指定key时,数据分发策略为对key求取hash值,这个值与分区数量取余,于数就是分区编号。 //ProducerRecord producerRecord = new ProducerRecord("18BD12_1", "key", "JAV2AAPi" + i); //3、指定分区编号,所有数据输入到指定的分区内 //ProducerRecord producerRecord = new ProducerRecord("18BD12_1", 1, "", "JAV2AAPi" + i); //4、定义分区策略 //ProducerRecord producerRecord = new ProducerRecord("18BD12_1", 1, "", "JAV2AAPi" + i); kafkaProducer.send(producerRecord); } // 关闭生产者 kafkaProducer.close(); }}

消费者

自动提交offset

package com.czxy.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Properties;/** * 消费者 */public class Consumer1 {    public static void main(String[] args) {        // 1.添加配置文件        Properties props = new Properties();        // 添加kafka服务器        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");        // 指定消费组        props.put("group.id", "test");        //以下两行代码 ---消费者自动提交offset值        props.put("enable.auto.commit", "true");        //自动提交的周期        props.put("auto.commit.interval.ms", "1000");        // kafka key  value 反序列化        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        // 2.实例消费对象        KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(props); // 3.设置读哪个topic kafkaConsumer.subscribe(Arrays.asList("18BD12_1")); // 循环遍历拉取数据 while (true) { // 4.拉取数据并输出 //获取到所有的数据 ConsumerRecords
consumerRecords = kafkaConsumer.poll(1000); // 循环遍历数据 获取一条数据 for (ConsumerRecord
record : consumerRecords) { String value = record.value(); System.out.println(value); } } }}

手动提交offset

package com.czxy.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.awt.event.KeyAdapter;import java.util.Arrays;import java.util.Properties;/** * 手动提交offset */public class Consumer2 {    public static void main(String[] args) {        // 1.添加配置文件        Properties props = new Properties();        // 添加kafka服务器        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");        // 指定消费组        props.put("group.id", "test1");        //以下两行代码 ---消费者自动提交offset值        props.put("enable.auto.commit", "false");        //自动提交的周期//        props.put("auto.commit.interval.ms", "1000");        // kafka key  value 反序列化        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        //2.实例消费者对象        KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(props); //3.指定topic kafkaConsumer.subscribe(Arrays.asList("18BD12_1")); // 循环遍历 while (true){ //4.循环拉取数据并输出 ConsumerRecords
consumerRecords = kafkaConsumer.poll(1000); for (ConsumerRecord
record : consumerRecords) { System.out.println(record.value()); } // 手动提交offset kafkaConsumer.commitAsync(); } }}

以分区为单位进行读取

package com.czxy.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.TopicPartition;import java.util.Arrays;import java.util.List;import java.util.Properties;import java.util.Set;/*以分区为单位进行消费 */public class ConsumerPartition {    public static void main(String[] args) {        // 1.添加配置文件        Properties props = new Properties();        // 添加kafka服务器        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");        // 指定消费组        props.put("group.id", "test1");        //以下两行代码 ---消费者自动提交offset值        props.put("enable.auto.commit", "false");        //自动提交的周期//        props.put("auto.commit.interval.ms", "1000");        // kafka key  value 反序列化        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        // 2.实例消费对象        KafkaConsumer
kafkaConsumer = new KafkaConsumer<>(props); // 3.添加topic kafkaConsumer.subscribe(Arrays.asList("18BD12_1")); while (true) { // 4.循环获取数据 // 根据分区读取数据 ConsumerRecords
consumerRecords = kafkaConsumer.poll(1000); // 获取所有的分区 Set
partitions = consumerRecords.partitions(); // 遍历分区 for (TopicPartition partition : partitions) { // 获取每个分区的数据 List
> records = consumerRecords.records(partition); // 遍历分区所有数据 获取每一条数据 for (ConsumerRecord
record : records) { System.out.println(record.value()); } // 5.手动提交offset(一个分区提交一次) kafkaConsumer.commitAsync(); } } }}

消费制定的分区

package Consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.TopicPartition;import java.util.Arrays;import java.util.List;import java.util.Properties;import java.util.Set;/*消费指定分区的数据 */public class ConsumerMyPartition {    public static void main(String[] args){        //1配置文件        Properties props = new Properties();        //指定kafka服务器        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");        //消费组        props.put("group.id", "test1");        //以下两行代码 ---消费者自动提交offset值        props.put("enable.auto.commit", "false");        //自动提交的周期        //props.put("auto.commit.interval.ms",  "1000");        //kafka   key 和value的反序列化        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        //2消费者        KafkaConsumer
kafkaConsumer = new KafkaConsumer
(props); //3、设置topic // 添加消费配置 TopicPartition topicPartition0 = new TopicPartition("18BD12",0); TopicPartition topicPartition2 = new TopicPartition("18BD12",1); kafkaConsumer.assign(Arrays.asList(topicPartition0,topicPartition2)); while (true){ //4、拉取数据,并输出 ConsumerRecords
consumerRecords = kafkaConsumer.poll(1000); //通过数据获取到多有的分区 0 2 Set
partitions = consumerRecords.partitions(); //遍历所有分区,或得到一个分区 for (TopicPartition partition : partitions) { //获取每个分区的数据,多条数据 List
> records = consumerRecords.records(partition); //遍历分区内的所有数据,或得到一条 for (ConsumerRecord
record : records) { System.out.println(record.value()+" "+record.partition()); } //手动提交offset kafkaConsumer.commitSync(); } } } }

2

转载地址:http://iakzi.baihongyu.com/

你可能感兴趣的文章
1009 说反话 (20 分)
查看>>
1010 一元多项式求导 (25 分)
查看>>
1011 A+B 和 C (15 分)
查看>>
1012 数字分类 (20 分)
查看>>
1013 数素数 (20 分)
查看>>
1014 福尔摩斯的约会 (20 分)
查看>>
1015 德才论 (25 分)
查看>>
1016 部分A+B (15 分)
查看>>
1017 A除以B (20 分)
查看>>
1019 数字黑洞 (20 分)
查看>>
1032 挖掘机技术哪家强 (20 分)
查看>>
今夕何夕 HDU - 6112 ( 模拟 )
查看>>
Dividing HDU - 1059 ( 多重背包 - 二进制简化 )
查看>>
Robberies HDU - 2955 ( 0-1背包 )
查看>>
FATE HDU - 2459 ( 二维完全背包 )
查看>>
B. Working out CodeForces - 429B (动态规划)
查看>>
10635 - Prince and Princess UVA-10635 (最长公共子序列的O(nlogn)的解法:LCS转换为LIS)
查看>>
Sizeof和Strlen
查看>>
lower_bound和upper_bound
查看>>
Subsequence POJ - 3061 ( 尺取法 )
查看>>