【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解

时间:2021-09-17 07:08:10

ZooKeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解


ZooKeeper介绍

  • 概述

    ZooKeeper是一个分布式协调服务开源框架,主要用来解决分布式集群中应用系统的一致性问题。例如怎样避免同时操作同一数据造成脏读的问题

    ZooKeeper本质上是一个分布式的小文件存储系统(ZooKeeper上面的每个文件内容最好不要超过1M),提供基于类似文件系统的目录树方式的数据存储,并且可以对树中的节点进行有效管理,从而用来维护监控你存储的数据的状态变化,通过监控这些数据状态的变化,从而可以达到基于数据的集群管理。比如:统一命名服务(dubbo)分布式配置管理(solr的配置集中管理)分布式消息队列(sub/pub)分布式锁分布式协调等功能。
  • 架构图

    Leader:ZooKeeper集群工作的核心=。事务请求(写操作)的唯一调度处理者保证集群事务处理的顺序性,集群内部各个服务器的调度者。(对于create,setData,delete等有写操作的请求,则需要统一转发给leader处理,leader需要决定编号、执行操作,这个过程称为一个事务)

    Follower:处理客户端非事务(读操作)请求,转发事务请求给Leader,参与集群Leader选举投票,2n-1台可以做集群投票。此外,针对访问量较大的ZooKeeper集群,还可新增观察者角色。

    Observer:观察者角色。观察ZooKeeper集群的最新状态变化并将这些状态同步过来,其对于非事务请求可以进行独立处理,对于事务请求则会转发给Leader服务器进行处理。(说白了就是增加并发的读请求)

【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解


