大数据技术全面解读 基础部分之zookeeper(下)
沉沙 2018-10-10 来源 : 阅读 1741 评论 0

摘要:本篇教程探讨了大数据技术全面解读 基础部分之zookeeper(下),希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

本篇教程探讨了大数据技术全面解读 基础部分之zookeeper(下),希望阅读本篇文章以后大家有所收获,帮助大家对大数据技术的理解更加深入。

<

一、集群自启动脚本

  1.关闭zk

[root@localhost bin]# jps
3104 Jps
2805 QuorumPeerMain
[root@localhost bin]# kill -9 2805

  //kill或者stop都是可以的

  2.远程执行命令


[root@localhost bin]# ssh 192.168.137.138 /opt/zookeeper/zookeeper-3.4.5/bin/zkServer.sh start
root@192.168.137.138's password: 
JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.5/bin/../conf/zoo.cfg
Starting zookeeper ... /opt/zookeeper/zookeeper-3.4.5/bin/zkServer.sh: 第 103 行:[: /tmp/zookeeper: 期待二元表达式
STARTED



  出现以上中文的地方只需要修改一下zoo.cfg,把多余的配置注释即可!

  当然,这样还是无法启动!因为ssh过去是以一个bash的方式过去的(不会执行/etc/profile,而正常登录是会执行的,也可以打开相应的脚本进行查看),也就是PATH不在了,导致JAVA_HOME等找不到了!

  EXPORT所定义的变量对自己所在的shell以及子shell生效

  这里就需要用到之前说到的source命令了:https://www.cns.com/pkufork/p/linux_source.html

ssh 192.168.137.138 "source /etc/profile&&/opt/zookeeper/zookeeper-3.4.5/bin/zkServer.sh start"

  //如果不使用引号,将会以空格作为命令的分割!

  3.配置免密登录

  在其中一台机器上(这里是192.168.137.128)

ssh-keygen

  //之后enter即可

ssh-copy-id 192.168.137.128
ssh-copy-id 192.168.137.138
ssh-copy-id 192.168.137.148

  4.一键启动脚本

cd /root
mkdir bin
cd bin
vim startZK.sh

 


#!/bin/bash
SERVERS="192.168.137.128 192.168.137.138 192.168.137.148"
echo "start zk..."
for SERVER in $SERVERS
do
    ssh $SERVER "source /etc/profile&&/opt/zookeeper/zookeeper-3.4.5/bin/zkServer.sh start"
done



chmod +x startZK.sh 

  这样,通过startZK.sh就能一键启动了!(/root/bin默认在PATH中了!)
二、zk的Java客户端



   1.引入maven依赖

        
            org.apache.zookeeper
            zookeeper
            3.4.6
        
    

  2.测试程序是否能通


public class SimpleZK {
    // 设置连接字符串(可以用逗号隔开多个),失败时会自动尝试多个
    private static final String coonectString = "192.168.137.128:2181,192.168.137.138:2181,192.168.137.148:2181";
    // 超时时间
    private static final int sessionTimeout = 2000;
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        ZooKeeper zkClient = new ZooKeeper(coonectString, sessionTimeout, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                // 收到通知事件后的回调函数
                System.out.println(watchedEvent.getType()+"---"+watchedEvent.getPath());
            }
        });
        // zk的增删改查(这里使用最底层的原生操作,zkclient的待补充)
        String node = zkClient.create("/app2", "hellozk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    }

}



  完美:

    

  3.增删改查实例


package com.zk;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;


import java.io.IOException;
import java.util.List;

/**
 * 测试zk的Java客户端
 *
 * @author zcc ON 2018/1/17
 **/
public class SimpleZK {
    // 设置连接字符串(可以用逗号隔开多个),失败时会自动尝试多个
    private static final String coonectString = "192.168.137.128:2181,192.168.137.138:2181,192.168.137.148:2181";
    // 超时时间
    private static final int sessionTimeout = 2000;
    // 初始变量
    ZooKeeper zkClient = null;

