This commit is contained in:
罗祥 2019-05-21 17:36:54 +08:00
parent 0dd9d8862f
commit 38c890d5e3
6 changed files with 231 additions and 59 deletions

View File

@ -143,7 +143,7 @@ TODO
## 十、Kafka
1. [Kafka 核心概念介绍](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka核心概念介绍.md)
2. 基于Zookeeper搭建Kafka高可用集群
2. [基于Zookeeper搭建Kafka高可用集群](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/基于Zookeeper搭建Kafka高可用集群.md)
3. Kafka生产者详解
4. Kafka消费者详解
5. Kafka 副本机制以及选举原理剖析
@ -152,7 +152,7 @@ TODO
## 十一、Zookeeper
1. Zookeeper 简介及核心概念
2. Zookeeper集群搭建Zookeeper
2. [Zookeeper单机环境和集群环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Zookeeper单机环境和集群环境搭建.md)
3. Zookeeper分布式锁实现方案
4. 集群升级、迁移深入分析 Zookeeper
5. Zab协议及选举机制

View File

@ -10,45 +10,47 @@ object SparkSqlApp {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate()
val empDF = spark.read.json("/usr/file/json/emp.json")
empDF.createOrReplaceTempView("emp")
empDF.show()
empDF.select(count("ename")).show()
empDF.select(countDistinct("deptno")).show()
empDF.select(approx_count_distinct("ename", 0.1)).show()
empDF.select(first("ename"), last("job")).show()
empDF.select(min("sal"), max("sal")).show()
empDF.select(sum("sal")).show()
empDF.select(sumDistinct("sal")).show()
empDF.select(avg("sal")).show()
val deptDF = spark.read.json("/usr/file/json/dept.json")
deptDF.createOrReplaceTempView("dept")
deptDF.printSchema()
// 1.定义联结表达式
val joinExpression = empDF.col("deptno") === deptDF.col("deptno")
// 2.联结查询
empDF.join(deptDF, joinExpression).select("ename", "dname").show()
spark.sql("SELECT ename,dname FROM emp JOIN dept ON emp.deptno = dept.deptno").show()
// 总体方差 均方差 总体标准差 样本标准差
empDF.select(var_pop("sal"), var_samp("sal"), stddev_pop("sal"), stddev_samp("sal")).show()
empDF.join(deptDF, joinExpression, "outer").show()
spark.sql("SELECT * FROM emp FULL OUTER JOIN dept ON emp.deptno = dept.deptno").show()
empDF.join(deptDF, joinExpression, "left_outer").show()
spark.sql("SELECT * FROM emp LEFT OUTER JOIN dept ON emp.deptno = dept.deptno").show()
// 偏度和峰度
empDF.select(skewness("sal"), kurtosis("sal")).show()
empDF.join(deptDF, joinExpression, "right_outer").show()
spark.sql("SELECT * FROM emp RIGHT OUTER JOIN dept ON emp.deptno = dept.deptno").show()
// 计算两列的 皮尔逊相关系数 样本协方差 总体协方差
empDF.select(corr("empno", "sal"), covar_samp("empno", "sal"),
covar_pop("empno", "sal")).show()
empDF.join(deptDF, joinExpression, "left_semi").show()
spark.sql("SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno").show()
empDF.agg(collect_set("job"), collect_list("ename")).show()
empDF.join(deptDF, joinExpression, "left_anti").show()
spark.sql("SELECT * FROM emp LEFT ANTI dept ON emp.deptno = dept.deptno").show()
empDF.groupBy("deptno", "job").count().show()
spark.sql("SELECT deptno, job, count(*) FROM emp GROUP BY deptno, job").show()
empDF.groupBy("deptno").agg(count("ename").alias("人数"), sum("sal").alias("总工资")).show()
spark.sql("SELECT deptno, count(ename) ,sum(sal) FROM emp GROUP BY deptno").show()
empDF.groupBy("deptno").agg("ename"->"count","sal"->"sum").show()
/*你绝对应该使用交叉连接100确定这是你需要的。 在Spark中定义交叉连接时有一个原因需要明确。 他们很危险!
高级用户可以将会话级配置spark.sql.crossJoin.enable设置为true以便允许交叉连接而不发出警告或者Spark没有尝试为您执行另一个连接*/
empDF.join(deptDF, joinExpression, "cross").show()
spark.sql("SELECT * FROM emp CROSS JOIN dept ON emp.deptno = dept.deptno").show()
spark.sql("SELECT * FROM graduateProgram NATURAL JOIN person").show()
}
}

View File

@ -49,3 +49,12 @@
### 九、Zookeeper
1. [Zookeeper单机环境和集群环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Zookeeper单机环境和集群环境搭建.md)
### 十、Kafka
1. [基于Zookeeper搭建Kafka高可用集群](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/基于Zookeeper搭建Kafka高可用集群.md)

View File

