大数据技术 Java curator操作zookeeper获取kafka
沉沙 2018-09-27 来源 : 阅读 1140 评论 0

摘要:本篇教程探讨了大数据技术 Java curator操作zookeeper获取kafka,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术 Java curator操作zookeeper获取kafka,希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

Java curator操作zookeeper获取kafka

Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。


Curator的Maven依赖
    
        org.apache.curator
        curator-recipes
        2.3.0
    

本地启动kafka
启动zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties 
启动kafka
./bin/kafka-server-start.sh config/server.properties
启动kafkaManager
sudo ./bin/kafka-manager 



代码
项目结构

先看测试
package com.xinxiucan.test;

import java.util.List;

import com.xinxiucan.api.BrokerApi;
import com.xinxiucan.api.TopicApi;
import com.xinxiucan.zookeeper.ZKBean;

public class AppTest {

    public static void main(String[] args) {
        ZKBean zk = new ZKBean();
        zk.setZkURL("127.0.0.1:2181");
        List ids = BrokerApi.findBrokerIds(zk);
        for (String id : ids) {
            String idInfo = BrokerApi.findBrokerById(zk, id);
            System.out.println(idInfo);
        }

        List topicNames = TopicApi.findAllTopicName(zk);
        for (String topicName : topicNames) {
            System.out.println("topicName:" + topicName);
        }
        
        List topics = TopicApi.findAllTopics(zk);
        for (String topic : topics) {
            System.out.println("topic:" + topic);
        }

    }
}
运行结果
{"jmx_port":-1,"timestamp":"1501636761404","endpoints":["PLAINTEXT://can:9092"],"host":"can","version":3,"port":9092}
topicName:test_xin_1
topicName:xin
topic:{"version":1,"partitions":{"0":[0]}}
topic:{"version":1,"partitions":{"0":[0]}}

剩余代码
ZKBean
package com.xinxiucan.zookeeper;

import java.io.Serializable;

public class ZKBean implements Serializable {

    private static final long serialVersionUID = -6057956208558425192L;

    private int id = -1;

    private String name;

    private String zkURL;

    private String version = "0.8.2.2";

    private boolean jmxEnable;

    private String jmxAuthUsername;

    private String jmxAuthPassword;

    private boolean jmxWithSsl;

    private int zkConnectionTimeout = 30;

    private int maxRetryCount = 3;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getZkURL() {
        return zkURL;
    }

    public void setZkURL(String zkURL) {
        this.zkURL = zkURL;
    }

    public boolean isJmxEnable() {
        return jmxEnable;
    }

    public void setJmxEnable(boolean jmxEnable) {
        this.jmxEnable = jmxEnable;
    }

    public String getJmxAuthUsername() {
        return jmxAuthUsername;
    }

    public void setJmxAuthUsername(String jmxAuthUsername) {
        this.jmxAuthUsername = jmxAuthUsername;
    }

    public String getJmxAuthPassword() {
        return jmxAuthPassword;
    }

    public void setJmxAuthPassword(String jmxAuthPassword) {
        this.jmxAuthPassword = jmxAuthPassword;
    }

    public boolean isJmxWithSsl() {
        return jmxWithSsl;
    }

    public void setJmxWithSsl(boolean jmxWithSsl) {
        this.jmxWithSsl = jmxWithSsl;
    }

    public int getZkConnectionTimeout() {
        return zkConnectionTimeout;
    }

    public void setZkConnectionTimeout(int zkConnectionTimeout) {
        this.zkConnectionTimeout = zkConnectionTimeout;
    }

    public int getMaxRetryCount() {
        return maxRetryCount;
    }

    public void setMaxRetryCount(int maxRetryCount) {
        this.maxRetryCount = maxRetryCount;
    }

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    @Override
    public int hashCode() {
        return this.id;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (this == obj) {
            return true;
        }
        if (obj instanceof ZKBean) {
            ZKBean anotherZKCluster = (ZKBean) obj;
            return this.id == anotherZKCluster.getId();
        } else {
            return false;
        }
    }

}
ZKConnectionFactory
package com.xinxiucan.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZKConnectionFactory {

    public static CuratorFramework buildCuratorFramework(ZKBean zk) {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(zk.getZkConnectionTimeout() * 1000, zk.getMaxRetryCount());
        CuratorFramework client = CuratorFrameworkFactory.newClient(zk.getZkURL(), retryPolicy);
        client.start();
        return client;
    }

}
BrokerApi
package com.xinxiucan.api;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.curator.framework.CuratorFramework;

