diff --git a/README.md b/README.md index 4d90b51..20d1133 100644 --- a/README.md +++ b/README.md @@ -143,7 +143,7 @@ TODO ## 十、Kafka 1. [Kafka 核心概念介绍](https://github.com/heibaiying/BigData-Notes/blob/master/notes/Kafka核心概念介绍.md) -2. 基于Zookeeper搭建Kafka高可用集群 +2. [基于Zookeeper搭建Kafka高可用集群](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/基于Zookeeper搭建Kafka高可用集群.md) 3. Kafka生产者详解 4. Kafka消费者详解 5. Kafka 副本机制以及选举原理剖析 @@ -152,7 +152,7 @@ TODO ## 十一、Zookeeper 1. Zookeeper 简介及核心概念 -2. Zookeeper集群搭建Zookeeper +2. [Zookeeper单机环境和集群环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Zookeeper单机环境和集群环境搭建.md) 3. Zookeeper分布式锁实现方案 4. 集群升级、迁移深入分析 Zookeeper 5. Zab协议及选举机制 diff --git a/code/spark/spark-core/src/main/java/rdd/scala/SparkSqlApp.scala b/code/spark/spark-core/src/main/java/rdd/scala/SparkSqlApp.scala index b40b0de..ac5e6f3 100644 --- a/code/spark/spark-core/src/main/java/rdd/scala/SparkSqlApp.scala +++ b/code/spark/spark-core/src/main/java/rdd/scala/SparkSqlApp.scala @@ -10,45 +10,47 @@ object SparkSqlApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate() + val empDF = spark.read.json("/usr/file/json/emp.json") empDF.createOrReplaceTempView("emp") - empDF.show() - empDF.select(count("ename")).show() - empDF.select(countDistinct("deptno")).show() - empDF.select(approx_count_distinct("ename", 0.1)).show() - empDF.select(first("ename"), last("job")).show() - empDF.select(min("sal"), max("sal")).show() - empDF.select(sum("sal")).show() - empDF.select(sumDistinct("sal")).show() - empDF.select(avg("sal")).show() + val deptDF = spark.read.json("/usr/file/json/dept.json") + deptDF.createOrReplaceTempView("dept") + + deptDF.printSchema() + + // 1.定义联结表达式 + val joinExpression = empDF.col("deptno") === deptDF.col("deptno") + // 2.联结查询 + empDF.join(deptDF, joinExpression).select("ename", "dname").show() + spark.sql("SELECT ename,dname FROM emp JOIN dept ON emp.deptno = dept.deptno").show() - // 总体方差 均方差 总体标准差 样本标准差 - empDF.select(var_pop("sal"), var_samp("sal"), stddev_pop("sal"), stddev_samp("sal")).show() + empDF.join(deptDF, joinExpression, "outer").show() + spark.sql("SELECT * FROM emp FULL OUTER JOIN dept ON emp.deptno = dept.deptno").show() + empDF.join(deptDF, joinExpression, "left_outer").show() + spark.sql("SELECT * FROM emp LEFT OUTER JOIN dept ON emp.deptno = dept.deptno").show() - // 偏度和峰度 - empDF.select(skewness("sal"), kurtosis("sal")).show() + empDF.join(deptDF, joinExpression, "right_outer").show() + spark.sql("SELECT * FROM emp RIGHT OUTER JOIN dept ON emp.deptno = dept.deptno").show() - // 计算两列的 皮尔逊相关系数 样本协方差 总体协方差 - empDF.select(corr("empno", "sal"), covar_samp("empno", "sal"), - covar_pop("empno", "sal")).show() + empDF.join(deptDF, joinExpression, "left_semi").show() + spark.sql("SELECT * FROM emp LEFT SEMI JOIN dept ON emp.deptno = dept.deptno").show() - empDF.agg(collect_set("job"), collect_list("ename")).show() + empDF.join(deptDF, joinExpression, "left_anti").show() + spark.sql("SELECT * FROM emp LEFT ANTI dept ON emp.deptno = dept.deptno").show() - - empDF.groupBy("deptno", "job").count().show() - spark.sql("SELECT deptno, job, count(*) FROM emp GROUP BY deptno, job").show() - - empDF.groupBy("deptno").agg(count("ename").alias("人数"), sum("sal").alias("总工资")).show() - spark.sql("SELECT deptno, count(ename) ,sum(sal) FROM emp GROUP BY deptno").show() - - - empDF.groupBy("deptno").agg("ename"->"count","sal"->"sum").show() + /*你绝对应该使用交叉连接,100%确定这是你需要的。 在Spark中定义交叉连接时,有一个原因需要明确。 他们很危险! + 高级用户可以将会话级配置spark.sql.crossJoin.enable设置为true,以便允许交叉连接而不发出警告,或者Spark没有尝试为您执行另一个连接。*/ + empDF.join(deptDF, joinExpression, "cross").show() + spark.sql("SELECT * FROM emp CROSS JOIN dept ON emp.deptno = dept.deptno").show() + + + spark.sql("SELECT * FROM graduateProgram NATURAL JOIN person").show() } } diff --git a/notes/Linux中大数据常用软件安装指南.md b/notes/Linux中大数据常用软件安装指南.md index d613dd5..c9718c4 100644 --- a/notes/Linux中大数据常用软件安装指南.md +++ b/notes/Linux中大数据常用软件安装指南.md @@ -49,3 +49,12 @@ +### 九、Zookeeper + +1. [Zookeeper单机环境和集群环境搭建](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/Zookeeper单机环境和集群环境搭建.md) + + + +### 十、Kafka + +1. [基于Zookeeper搭建Kafka高可用集群](https://github.com/heibaiying/BigData-Notes/blob/master/notes/installation/基于Zookeeper搭建Kafka高可用集群.md) \ No newline at end of file diff --git a/notes/SparkSQL联结操作.md b/notes/SparkSQL联结操作.md new file mode 100644 index 0000000..1c1d36c --- /dev/null +++ b/notes/SparkSQL联结操作.md @@ -0,0 +1,57 @@ +## 一、简介 + + + +## 二、 数据准备 + +分别创建员工和部门datafame,并注册为临时视图,代码如下: + +```scala +val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate() + +val empDF = spark.read.json("/usr/file/json/emp.json") +empDF.createOrReplaceTempView("emp") + +val deptDF = spark.read.json("/usr/file/json/dept.json") +deptDF.createOrReplaceTempView("dept") +``` + +两表字段中所有字段如下: + +```properties +emp员工表 + |-- ENAME: 员工姓名 + |-- DEPTNO: 部门编号 + |-- EMPNO: 员工编号 + |-- HIREDATE: 入职时间 + |-- JOB: 职务 + |-- MGR: 上级编号 + |-- SAL: 薪资 + |-- COMM: 奖金 +``` + +```properties +dept部门表 + |-- DEPTNO: 部门编号 + |-- DNAME: 部门名称 + |-- LOC: 部门所在城市 +``` + +> 注:emp.json,dept.json可以在本仓库的resources目录进行下载。 + + + +## 三、联结操作 + +### 3.1 Inner Joins + +```scala +// 1.定义联结表达式 +val joinExpression = empDF.col("deptno") === deptDF.col("deptno") +// 2.联结查询 +empDF.join(deptDF,joinExpression).select("ename","dname").show() + +// 等价SQL如下: +spark.sql("SELECT ename,dname FROM emp JOIN dept ON emp.deptno = dept.deptno").show() +``` + diff --git a/notes/installation/Zookeeper单机环境和集群环境搭建.md b/notes/installation/Zookeeper单机环境和集群环境搭建.md index 769bf0b..0e97a6f 100644 --- a/notes/installation/Zookeeper单机环境和集群环境搭建.md +++ b/notes/installation/Zookeeper单机环境和集群环境搭建.md @@ -127,13 +127,9 @@ zkServer.sh start 为保证集群高可用,Zookeeper集群的节点数最好是奇数,最少有三个节点,所以这里演示搭建一个三个节点的集群。 -> 以下演示为单机上搭建集群,对于Zookeeper,多机集群搭建步骤和单机一致。 - ### 2.1 修改配置 -拷贝三份zookeeper安装包,分别修改其配置文件`zoo.cfg`,主要是修改`dataDir`、`dataLogDir`以及配置集群信息。 - -如果是多台服务器,则集群中每个节点通讯端口和选举端口可相同,IP地址修改为每个节点所在主机IP即可。 +拷贝三份zookeeper安装包,分别修改其配置文件`zoo.cfg`,主要是修改`dataDir`、`dataLogDir`以及配置集群信息。修改后三份配置文件的内容分别如下: zookeeper01配置: @@ -152,6 +148,8 @@ server.2=127.0.0.1:2288:3388 server.3=127.0.0.1:2289:3389 ``` +> 如果是多台服务器,则集群中每个节点通讯端口和选举端口可相同,IP地址修改为每个节点所在主机IP即可。 + zookeeper02配置,与zookeeper01相比,只有`dataLogDir`和`dataLogDir`不同: ```shell @@ -167,7 +165,7 @@ server.2=127.0.0.1:2288:3388 server.3=127.0.0.1:2289:3389 ``` -zookeeper03配置,与zookeeper01,02相比,只有`dataLogDir`和`dataLogDir`不同: +zookeeper03配置,与zookeeper01,02相比,也只有`dataLogDir`和`dataLogDir`不同: ```shell tickTime=2000 @@ -186,9 +184,7 @@ server.3=127.0.0.1:2289:3389 ### 2.2 标识节点 -分别在三个节点的数据存储目录下新建`myid`文件,并写入对应的节点标识。 - -Zookeeper集群通过`myid`文件识别集群节点,并通过上文配置的节点通信端口和选举端口来进行节点通信,并选举出leader节点,从而搭建出集群。 +分别在三个节点的数据存储目录下新建`myid`文件,并写入对应的节点标识。Zookeeper集群通过`myid`文件识别集群节点,并通过上文配置的节点通信端口和选举端口来进行节点通信,选举出leader节点。 创建存储目录: diff --git a/notes/installation/基于Zookeeper搭建Kafka高可用集群.md b/notes/installation/基于Zookeeper搭建Kafka高可用集群.md index 9fdcad1..3bd1cf2 100644 --- a/notes/installation/基于Zookeeper搭建Kafka高可用集群.md +++ b/notes/installation/基于Zookeeper搭建Kafka高可用集群.md @@ -1,6 +1,130 @@ # 基于Zookeeper搭建Kafka高可用集群 -## 一、Zookeeper集群搭建 +## 一、集群环境搭建 + +为保证集群高可用,Zookeeper集群的节点数最好是奇数,最少有三个节点,所以这里搭建一个三个节点的集群。 + +### 1.1 下载 & 解压 + +下载对应版本Zookeeper,这里我下载的版本`3.4.14`。官方下载地址:https://archive.apache.org/dist/zookeeper/ + +```shell +# 下载 +wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz +# 解压 +tar -zxvf zookeeper-3.4.14.tar.gz +``` + +### 1.2 修改配置 + +拷贝三份zookeeper安装包。分别进入安装目录的`conf`目录,拷贝配置样本`zoo_sample.cfg `为`zoo.cfg`并进行修改,修改后三份配置文件内容分别如下: + +zookeeper01配置: + +```shell +tickTime=2000 +initLimit=10 +syncLimit=5 +dataDir=/usr/local/zookeeper-cluster/data/01 +dataLogDir=/usr/local/zookeeper-cluster/log/01 +clientPort=2181 + +# server.1 这个1是服务器的标识,可以是任意有效数字,标识这是第几个服务器节点,这个标识要写到dataDir目录下面myid文件里 +# 指名集群间通讯端口和选举端口 +server.1=127.0.0.1:2287:3387 +server.2=127.0.0.1:2288:3388 +server.3=127.0.0.1:2289:3389 +``` + +> 如果是多台服务器,则集群中每个节点通讯端口和选举端口可相同,IP地址修改为每个节点所在主机IP即可。 + +zookeeper02配置,与zookeeper01相比,只有`dataLogDir`和`dataLogDir`不同: + +```shell +tickTime=2000 +initLimit=10 +syncLimit=5 +dataDir=/usr/local/zookeeper-cluster/data/02 +dataLogDir=/usr/local/zookeeper-cluster/log/02 +clientPort=2182 + +server.1=127.0.0.1:2287:3387 +server.2=127.0.0.1:2288:3388 +server.3=127.0.0.1:2289:3389 +``` + +zookeeper03配置,与zookeeper01,02相比,也只有`dataLogDir`和`dataLogDir`不同: + +```shell +tickTime=2000 +initLimit=10 +syncLimit=5 +dataDir=/usr/local/zookeeper-cluster/data/03 +dataLogDir=/usr/local/zookeeper-cluster/log/03 +clientPort=2183 + +server.1=127.0.0.1:2287:3387 +server.2=127.0.0.1:2288:3388 +server.3=127.0.0.1:2289:3389 +``` + +> 配置参数说明: +> +> - **tickTime**:用于计算的基础时间单元。比如session超时:N*tickTime; +> - **initLimit**:用于集群,允许从节点连接并同步到 master节点的初始化连接时间,以tickTime的倍数来表示; +> - **syncLimit**:用于集群, master主节点与从节点之间发送消息,请求和应答时间长度(心跳机制); +> - **dataDir**:数据存储位置; +> - **dataLogDir**:日志目录; +> - **clientPort**:用于客户端连接的端口,默认2181 + + + +### 1.3 标识节点 + +分别在三个节点的数据存储目录下新建`myid`文件,并写入对应的节点标识。Zookeeper集群通过`myid`文件识别集群节点,并通过上文配置的节点通信端口和选举端口来进行节点通信,选举出leader节点。 + +创建存储目录: + +```shell +# dataDir +mkdir -vp /usr/local/zookeeper-cluster/data/01 +# dataDir +mkdir -vp /usr/local/zookeeper-cluster/data/02 +# dataDir +mkdir -vp /usr/local/zookeeper-cluster/data/03 +``` + +创建并写入节点标识到`myid`文件: + +```shell +#server1 +echo "1" > /usr/local/zookeeper-cluster/data/01/myid +#server2 +echo "2" > /usr/local/zookeeper-cluster/data/02/myid +#server3 +echo "3" > /usr/local/zookeeper-cluster/data/03/myid +``` + +### 1.4 启动集群 + +分别启动三个节点: + +```shell +# 启动节点1 +/usr/app/zookeeper-cluster/zookeeper01/bin/zkServer.sh start +# 启动节点2 +/usr/app/zookeeper-cluster/zookeeper02/bin/zkServer.sh start +# 启动节点3 +/usr/app/zookeeper-cluster/zookeeper03/bin/zkServer.sh start +``` + +### 1.5 集群验证 + +使用jps查看进程,并且使用`zkServer.sh status`查看集群各个节点状态。如图三个节点进程均启动成功,并且两个节点为follower节点,一个节点为leader节点。 + +