@ -0,0 +1,57 @@
## 一、简介
## 二、 数据准备
分别创建员工和部门datafame并注册为临时视图代码如下
```scala
val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate()
val empDF = spark.read.json("/usr/file/json/emp.json")
empDF.createOrReplaceTempView("emp")
val deptDF = spark.read.json("/usr/file/json/dept.json")
deptDF.createOrReplaceTempView("dept")
```
两表字段中所有字段如下:
```properties
emp员工表
|-- ENAME: 员工姓名
|-- DEPTNO: 部门编号
|-- EMPNO: 员工编号
|-- HIREDATE: 入职时间
|-- JOB: 职务
|-- MGR: 上级编号
|-- SAL: 薪资
|-- COMM: 奖金
```
```properties
dept部门表
|-- DEPTNO: 部门编号
|-- DNAME: 部门名称
|-- LOC: 部门所在城市
```
> 注emp.jsondept.json可以在本仓库的resources目录进行下载。
## 三、联结操作
### 3.1 Inner Joins
```scala
// 1.定义联结表达式
val joinExpression = empDF.col("deptno") === deptDF.col("deptno")
// 2.联结查询
empDF.join(deptDF,joinExpression).select("ename","dname").show()
// 等价SQL如下
spark.sql("SELECT ename,dname FROM emp JOIN dept ON emp.deptno = dept.deptno").show()
```

View File

@ -127,13 +127,9 @@ zkServer.sh start
为保证集群高可用Zookeeper集群的节点数最好是奇数最少有三个节点所以这里演示搭建一个三个节点的集群。
> 以下演示为单机上搭建集群对于Zookeeper多机集群搭建步骤和单机一致。
### 2.1 修改配置
拷贝三份zookeeper安装包分别修改其配置文件`zoo.cfg`,主要是修改`dataDir``dataLogDir`以及配置集群信息。
如果是多台服务器则集群中每个节点通讯端口和选举端口可相同IP地址修改为每个节点所在主机IP即可。
拷贝三份zookeeper安装包分别修改其配置文件`zoo.cfg`,主要是修改`dataDir``dataLogDir`以及配置集群信息。修改后三份配置文件的内容分别如下:
zookeeper01配置
@ -152,6 +148,8 @@ server.2=127.0.0.1:2288:3388
server.3=127.0.0.1:2289:3389
```
> 如果是多台服务器则集群中每个节点通讯端口和选举端口可相同IP地址修改为每个节点所在主机IP即可。
zookeeper02配置与zookeeper01相比只有`dataLogDir``dataLogDir`不同:
```shell
@ -167,7 +165,7 @@ server.2=127.0.0.1:2288:3388
server.3=127.0.0.1:2289:3389
```
zookeeper03配置与zookeeper0102相比只有`dataLogDir``dataLogDir`不同:
zookeeper03配置与zookeeper0102相比只有`dataLogDir``dataLogDir`不同:
```shell
tickTime=2000
@ -186,9 +184,7 @@ server.3=127.0.0.1:2289:3389
### 2.2 标识节点
分别在三个节点的数据存储目录下新建`myid`文件,并写入对应的节点标识。
Zookeeper集群通过`myid`文件识别集群节点并通过上文配置的节点通信端口和选举端口来进行节点通信并选举出leader节点从而搭建出集群。
分别在三个节点的数据存储目录下新建`myid`文件,并写入对应的节点标识。Zookeeper集群通过`myid`文件识别集群节点并通过上文配置的节点通信端口和选举端口来进行节点通信选举出leader节点。
创建存储目录:

View File