import com.xinxiucan.KafkaZKPath;
import com.xinxiucan.zookeeper.ZKBean;
import com.xinxiucan.zookeeper.ZKConnectionFactory;

/**
 * 获取broker信息
 * @author xinxiucan
 */
public class BrokerApi {

    /**
     * 根据zk获取Broker ID列表
     * @param zkBean
     * @return
     */
    public static List findBrokerIds(ZKBean zkBean) {
        CuratorFramework client = null;
        try {
            client = ZKConnectionFactory.buildCuratorFramework(zkBean);
            List brokerIdStrs = client.getChildren().forPath(KafkaZKPath.BROKER_IDS_PATH);
            return brokerIdStrs;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * 根据zk&brokerId获取Broker信息
     * @param zkBean
     * @return
     */
    public static String findBrokerById(ZKBean zkBean, String id) {
        CuratorFramework client = null;
        try {
            client = ZKConnectionFactory.buildCuratorFramework(zkBean);
            String brokerIdPath = KafkaZKPath.getBrokerSummeryPath(id);
            String brokerInfo = new String(client.getData().forPath(brokerIdPath), "UTF-8");
            return brokerInfo;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    public static Map findAllBrokerSummarys(ZKBean zkBean) {
        CuratorFramework client = null;
        try {
            client = ZKConnectionFactory.buildCuratorFramework(zkBean);
            List brokerIds = client.getChildren().forPath(KafkaZKPath.BROKER_IDS_PATH);
            if (brokerIds == null || brokerIds.size() == 0) {
                return null;
            }
            Map brokerMap = new HashMap();
            for (String brokerId : brokerIds) {
                String brokerIdPath = KafkaZKPath.getBrokerSummeryPath(brokerId);
                String brokerInfo = new String(client.getData().forPath(brokerIdPath), "UTF-8");
                brokerMap.put(brokerId, brokerInfo);
            }
            return brokerMap;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

}
TopicApi
package com.xinxiucan.api;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;

import com.xinxiucan.KafkaZKPath;
import com.xinxiucan.ReplicasAssignmentBuilder;
import com.xinxiucan.util.Object2Array;
import com.xinxiucan.zookeeper.ZKBean;
import com.xinxiucan.zookeeper.ZKConnectionFactory;


/**
 * 获取Topic信息
 * @author can
 */
public class TopicApi {

    /**
     * 创建Topic
     * @param zkBean
     * @param topic
     * @param partitionsCount
     * @param replicationFactorCount
     * @param topicConfig
     */
    public static void createTopic(ZKBean zkBean, String topic, int partitionsCount, int replicationFactorCount,
            Map topicConfig) {
        CuratorFramework client = null;
        try {
            client = ZKConnectionFactory.buildCuratorFramework(zkBean);
            List brokerIds = client.getChildren().forPath(KafkaZKPath.BROKER_IDS_PATH);
            List ids = new ArrayList<>();
            for (String str : brokerIds) {
                int i = Integer.parseInt(str);
                ids.add(i);
            }
            Map<String, List> replicasAssignment = ReplicasAssignmentBuilder.assignReplicasToBrokers(ids, partitionsCount,
                    replicationFactorCount);
            Map topicSummaryMap = new HashMap();
            topicSummaryMap.put("version", 1);
            topicSummaryMap.put("partitions", replicasAssignment);
            byte[] topicSummaryData = Object2Array.objectToByteArray(topicSummaryMap);
            String topicSummaryPath = KafkaZKPath.getTopicSummaryPath(topic);
            Stat stat = client.checkExists().forPath(topicSummaryPath);
            if (stat == null) {
                // create
                client.create().creatingParentsIfNeeded().forPath(topicSummaryPath, topicSummaryData);
            } else {
                // update
                client.setData().forPath(topicSummaryPath, topicSummaryData);
            }

            Map topicConfigMap = new HashMap();
            topicConfigMap.put("version", 1);
            topicConfigMap.put("config", topicConfig);
            byte[] topicConfigData = Object2Array.objectToByteArray(topicConfigMap);

            String topicConfigPath = KafkaZKPath.getTopicConfigPath(topic);
            Stat stat2 = client.checkExists().forPath(topicConfigPath);
            if (stat2 == null) {
                // create
                client.create().creatingParentsIfNeeded().forPath(topicConfigPath, topicConfigData);
            } else {
                // update
                client.setData().forPath(topicConfigPath, topicConfigData);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * 删除Topic
     * @param zkBean
     * @param topicName
     */
    public static void deleteTopic(ZKBean zkBean, String topicName) {
        CuratorFramework client = null;
        try {
            client = ZKConnectionFactory.buildCuratorFramework(zkBean);
            String adminDeleteTopicPath = KafkaZKPath.getDeleteTopicPath(topicName);
            client.create().creatingParentsIfNeeded().forPath(adminDeleteTopicPath, null);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    /**
     * 查询topic
     * @param zkBean
     * @return
     */
    public static List findAllTopics(ZKBean zkBean) {
        CuratorFramework client = null;
        try {
            client = ZKConnectionFactory.buildCuratorFramework(zkBean);
            List topicNames = client.getChildren().forPath(KafkaZKPath.BROKER_TOPICS_PATH);
            List topicSummarys = new ArrayList();
            for (String topicName : topicNames) {
                byte[] topicData = client.getData().forPath(KafkaZKPath.getTopicSummaryPath(topicName));
                topicSummarys.add(new String(topicData, "UTF-8"));
            }
            // add delete flag
            List deleteTopics = client.getChildren().forPath(KafkaZKPath.ADMIN_DELETE_TOPICS_PATH);
            return topicSummarys;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    public static List findAllTopicName(ZKBean zkBean) {
        CuratorFramework client = null;
        try {
            client = ZKConnectionFactory.buildCuratorFramework(zkBean);
            return client.getChildren().forPath(KafkaZKPath.BROKER_TOPICS_PATH);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    
}
KafkaZKPath
package com.xinxiucan;

public class KafkaZKPath {

    public static final String COMSUMERS_PATH = "/consumers";
    public static final String BROKER_IDS_PATH = "/brokers/ids";
    public static final String BROKER_TOPICS_PATH = "/brokers/topics";
    public static final String CONFIG_TOPICS_PATH = "/config/topics";
    public static final String CONFIG_CHANGES_PATH = "/config/changes";
    public static final String CONTROLLER_PATH = "/controller";
    public static final String CONTROLLER_EPOCH_PATH = "/controller_epoch";
    public static final String ADMIN_PATH = "/admin";
    public static final String ADMIN_REASSGIN_PARTITIONS_PATH = "/admin/reassign_partitions";
    public static final String ADMIN_DELETE_TOPICS_PATH = "/admin/delete_topics";
    public static final String ADMIN_PREFERED_REPLICA_ELECTION_PATH = "/admin/preferred_replica_election";

    public static String getTopicSummaryPath(String topic) {
        return BROKER_TOPICS_PATH + "/" + topic;
    }

    public static String getTopicPartitionsStatePath(String topic,String partitionId) {
        return getTopicSummaryPath(topic) + "/partitions/" + partitionId+ "/state";
    }

    public static String getTopicConfigPath(String topic) {
        return CONFIG_TOPICS_PATH + "/" + topic;
    }

    public static String getDeleteTopicPath(String topic) {
        return ADMIN_DELETE_TOPICS_PATH + "/" + topic;
    }
    
    public static String getBrokerSummeryPath(String id) {
        return BROKER_IDS_PATH + "/" + id;
    }
    
    public static String getConsumerOffsetPath(String groupId) {
        return  COMSUMERS_PATH + "/" + groupId +"/offsets";
    }
    
    public static String getTopicConsumerOffsetPath(String groupId,String topic){
        return  COMSUMERS_PATH + "/" + groupId +"/offsets/"+topic;
    }
    
    public static String getTopicPartitionConsumerOffsetPath(String groupId, String topic,String partitionId){
        return  COMSUMERS_PATH + "/" + groupId +"/offsets/"+topic + "/" + partitionId;
    }
    
    public static String getTopicPartitionConsumerOwnerPath(String groupId, String topic, String partitionId) {
        return  COMSUMERS_PATH + "/" + groupId +"/owners/"+topic + "/" + partitionId;
    }
    
    
}
ReplicasAssignmentBuilder
package com.xinxiucan;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class ReplicasAssignmentBuilder {

    public static Map<String, List> assignReplicasToBrokers(List brokerListSet, int nPartitions, int nReplicationFactor) {
        return assignReplicasToBrokers(brokerListSet, nPartitions, nReplicationFactor, -1, -1);
    }

    /**
     * There are 2 goals of replica assignment:
     * 1. Spread the replicas evenly among brokers.
     * 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
     *
     * To achieve this goal, we:
     * 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
     * 2. Assign the remaining replicas of each partition with an increasing shift.
     *
     * Here is an example of assigning
     * broker-0  broker-1  broker-2  broker-3  broker-4
     * p0        p1        p2        p3        p4       (1st replica)
     * p5        p6        p7        p8        p9       (1st replica)
     * p4        p0        p1        p2        p3       (2nd replica)
     * p8        p9        p5        p6        p7       (2nd replica)
     * p3        p4        p0        p1        p2       (3nd replica)
     * p7        p8        p9        p5        p6       (3nd replica)
     */
    public static Map<String, List> assignReplicasToBrokers(List brokerListSet, int nPartitions, int nReplicationFactor,
            int fixedStartIndex, int startPartitionId) {
        // sort
        brokerListSet.sort(new Comparator() {

            @Override
            public int compare(Integer o1, Integer o2) {
                return o1 - o2;
            }

        });
        if (nPartitions <= 0) {
            throw new RuntimeException("num of partition cannot less than 1");
        }
        if (nReplicationFactor <= 0) {
            throw new RuntimeException("num of replication factor cannot less than 1");
        }
        if (nReplicationFactor > brokerListSet.size()) {
            throw new RuntimeException("broker num is less than replication factor num");
        }

        Random random = new Random();

        Map<String, List> result = new HashMap<String, List>();
        int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : random.nextInt(brokerListSet.size());
        int currentPartitionId = startPartitionId >= 0 ? startPartitionId : 0;
        int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : random.nextInt(brokerListSet.size());

        for (int i = 0; i < nPartitions; i++) {
            if (currentPartitionId > 0 && ((currentPartitionId % brokerListSet.size()) == 0)) {
                nextReplicaShift++;
            }
            int firstReplicaIndex = (currentPartitionId + startIndex) % brokerListSet.size();
            List replicaList = new ArrayList();
            replicaList.add(brokerListSet.get(firstReplicaIndex));
            for (int j = 0; j < (nReplicationFactor - 1); j++) {
                replicaList.add(brokerListSet.get(getReplicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerListSet.size())));
            }
            result.put("" + currentPartitionId, getReverseList(replicaList));
            currentPartitionId++;
        }

        return result;
    }

    private static List getReverseList(List list) {
        List result = new ArrayList();
        for (int i = list.size() - 1; i >= 0; i--) {
            result.add(list.get(i));
        }
        return result;
    }

    private static int getReplicaIndex(int firstReplicaIndex, int nextReplicaShift, int replicaIndex, int nBrokers) {

        int shift = 1 + (nextReplicaShift + replicaIndex) % (nBrokers - 1);
        int result = (firstReplicaIndex + shift) % nBrokers;
        // System.out.println(firstReplicaIndex+"|"+nextReplicaShift+"|"+replicaIndex+"="+result);
        return result;
    }

    private static void printAssignReplicas(Map<String, List> map, List brokerListSet) {
        Map<Integer, List> brokerMap = new HashMap<Integer, List>();
        for (int i = 0; i < brokerListSet.size(); i++) {
            List partitions = new ArrayList();
            for (int j = 0; j < map.size(); j++) {
                List brokerIds = map.get(j + "");
                if (brokerIds.contains(i)) {
                    partitions.add("" + j);
                }
            }
            brokerMap.put(i, partitions);
        }
    }

}

   

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程