ZooKeeper基本特性

  • 全局数据一致

    最重要的特性。每个server保存一份相同的数据副本,client无论连接到哪个server,展示的数据都是一致的。
  • 可靠性

    如果消息被其中一台服务器接收,那么将被所有服务器接收。
  • 顺序性

    包括全局有序偏序两种:全局有序是指如果在一台服务器上,消息a在消息b前发布,则在所有server上消息a都将在消息b前被发布;偏序是指如果一个消息b在消息a后被同一个发送者发布,消息a必将排在消息b前面。
  • 数据更新原子性

    一次数据更新要么成功(半数以上节点成功,要么失败,不存在中间状态。
  • 实时性

    ZooKeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。

三台机器ZooKeeper的集群环境搭建

  • 概述

    ZooKeeper集群搭建指的是ZooKeeper分布式模式安装,通常由2n+1台servers组成。这是为了保证Leader选举(基于Paxos算法的实现)能够得到多数的支持,所以ZooKeeper集群的数量一般为奇数。

    ZooKeeper运行需要Java环境,所以需要提前安装jdk

  • 安装Leader+Follower模式集群的步骤

    第一步:下载ZooKeeper压缩包。下载网址:http://archive.apache.org/dist/zookeeper/. 下载完成后,上传到/export/softwares

    第二步:解压ZooKeeper到/export/servers下

cd /export/softwares

tar -zxvf zookeeper-3.4.9.tar.gz -C /export/servers/

  第三步:修改配置文件-----------进入到ZooKeeper的conf文件夹中,备份zoo_sample.cfg,创建zkdatas文件夹,编辑zoo.cfg

cd /export/servers/zookeeper-3.4.9/conf/

cp zoo_sample.cfg zoo.cfg

mkdir -p /export/servers/zookeeper-3.4.9/zkdatas/

vim zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/export/servers/zookeeper-3.4.9/zkdatas ->路径修改为刚刚创建的文件夹
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3 ->删掉#,打开
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1 ->删掉#,打开 server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888 ->添加三行

  第四步:添加myid配置

在第一台机器/export/servers/zookeeper-3.4.9/zkdatas/创建一个文件,名为myid,文件内容为1
echo 1 > /export/servers/zookeeper-3.4.9/zkdatas/myid

  第五步:安装包分发并修改myid的值

在第一台机器上执行
scp -r /export/servers/zookeeper-3.4.9/ node02:/export/servers/
scp -r /export/servers/zookeeper-3.4.9/ node03:/export/servers/
在第二台机器上修改myid值为2
echo 2 > /export/servers/zookeeper-3.4.9/zkdatas/myid
在第三台机器上修改myid值为3
echo 3 > /export/servers/zookeeper-3.4.9/zkdatas/myid

  第六步:三台机器启动ZooKeeper

三台机器都要执行
/export/servers/zookeeper-3.4.9/bin/zkServer.sh start 查看启动状态
/export/servers/zookeeper-3.4.9/bin/zkServer.sh status

ZooKeeper的Shell操作

  • 客户端连接

    运行 zkCli.sh - server ip 进入命令行工具

    输入 help,输出 zk shell 提示

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解
  • Shell操作

    创建节点    create [-s] [-e] path data acl

    其中,-s-e 分别指定节点特性、顺序或临时节点,若不指定,则表示持久节点。acl用来进行权限控制。

    创建永久节点

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解

    创建永久顺序节点

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解

    创建临时节点(客户端与服务端断开连接时,临时节点消失)

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解

    创建临时顺序节点

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解

  读取节点    ls path [watch]

           get path [watch]

           ls2 path [watch]

   与读取相关的命令有 ls 命令和 get 命令。

   ls 命令可以列出ZooKeeper指定节点下的所有子节点,只能查看指定节点下的第一级的所有子节点;

   get 命令可以获取ZooKeeper指定节点的数据内容和属性信息。

  更新节点    set path data [version]

   data就是要更新的新内容,version表示数据版本。

  删除节点    delete path data [version]

   若删除节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点。

   rmr path:可以递归删除节点


ZooKeeper的数据模型

  • 概述

    ZooKeeper当中每一个节点都称之为一个znode,每个znode既具有文件夹的特性,又具有文件的特性(临时节点不能有子节点)

数据结构

【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解

图中每一个节点称为一个Znode,每个Znode由3部分组成

1.stat:此为状态信息,描述该Znode的版本、权限等信息

2.data:与该Znode关联的数据

3.children:该Znode下的子节点


ZooKeeper节点的四种模型

  • PERSISTENT:永久节点
  • EPHEMERAL:临时节点
  • PERSISTENT_SEQUENTIAL:永久节点、序列化
  • EPHEMERAL_SEQUENTIAL:临时节点、序列化

ZooKeeper的watch机制

  • 概述

    ZooKeeper允许客户端向服务端注册一个 Watcher 监听,当服务端的一些事情触发了这个 Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。

    触发事件种类有很多,比如节点创建节点删除节点改变子节点改变等。

    总的来说,可以概括 Watcher 为以下三个过程客户端向服务端注册 Watcher服务端事件发生触发 Warcher客户端回调 Watcher 得到触发事件情况

  • watch机制特点

    一次性触发:事件发生触发监听,一个 Watcher Event 就会被发送到设置监听的客户端,这种效果是一次性的,后续再发生同样的事件不会再触发。

    事件封装:ZooKeeper使用 WatchedEvent 对象来封装服务端事件并传递。WatchedEvent 包含了每一个事件的三个基本属性:通知状态(KeeperState)、事件类型(EventType)和节点路径(Path)。

    event 异步发送:Watcher 的通知事件从服务端发送到客户端是异步的。

    先注册再触发:ZooKeeper中的 watch 机制,必须客户端先去服务端注册监听,这样事件发送才会触发监听,通知给客户端

  • 通知状态和事件类型

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解

  • Shell客户端设置watch机制

    设置节点数据变动监听

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解

    通过另一个客户端更改节点数据

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解

    此时设置的监听节点收到通知

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解


ZooKeeper的JavaAPI

  • 导包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bigdata13</artifactId>
<groupId>cn.itcast.bigdata13</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion> <artifactId>day01_zk</artifactId> <dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!--Java编译插件-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
  • 在IDEA关于节点的操作
package cn.itcast.zk.demo1;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.jupiter.api.Test; public class zkOperate {
//以下方法
}

创建永久节点


/*
* 创建一个永久节点
* */
@Test
public void createNode() throws Exception {
//定义我们的重试机制
ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000,3); //得到客户端
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181",exponentialBackoffRetry); curatorFramework.start(); curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/bigdata/hello/abc","helloworld".getBytes()); curatorFramework.close();
}

创建临时节点

	/*创建一个临时节点
* */
@Test
public void createTempNode() throws Exception {
ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(5000,5); CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181",exponentialBackoffRetry); curatorFramework.start(); curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/mytempNodenew","tempNode".getBytes()); Thread.sleep(8000); curatorFramework.close();
}

修改节点数据

	/*
修改节点数据
*/
@Test
public void updateNodeData() throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node01:2181",new ExponentialBackoffRetry(5000,5)); curatorFramework.start(); curatorFramework.setData().forPath("/abc","bbb".getBytes()); curatorFramework.close();
}

查询节点数据

	/**
* 查询节点数据
*/
@Test
public void getData() throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node01:2181",new ExponentialBackoffRetry(5000,5)); curatorFramework.start(); byte[] bytes = curatorFramework.getData().forPath("/bigdata/hello/abc");
String s = new String(bytes);
System.out.println(s); curatorFramework.close();
}