    /**
     * 初始化方法
     * @throws IOException
     */
    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(coonectString, sessionTimeout, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                // 收到通知事件后的回调函数
                System.out.println(watchedEvent.getType()+"---"+watchedEvent.getPath());
                // 开启循环监听(监听调用此方法,此方法又开启监听)
                try {
                    zkClient.getChildren("/", true);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * 创建
     */
    public void testCreate() throws IOException, KeeperException, InterruptedException {
        String node = zkClient.create("/app2", "hellozk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     * 判断是否存在节点
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void testExists() throws KeeperException, InterruptedException {
        // stat就是zk中那一堆数据(null则不存在)
        Stat stat = zkClient.exists("/", false);
    }
    /**
     * 取得子节点
     */
    @Test
    public void testGetChildren() throws KeeperException, InterruptedException {
        List children = zkClient.getChildren("/", true);
        for (String child : children) {
            System.out.println(child);
        }
    }

    /**
     * 获取数据
     */
    @Test
    public void testGetData() throws KeeperException, InterruptedException {
        // 分别是路径,是否监听以及状态版本(null就OK了,取最新)
        byte[] data = zkClient.getData("/app2", false, null);
        System.out.println(new String(data));
    }

    /**
     * 删除数据
     */
    @Test
    public void testDeleteZnode() throws KeeperException, InterruptedException {
        // -1作为版本号参数表示删除所有版本(上层的API是不用传这些参数的)
        zkClient.delete("/app2",-1);
    }

    /**
     * 修改数据
     */
    public void testSetData() throws KeeperException, InterruptedException {
        zkClient.setData("/app2", "hellozkCli".getBytes(), -1);
    }
}


三、应用实例

  1.分布式服务器动态上下线感知(主节点HA)

  大致原理:

    

  大致流程比较清晰,当服务器启动时就去注册信息(例如给出主机与id等信息),给出一个短暂的带序列号的临时节点,这样服务器下线的时候节点便被删除了;而客户端则是去zk获取子节点信息,得到服务器列表并且注册监听,这样当有节点发生改变时变可以感知变化了!

  2.Java代码



  服务都端:


package com.zk;

import org.apache.zookeeper.*;

import java.io.IOException;

/**
 * 分布式服务器动态感知——服务端
 *
 * @author zcc ON 2018/1/17
 **/
public class DistributedServer {

    private static final String coonectString = "192.168.137.128:2181,192.168.137.138:2181,192.168.137.148:2181";
    private static final int sessionTimeout = 2000;
    private static final String parentNode = "/servers";

    private ZooKeeper zk = null;

    /**
     * 获得连接
     * @throws IOException
     */
    public void getConn() throws IOException {
        zk = new ZooKeeper(coonectString, sessionTimeout, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                // 收到通知事件后的回调函数
                System.out.println(watchedEvent.getType()+"---"+watchedEvent.getPath());
                // 开启循环监听(监听调用此方法,此方法又开启监听)
                try {
                    zk.getChildren("/", true);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * 注册服务器
     */
    public void registServer(String hostname) throws KeeperException, InterruptedException {
        // 临时有编号的节点,可以重名
        String znode = zk.create(parentNode + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname+" is online..."+znode);
    }

    /**
     * 业务逻辑
     */
    public void handleBI(String hostname) throws InterruptedException {
        System.out.println(hostname+" start working...");
        // 模拟保持不关闭状态
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {
        // 获取zk连接
        DistributedServer servers = new DistributedServer();
        servers.getConn();
        // 注册服务器
        servers.registServer(args[0]);
        // 业务逻辑
        servers.handleBI(args[0]);
    }
}



  客户端:


package com.zk;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * 分布式服务器动态感知——客户端
 *
 * @author zcc ON 2018/1/17
 **/
public class DistributedClient {
    private static final String coonectString = "192.168.137.128:2181,192.168.137.138:2181,192.168.137.148:2181";
    private static final int sessionTimeout = 2000;
    private static final String parentNode = "/servers";
    // 注意volatile的使用意义(使每个线程都得到最新值)
    private volatile List serverList;

    private ZooKeeper zk = null;

    /**
     * 获得连接
     * @throws IOException
     */
    public void getConn() throws IOException {
        zk = new ZooKeeper(coonectString, sessionTimeout, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                try {
                    // 回调事件重新更新服务器列表并注册监听
                    getServerList();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * 业务逻辑
     */
    public void handleBI() throws InterruptedException {
        System.out.println("client working...");
        // 模拟保持不关闭状态
        Thread.sleep(Long.MAX_VALUE);
    }
    public void getServerList() throws KeeperException, InterruptedException {
        // 获取服务器子节点信息,并且对父节点监听
        List children = zk.getChildren(parentNode, true);
        // 用于存放服务器列表的List
        List servers = new ArrayList<>();
        for (String child : children) {
            // 获取数据(这里无需监听,因为连接上了不需要更换连接了)
            byte[] data = zk.getData(parentNode + "/" + child, false, null);
            servers.add(new String(data));
        }
        // 更新到成员变量,供业务线程使用
        serverList = servers;
        // 打印服务器列表
        System.out.println(serverList);
    }

    public static void main(String[] args) throws Exception {
        // 获取连接
        DistributedClient client = new DistributedClient();
        client.getConn();
        // 获取子节点信息,并监听
        client.getServerList();
        // 启动业务功能
        client.handleBI();
    }
}    

本文由职坐标整理发布,学习更多的大数据技术相关知识,请关注职坐标大技术云计算大技术技术频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved