大数据技术全面解读 storm上游数据源 之kafka详解(二)常用命令
沉沙 2018-10-10 来源 : 阅读 1147 评论 0

摘要:本篇教程探讨了大数据技术全面解读 storm上游数据源 之kafka详解(二)常用命令,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术全面解读 storm上游数据源 之kafka详解(二)常用命令,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

一、kafka常用命令

  1.创建topic

bin/kafka-topics.sh --create --topic topic_1 --partitions 4 --replication-factor 2 --zookeeper mini1:2181

  // 如果配置了PATH可以省略相关命令路径,相关命令参数暂不深入,字面意思也可以大概推断。后续给出完整参数参考。

  2.查看所有topic

bin/kafka-topics.sh --list --zookeeper  mini1:2181

  3.生产者发送消息

bin/kafka-console-producer.sh --broker-list mini1:9092 --topic topic_1

  4.消费者消费消息

bin/kafka-console-consumer.sh --zookeeper mini1:2181 --from-beginning --topic topic_1

  // 可以在Mini2上消费,是分布式的

  5.删除topic

bin/kafka-topics.sh --delete --zookeeper mini1:2181 --topic topic_1
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

  6.查看topic详情

bin/kafka-topics.sh --topic topic_1 --describe --zookeeper mini1:2181

二、JavaAPI

   1.引入依赖




    org.apache.kafka
    kafka_2.12
    0.11.0.2




  2.基本topic的操作

    基本对应命令:

      参考:https://www.cns.com/huxi2b/p/6592862.html

  3.生产者与消费者

    以下的很多配置,都在kafka的3个配置里,详情参考入门篇。

    生产者:

 


package cn.itcast.storm.kafka.simple;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;
import java.util.UUID;

/**
 * 这是一个简单的Kafka producer代码
 * 包含两个功能:
 * 1、数据发送
 * 2、数据按照自定义的partition策略进行发送
 *
 *
 * KafkaSpout的类
 */
public class KafkaProducerSimple {
    public static void main(String[] args) {
        /**
         * 1、指定当前kafka producer生产的数据的目的地
         *  创建topic可以输入以下命令,在kafka集群的任一节点进行创建。
         *  bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
         */
        String TOPIC = "orderMq";
        /**
         * 2、读取配置文件
         */
        Properties props = new Properties();
        /*
         * key.serializer.class默认为serializer.class
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /*
         * kafka broker对应的主机,格式为host1:port1,host2:port2
         */
        props.put("metadata.broker.list", "kafka01:9092,kafka02:9092,kafka03:9092");
        /*
         * request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
         * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
         * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
         * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
         * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
         * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
         * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
         * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
         */
        props.put("request.required.acks", "1");
        /*
         * 可选配置,如果不配置,则使用默认的partitioner partitioner.class
         * 默认值:kafka.producer.DefaultPartitioner
         * 用来把消息分到各个partition中,默认行为是对key进行hash。
         */
        props.put("partitioner.class", "cn.itcast.storm.kafka.MyLogPartitioner");
//        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        /**
         * 3、通过配置文件,创建生产者
         */
        Producer producer = new Producer(new ProducerConfig(props));
        /**
         * 4、通过for循环生产数据
         */
        for (int messageNo = 1; messageNo < 100000; messageNo++) {
//            String messageStr = new String(messageNo + "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey," +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发" +
//                    "用来配合自定义的MyLogPartitioner进行数据分发");

            /**
             * 5、调用producer的send方法发送数据
             * 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发
             */
            producer.send(new KeyedMessage(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "itcast"));
        }
    }
}



 

  消费者:


package cn.itcast.storm.kafka.simple;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KafkaConsumerSimple implements Runnable {
    public String title;
    public KafkaStream stream;
    public KafkaConsumerSimple(String title, KafkaStream stream) {
        this.title = title;
        this.stream = stream;
    }
    @Override
    public void run() {
        System.out.println("开始运行 " + title);
        ConsumerIterator it = stream.iterator();
        /**
         * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞
         * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false
         * */
        while (it.hasNext()) {
            MessageAndMetadata data = it.next();
            String topic = data.topic();
            int partition = data.partition();
            long offset = data.offset();
            String msg = new String(data.message());
            System.out.println(String.format(
                    "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",
                    title, topic, partition, offset, msg));
        }
        System.out.println(String.format("Consumer: [%s] exiting ...", title));
    }

    public static void main(String[] args) throws Exception{
        Properties props = new Properties();
        props.put("group.id", "dashujujiagoushi");
        props.put("zookeeper.connect", "zk01:2181,zk02:2181,zk03:2181");
        props.put("auto.offset.reset", "largest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("partition.assignment.strategy", "roundrobin");
        ConsumerConfig config = new ConsumerConfig(props);
        String topic1 = "orderMq";
        String topic2 = "paymentMq";
        //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
        //定义一个map
        Map topicCountMap = new HashMap<>();
        topicCountMap.put(topic1, 3);
        //Map<String, List<KafkaStream> 中String是topic, List<KafkaStream是对应的流
        Map<String, List<KafkaStream>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
        //取出 `kafkaTest` 对应的 streams
        List<KafkaStream> streams = topicStreamsMap.get(topic1);
        //创建一个容量为4的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        //创建20个consumer threads
        for (int i = 0; i < streams.size(); i++)
            executor.execute(new KafkaConsumerSimple("消费者" + (i + 1), streams.get(i)));
    }
}



  自定义分区:


package cn.itcast.storm.kafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
import org.apache.log4j.Logger;


public class MyLogPartitioner implements Partitioner {
    private static Logger logger = Logger.getLogger(MyLogPartitioner.class);

    public MyLogPartitioner(VerifiableProperties props) {
    }

    public int partition(Object obj, int numPartitions) {
        return Integer.parseInt(obj.toString())%numPartitions;
//        return 1;
    }

}



   很明显,上面的代码徒手写是很费劲的,这个时候,就可以请出我们的KafkaSpout来整合storm了!    

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程