建立watch机制

/**
* 建立watch机制
*/
@Test
public void watchNode() throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node01:2181",new ExponentialBackoffRetry(5000,5)); curatorFramework.start(); //通过使用TreeCache来监听我们的节点
TreeCache treeCache = new TreeCache(curatorFramework,"/abc");
treeCache.getListenable().addListener(new TreeCacheListener() {
/**
* 这个方法里面实现我们监听的逻辑,所有的监听事件都会回调这个方法
* @param curatorFramework
* @param event
* @throws Exception
*/
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {
ChildData data =event.getData();
if (null != data){
//这个type封装的是我们的事件,比如节点新增,节点修改,节点删除等各种类型的事件
TreeCacheEvent.Type type =event.getType();
switch (type){
case NODE_ADDED:
System.out.println("节点新增被我监听到了");
break;
case INITIALIZED:
System.out.println("初始化操作我监听到了");
break;
case NODE_REMOVED:
System.out.println("节点删除被我监听到了");
break;
case NODE_UPDATED:
System.out.println("节点的修改操作被我监听到了");
break;
default:
System.out.println("没监听到");
break;
} }
}
});
treeCache.start();
Thread.sleep(500000000);
}

网络编程

  • 概述

    网络编程是指用来实现网络互联的不同计算机上运行的程序间可以进行数据交换。对我们来说即如何用编程语言java实现计算机网络中不同计算机之间的通信。
  • 网络通信三要素

    IP地址

    A类:第1个8位表示网络地址,剩下3个8位表示主机地址(主要留给*或者大型企业)

    B类:第2个8位表示网络地址,剩下2个8位表示主机地址(主要分配给中等规模的公司)

    C类:第3个8位表示网络地址,剩下1个8位表示主机地址(分配给小公司或者个人)

    D类:用于IP网络中的组播

    E类:保留作研究用

    端口号

    用于标识进程的逻辑地址,不同进程的标识

    有效端口:0-65535,其中 0-1024 系统使用或保留端口

    传输协议

    通讯的规则

    常见协议:UDP(用户数据报协议)、TCP(传输控制协议)

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解
  • 网络模型

    网络模型一般是指OSI七层参考模型和TCP/IP五层参考模型

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解
  • Socket机制

    概述:又称为套接字,用于描述 IP 地址和端口。应用程序通常通过 Socket 向网络发出请求或者应答网络请求。通信两端都有 Socket,数据在两个 Socket 之间通过 IO 传输。网络编程也称为 Socket 编程、套接字编程,Socket 通信是 Client/Server 模型。

    基于UDP协议的Socket通信

    核心类:DatagramSocket

    基于TCP协议的Socket通信
  • IO 通信模型

    只要有 IO 就会有阻塞或非阻塞的问题,无论这个 IO 是网络的,还是硬盘的。原因在于程序是运行在系统之上的,任何形式的IO操作发起都需要系统内核的支持。

    【Hadoop离线基础总结】zookeeper的介绍以及集群环境搭建、网络编程和RPC的简单了解
  • BIO(阻塞模式)

    BIO 即 blocking IO,是一种阻塞的 IO。BIO 的问题在于 accept()、read()的操作点都是阻塞的。

    服务器线程发起一个 accept 动作,询问操作系统是否有新的 Socket 信息从端口 X 发送过来。注意,是询问操作系统。如果操作系统没有发现有 Socket 从指定的端口 X 来,那么操作系统就会等到。
  • NIO(非阻塞模式)

    NIO 即 non-blocking IO,是一种非阻塞的 IO。三大核心部分为:Channel(通道)、Buffer(缓冲区)、Selector(选择器)
  • 阻塞/非阻塞 同步/非同步

    阻塞和非阻塞这两个概念是程序级别的,主要研究的是如果 IO 资源没有准备好,那么程序该如何处理问题

    同步和非同步这两个概念是操作系统级别的,主要研究的是操作系统收到程序请求 IO 操作后,如果 IO 资源没有准备好,该如何响应程序的问题

RPC(Remote Procedure Call Protocol)

  • 概述

    远程过程调用协议。简单地说,就是客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个过程或函数,就像调用本地应用程序中的意义
  • 主要特质

    网络协议和网络 IO 对其透明

    信息格式对其透明

    跨语言能力
  • RPC原理

    实现 RPC 的程序包括5个部分:UserUser-stubRPCRuntimeServer-stubServer