@ -1,6 +1,130 @@
# 基于Zookeeper搭建Kafka高可用集群
## 一、Zookeeper集群搭建
## 一、集群环境搭建
为保证集群高可用Zookeeper集群的节点数最好是奇数最少有三个节点所以这里搭建一个三个节点的集群。
### 1.1 下载 & 解压
下载对应版本Zookeeper这里我下载的版本`3.4.14`。官方下载地址https://archive.apache.org/dist/zookeeper/
```shell
# 下载
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
# 解压
tar -zxvf zookeeper-3.4.14.tar.gz
```
### 1.2 修改配置
拷贝三份zookeeper安装包。分别进入安装目录的`conf`目录,拷贝配置样本`zoo_sample.cfg ``zoo.cfg`并进行修改,修改后三份配置文件内容分别如下:
zookeeper01配置
```shell
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-cluster/data/01
dataLogDir=/usr/local/zookeeper-cluster/log/01
clientPort=2181
# server.1 这个1是服务器的标识可以是任意有效数字标识这是第几个服务器节点这个标识要写到dataDir目录下面myid文件里
# 指名集群间通讯端口和选举端口
server.1=127.0.0.1:2287:3387
server.2=127.0.0.1:2288:3388
server.3=127.0.0.1:2289:3389
```
> 如果是多台服务器则集群中每个节点通讯端口和选举端口可相同IP地址修改为每个节点所在主机IP即可。
zookeeper02配置与zookeeper01相比只有`dataLogDir``dataLogDir`不同:
```shell
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-cluster/data/02
dataLogDir=/usr/local/zookeeper-cluster/log/02
clientPort=2182
server.1=127.0.0.1:2287:3387
server.2=127.0.0.1:2288:3388
server.3=127.0.0.1:2289:3389
```
zookeeper03配置与zookeeper0102相比也只有`dataLogDir``dataLogDir`不同:
```shell
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-cluster/data/03
dataLogDir=/usr/local/zookeeper-cluster/log/03
clientPort=2183
server.1=127.0.0.1:2287:3387
server.2=127.0.0.1:2288:3388
server.3=127.0.0.1:2289:3389
```
> 配置参数说明:
>
> - **tickTime**用于计算的基础时间单元。比如session超时N*tickTime
> - **initLimit**:用于集群,允许从节点连接并同步到 master节点的初始化连接时间以tickTime的倍数来表示
> - **syncLimit**:用于集群, master主节点与从节点之间发送消息请求和应答时间长度心跳机制
> - **dataDir**:数据存储位置;
> - **dataLogDir**:日志目录;
> - **clientPort**用于客户端连接的端口默认2181
### 1.3 标识节点
分别在三个节点的数据存储目录下新建`myid`文件,并写入对应的节点标识。Zookeeper集群通过`myid`文件识别集群节点并通过上文配置的节点通信端口和选举端口来进行节点通信选举出leader节点。
创建存储目录:
```shell
# dataDir
mkdir -vp /usr/local/zookeeper-cluster/data/01
# dataDir
mkdir -vp /usr/local/zookeeper-cluster/data/02
# dataDir
mkdir -vp /usr/local/zookeeper-cluster/data/03
```
创建并写入节点标识到`myid`文件:
```shell
#server1
echo "1" > /usr/local/zookeeper-cluster/data/01/myid
#server2
echo "2" > /usr/local/zookeeper-cluster/data/02/myid
#server3
echo "3" > /usr/local/zookeeper-cluster/data/03/myid
```
### 1.4 启动集群
分别启动三个节点:
```shell
# 启动节点1
/usr/app/zookeeper-cluster/zookeeper01/bin/zkServer.sh start
# 启动节点2
/usr/app/zookeeper-cluster/zookeeper02/bin/zkServer.sh start
# 启动节点3
/usr/app/zookeeper-cluster/zookeeper03/bin/zkServer.sh start
```
### 1.5 集群验证
使用jps查看进程并且使用`zkServer.sh status`查看集群各个节点状态。如图三个节点进程均启动成功并且两个节点为follower节点一个节点为leader节点。
<div align="center"> <img src="https://github.com/heibaiying/BigData-Notes/blob/master/pictures/zookeeper-cluster.png"/> </div>
## 二、Kafka集群搭建
@ -11,15 +135,12 @@ Kafka安装包官方下载地址http://kafka.apache.org/downloads ,本用
```shell
# 下载
wget https://www-eu.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
```
这里说明一下kafka安装包的命名规则`kafka_2.12-2.2.0.tgz`为例前面的2.12代表Scala的版本号Kafka是采用Scala语言开发的后面的2.2.0代表Kafka的版本号。
```shell
# 解压
tar -xzf kafka_2.12-2.2.0.tgz
```
>这里j解释一下kafka安装包的命名规则`kafka_2.12-2.2.0.tgz`为例前面的2.12代表Scala的版本号Kafka采用Scala语言进行开发后面的2.2.0则代表Kafka的版本号。
### 2.2 拷贝配置文件
进入解压目录的` config`目录下 ,拷贝三份配置文件
@ -67,7 +188,7 @@ zookeeper.connect=hadoop001:2181,hadoop001:2182,hadoop001:2183
### 2.4 启动集群
分别指定不同配置文件启动三个Kafka节点
分别指定不同配置文件启动三个Kafka节点。启动后可以使用jps查看进程此时应该有三个zookeeper进程和三个kafka进程。
```shell
bin/kafka-server-start.sh config/server-1.properties
@ -75,19 +196,6 @@ bin/kafka-server-start.sh config/server-2.properties
bin/kafka-server-start.sh config/server-3.properties
```
启动后使用jps查看进程此时应该有三个zookeeper进程和三个kafka进程
```shell
[root@hadoop001 kafka_2.12-2.2.0]# jps
14288 QuorumPeerMain
18385 Jps
16228 Kafka
17653 Kafka
14374 QuorumPeerMain
16826 Kafka
14446 QuorumPeerMain
```
### 2.5 创建测试主题
创建测试主题:
@ -96,7 +204,7 @@ bin/kafka-server-start.sh config/server-3.properties
bin/kafka-topics.sh --create --bootstrap-server hadoop001:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
```
查看主题信息:
创建后可以使用以下命令查看创建的主题信息:
```shell
bin/kafka-topics.sh --describe --bootstrap-server hadoop001:9092 --topic my-replicated-topic
@ -104,7 +212,7 @@ bin/kafka-topics.sh --describe --bootstrap-server hadoop001:9092 --topic my-repl
![kafka-cluster-shell](D:\BigData-Notes\pictures\kafka-cluster-shell.png)
还可以进一步创建一个消费者和一个生产者进行测试:
你也可以创建一个消费者和生产者进行连通测试:
```shell
# 创建生产者