first commit

This commit is contained in:
张乾
2024-10-16 13:06:13 +08:00
parent 2393162ba9
commit c47809d1ff
41 changed files with 9189 additions and 0 deletions

View File

@ -0,0 +1,226 @@
因收到Google相关通知网站将会择期关闭。相关通知内容
10 广播变量 & 累加器:共享变量是用来做什么的?
你好,我是吴磊。
今天是国庆第一天,首先祝你节日快乐。专栏上线以来,有不少同学留言说期待后续内容,所以国庆期间我们仍旧更新正文内容,让我们一起把基础知识模块收个尾。
学习过RDD常用算子之后回顾这些算子你会发现它们都是作用Apply在RDD之上的。RDD的计算以数据分区为粒度依照算子的逻辑Executors以相互独立的方式完成不同数据分区的计算与转换。
不难发现对于Executors来说分区中的数据都是局部数据。换句话说在同一时刻隶属于某个Executor的数据分区对于其他Executors来说是不可见的。
不过在做应用开发的时候总会有一些计算逻辑需要访问“全局变量”比如说全局计数器而这些全局变量在任意时刻对所有的Executors都是可见的、共享的。那么问题来了像这样的全局变量或者说共享变量Spark又是如何支持的呢
今天这一讲我就来和你聊聊Spark共享变量。按照创建与使用方式的不同Spark提供了两类共享变量分别是广播变量Broadcast variables和累加器Accumulators。接下来我们就正式进入今天的学习去深入了解这两种共享变量的用法、以及它们各自的适用场景。
广播变量Broadcast variables
我们先来说说广播变量。广播变量的用法很简单给定普通变量x通过调用SparkContext下的broadcast API即可完成广播变量的创建我们结合代码例子看一下。
val list: List[String] = List("Apache", "Spark")
// sc为SparkContext实例
val bc = sc.broadcast(list)
在上面的代码示例中我们先是定义了一个字符串列表list它包含“Apache”和“Spark”这两个单词。然后我们使用broadcast函数来创建广播变量bcbc封装的内容就是list列表。
// 读取广播变量内容
bc.value
// List[String] = List(Apache, Spark)
// 直接读取列表内容
list
// List[String] = List(Apache, Spark)
使用broadcast API创建广播变量
广播变量创建好之后通过调用它的value函数我们就可以访问它所封装的数据内容。可以看到调用bc.value的效果这与直接访问字符串列表list的效果是完全一致的。
看到这里你可能会问“明明通过访问list变量就可以直接获取字符串列表为什么还要绕个大弯儿先去封装广播变量然后又通过它的value函数来获取同样的数据内容呢”实际上这是个非常好的问题要回答这个问题咱们需要做个推演看看直接访问list变量会产生哪些弊端。
在前面的几讲中我们换着花样地变更Word Count的计算逻辑。尽管Word Count都快被我们“玩坏了”不过一以贯之地沿用同一个实例有助于我们通过对比迅速掌握新的知识点、技能点。因此为了让你迅速掌握广播变量的“精髓”咱们不妨“故技重施”继续在Word Count这个实例上做文章。
普通变量的痛点
这一次为了对比使用广播变量前后的差异我们把Word Count变更为“定向计数”。
所谓定向计数它指的是只对某些单词进行计数例如给定单词列表list我们只对文件wikiOfSpark.txt当中的“Apache”和“Spark”这两个单词做计数其他单词我们可以忽略。结合[第1讲]Word Count的完整代码这样的计算逻辑很容易实现如下表所示。
import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
// 创建单词列表list
val list: List[String] = List("Apache", "Spark")
// 使用list列表对RDD进行过滤
val cleanWordRDD: RDD[String] = wordRDD.filter(word => list.contains(word))
// 把RDD元素转换为KeyValue的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
// 获取计算结果
wordCounts.collect
// Array[(String, Int)] = Array((Apache,34), (Spark,63))
将上述代码丢进spark-shell我们很快就能算出在wikiOfSpark.txt文件中“Apache”这个单词出现了34次而“Spark”则出现了63次。虽说得出计算结果挺容易的不过知其然还要知其所以然接下来咱们一起来分析一下这段代码在运行时是如何工作的。
如上图所示list变量本身是在Driver端创建的它并不是分布式数据集如lineRDD、wordRDD的一部分。因此在分布式计算的过程中Spark需要把list变量分发给每一个分布式任务Task从而对不同数据分区的内容进行过滤。
在这种工作机制下如果RDD并行度较高、或是变量的尺寸较大那么重复的内容分发就会引入大量的网络开销与存储开销而这些开销会大幅削弱作业的执行性能。为什么这么说呢
要知道Driver端变量的分发是以Task为粒度的系统中有多少个Task变量就需要在网络中分发多少次。更要命的是每个Task接收到变量之后都需要把它暂存到内存以备后续过滤之用。换句话说在同一个Executor内部多个不同的Task多次重复地缓存了同样的内容拷贝毫无疑问这对宝贵的内存资源是一种巨大的浪费。
RDD并行度较高意味着RDD的数据分区数量较多而Task数量与分区数相一致这就代表系统中有大量的分布式任务需要执行。如果变量本身尺寸较大大量分布式任务引入的网络开销与内存开销会进一步升级。在工业级应用中RDD的并行度往往在千、万这个量级在这种情况下诸如list这样的变量会在网络中分发成千上万次作业整体的执行效率自然会很差 。
面对这样的窘境,我们有没有什么办法,能够避免同一个变量的重复分发与存储呢?答案当然是肯定的,这个时候,我们就可以祭出广播变量这个“杀手锏”。
广播变量的优势
想要知道广播变量到底有啥优势,我们可以先用广播变量重写一下前面的代码实现,然后再做个对比,很容易就能发现广播变量为什么能解决普通变量的痛点。
import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
// 创建单词列表list
val list: List[String] = List("Apache", "Spark")
// 创建广播变量bc
val bc = sc.broadcast(list)
// 使用bc.value对RDD进行过滤
val cleanWordRDD: RDD[String] = wordRDD.filter(word => bc.value.contains(word))
// 把RDD元素转换为KeyValue的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
// 获取计算结果
wordCounts.collect
// Array[(String, Int)] = Array((Apache,34), (Spark,63))
可以看到代码的修改非常简单我们先是使用broadcast函数来封装list变量然后在RDD过滤的时候调用bc.value来访问list变量内容。你可不要小看这个改写尽管代码的改动微乎其微几乎可以忽略不计但在运行时整个计算过程却发生了翻天覆地的变化。
在使用广播变量之前list变量的分发是以Task为粒度的而在使用广播变量之后变量分发的粒度变成了以Executors为单位同一个Executor内多个不同的Tasks只需访问同一份数据拷贝即可。换句话说变量在网络中分发与存储的次数从RDD的分区数量锐减到了集群中Executors的个数。
要知道在工业级系统中Executors个数与RDD并行度相比二者之间通常会相差至少两个数量级。在这样的量级下广播变量节省的网络与内存开销会变得非常可观省去了这些开销对作业的执行性能自然大有裨益。
好啦到现在为止我们讲解了广播变量的用法、工作原理以及它的优势所在。在日常的开发工作中当你遇到需要多个Task共享同一个大型变量如列表、数组、映射等数据结构的时候就可以考虑使用广播变量来优化你的Spark作业。接下来我们继续来说说Spark支持的第二种共享变量累加器。
累加器Accumulators
累加器顾名思义它的主要作用是全局计数Global counter。与单机系统不同在分布式系统中我们不能依赖简单的普通变量来完成全局计数而是必须依赖像累加器这种特殊的数据结构才能达到目的。
与广播变量类似累加器也是在Driver端定义的但它的更新是通过在RDD算子中调用add函数完成的。在应用执行完毕之后开发者在Driver端调用累加器的value函数就能获取全局计数结果。按照惯例咱们还是通过代码来熟悉累加器的用法。
聪明的你可能已经猜到了我们又要对Word Count“动手脚”了。在第1讲的Word Count中我们过滤掉了空字符串然后对文件wikiOfSpark.txt中所有的单词做统计计数。
不过这一次,我们在过滤掉空字符的同时,还想知道文件中到底有多少个空字符串,这样我们对文件中的“脏数据”就能做到心中有数了。
注意这里对于空字符串的计数不是主代码逻辑它的计算结果不会写入到Word Count最终的统计结果。所以只是简单地去掉filter环节是无法实现空字符计数的。
那么你自然会问“不把filter环节去掉怎么对空字符串做统计呢”别着急这样的计算需求正是累加器可以施展拳脚的地方。你可以先扫一眼下表的代码实现然后我们再一起来熟悉累加器的用法。
import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
// 定义Long类型的累加器
val ac = sc.longAccumulator("Empty string")
// 定义filter算子的判定函数f注意f的返回类型必须是Boolean
def f(x: String): Boolean = {
if(x.equals("")) {
// 当遇到空字符串时累加器加1
ac.add(1)
return false
} else {
return true
}
}
// 使用f对RDD进行过滤
val cleanWordRDD: RDD[String] = wordRDD.filter(f)
// 把RDD元素转换为KeyValue的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
// 收集计数结果
wordCounts.collect
// 作业执行完毕通过调用value获取累加器结果
ac.value
// Long = 79
与第1讲的Word Count相比这里的代码主要有4处改动
使用SparkContext下的longAccumulator来定义Long类型的累加器
定义filter算子的判定函数f当遇到空字符串时调用add函数为累加器计数
以函数f为参数调用filter算子对RDD进行过滤
作业完成后调用累加器的value函数获取全局计数结果。
你不妨把上面的代码敲入到spark-shell里直观体验下累加器的用法与效果ac.value给出的结果是79这说明以空格作为分隔符切割源文件wikiOfSpark.txt之后就会留下79个空字符串。
另外你还可以验证wordCounts这个RDD它包含所有单词的计数结果不过你会发现它的元素并不包含空字符串这与我们预期的计算逻辑是一致的。
除了上面代码中用到的longAccumulatorSparkContext还提供了doubleAccumulator和collectionAccumulator这两种不同类型的累加器用于满足不同场景下的计算需要感兴趣的话你不妨自己动手亲自尝试一下。
其中doubleAccumulator用于对Double类型的数值做全局计数而collectionAccumulator允许开发者定义集合类型的累加器相比数值类型集合类型可以为业务逻辑的实现提供更多的灵活性和更大的自由度。
不过就这3种累加器来说尽管类型不同但它们的用法是完全一致的。都是先定义累加器变量然后在RDD算子中调用add函数从而更新累加器状态最后通过调用value函数来获取累加器的最终结果。
好啦,到这里,关于累加器的用法,我们就讲完了。在日常的开发中,当你遇到需要做全局计数的场景时,别忘了用上累加器这个实用工具。
重点回顾
今天的内容讲完了,我们一起来做个总结。今天这一讲,我们重点讲解了广播变量与累加器的用法与适用场景。
广播变量由Driver端定义并初始化各个Executors以只读Read only的方式访问广播变量携带的数据内容。累加器也是由Driver定义的但Driver并不会向累加器中写入任何数据内容累加器的内容更新完全是由各个Executors以只写Write only的方式来完成而Driver仅以只读的方式对更新后的内容进行访问。
关于广播变量你首先需要掌握它的基本用法。给定任意类型的普通变量你都可以使用SparkContext下面的broadcast API来创建广播变量。接下来在RDD的转换与计算过程中你可以通过调用广播变量的value函数来访问封装的数据内容从而辅助RDD的数据处理。
需要额外注意的是在Driver与Executors之间普通变量的分发与存储是以Task为粒度的因此它所引入的网络与内存开销会成为作业执行性能的一大隐患。在使用广播变量的情况下数据内容的分发粒度变为以Executors为单位。相比前者广播变量的优势高下立判它可以大幅度消除前者引入的网络与内存开销进而在整体上提升作业的执行效率。
关于累加器首先你要清楚它的适用场景当你需要做全局计数的时候累加器会是个很好的帮手。其次你需要掌握累加器的具体用法可以分为这样3步
使用SparkContext下的[long | double | collection]Accumulator来定义累加器
在RDD的转换过程中调用add函数更新累加器状态
在作业完成后调用value函数获取累加器的全局结果。
每课一练
在使用累加器对空字符串做全局计数的代码中,请你用普通变量去替换累加器,试一试,在不使用累加器的情况,能否得到预期的计算结果?
累加器提供了Long、Double和Collection三种类型的支持那么广播变量在类型支持上有限制吗除了普通类型、集合类型之外广播变量还支持其他类型吗比如Spark支持在RDD之上创建广播变量吗
欢迎你在留言区跟我交流互动,也推荐你把这一讲的内容分享给你身边的朋友,说不定就能帮他解决一个难题。

View File

@ -0,0 +1,235 @@
因收到Google相关通知网站将会择期关闭。相关通知内容
20 Hive + Spark强强联合分布式数仓的不二之选
你好,我是吴磊。
在数据源与数据格式以及数据转换那两讲第15、16讲我们介绍了在Spark SQL之上做数据分析应用开发的一般步骤。
这里我们简单回顾一下首先我们通过SparkSession read API从分布式文件系统创建DataFrame。然后通过创建临时表并使用SQL语句或是直接使用DataFrame API来进行各式各样的数据转换、过滤、聚合等操作。最后我们再用SparkSession的write API把计算结果写回分布式文件系统。
实际上直接与文件系统交互仅仅是Spark SQL数据应用的常见场景之一。Spark SQL另一类非常典型的场景是与Hive做集成、构建分布式数据仓库。我们知道数据仓库指的是一类带有主题、聚合层次较高的数据集合它的承载形式往往是一系列Schema经过精心设计的数据表。在数据分析这类场景中数据仓库的应用非常普遍。
在Hive与Spark这对“万金油”组合中Hive擅长元数据管理而Spark的专长是高效的分布式计算二者的结合可谓是“强强联合”。今天这一讲我们就来聊一聊Spark与Hive集成的两类方式一类是从Spark的视角出发我们称之为Spark with Hive而另一类则是从Hive的视角出发业界的通俗说法是Hive on Spark。
Hive架构与基本原理
磨刀不误砍柴工在讲解这两类集成方式之前我们不妨先花点时间来了解一下Hive的架构和工作原理避免不熟悉Hive的同学听得云里雾里。
Hive是Apache Hadoop社区用于构建数据仓库的核心组件它负责提供种类丰富的用户接口接收用户提交的SQL查询语句。这些查询语句经过Hive的解析与优化之后往往会被转化为分布式任务并交付Hadoop MapReduce付诸执行。
Hive是名副其实的“集大成者”它的核心部件其实主要是User Interface1和Driver3。而不论是元数据库4、存储系统5还是计算引擎6Hive都以“外包”、“可插拔”的方式交给第三方独立组件所谓“把专业的事交给专业的人去做”如下图所示。
Hive的User Interface为开发者提供SQL接入服务具体的接入途径有Hive Server 22、CLI和Web InterfaceWeb界面入口。其中CLI与Web Interface直接在本地接收SQL查询语句而Hive Server 2则通过提供JDBC/ODBC客户端连接允许开发者从远程提交SQL查询请求。显然Hive Server 2的接入方式更为灵活应用也更为广泛。
我们以响应一个SQL查询为例看一看Hive是怎样工作的。接收到SQL查询之后Hive的Driver首先使用其Parser组件将查询语句转化为ASTAbstract Syntax Tree查询语法树
紧接着Planner组件根据AST生成执行计划而Optimizer则进一步优化执行计划。要完成这一系列的动作Hive必须要能拿到相关数据表的元信息才行比如表名、列名、字段类型、数据文件存储路径、文件格式等等。而这些重要的元信息通通存储在一个叫作“Hive Metastore”4的数据库中。
本质上Hive Metastore其实就是一个普通的关系型数据库RDBMS它可以是免费的MySQL、Derby也可以是商业性质的Oracle、IBM DB2。实际上除了用于辅助SQL语法解析、执行计划的生成与优化Metastore的重要作用之一是帮助底层计算引擎高效地定位并访问分布式文件系统中的数据源。
这里的分布式文件系统可以是Hadoop生态的HDFS也可以是云原生的Amazon S3。而在执行方面Hive目前支持3类计算引擎分别是Hadoop MapReduce、Tez和Spark。
当Hive采用Spark作为底层的计算引擎时我们就把这种集成方式称作“Hive on Spark”。相反当Spark仅仅是把Hive当成是一种元信息的管理工具时我们把Spark与Hive的这种集成方式叫作“Spark with Hive”。
你可能会觉得很困惑“这两种说法听上去差不多嘛两种集成方式到底有什么本质的不同呢”接下来我们就按照“先易后难”的顺序先来说说“Spark with Hive”这种集成方式然后再去介绍“Hive on Spark”。
Spark with Hive
在开始正式学习Spark with Hive之前我们先来说说这类集成方式的核心思想。前面我们刚刚说过Hive Metastore利用RDBMS来存储数据表的元信息如表名、表类型、表数据的Schema、表分区数据的存储路径、以及存储格式等等。形象点说Metastore就像是“户口簿”它记录着分布式文件系统中每一份数据集的“底细”。
Spark SQL通过访问Hive Metastore这本“户口簿”即可扩充数据访问来源。而这就是Spark with Hive集成方式的核心思想。直白点说在这种集成模式下Spark是主体Hive Metastore不过是Spark用来扩充数据来源的辅助工具。厘清Spark与Hive的关系有助于我们后面区分Hive on Spark与Spark with Hive之间的差异。
作为开发者我们可以通过3种途径来实现Spark with Hive的集成方式它们分别是
创建SparkSession访问本地或远程的Hive Metastore
通过Spark内置的spark-sql CLI访问本地Hive Metastore
通过Beeline客户端访问Spark Thrift Server。
SparkSession + Hive Metastore
为了更好地理解Hive与Spark的关系我们先从第一种途径也就是通过SparkSession访问Hive Metastore说起。首先我们使用如下命令来启动Hive Metastore。
hive --service metastore
Hive Metastore启动之后我们需要让Spark知道Metastore的访问地址也就是告诉他数据源的“户口簿”藏在什么地方。
要传递这个消息我们有两种办法。一种是在创建SparkSession的时候通过config函数来明确指定hive.metastore.uris参数。另一种方法是让Spark读取Hive的配置文件hive-site.xml该文件记录着与Hive相关的各种配置项其中就包括hive.metastore.uris这一项。把hive-site.xml拷贝到Spark安装目录下的conf子目录Spark即可自行读取其中的配置内容。
接下来我们通过一个小例子来演示第一种用法。假设Hive中有一张名为“salaries”的薪资表每条数据都包含id和salary两个字段表数据存储在HDFS那么在spark-shell中敲入下面的代码我们即可轻松访问Hive中的数据表。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
val hiveHost: String = _
// 创建SparkSession实例
val spark = SparkSession.builder()
.config("hive.metastore.uris", s"thrift://hiveHost:9083")
.enableHiveSupport()
.getOrCreate()
// 读取Hive表创建DataFrame
val df: DataFrame = spark.sql(“select * from salaries”)
df.show
/** 结果打印
+---+------+
| id|salary|
+---+------+
| 1| 26000|
| 2| 30000|
| 4| 25000|
| 3| 20000|
+---+------+
*/
在[第16讲]我们讲过利用createTempView函数从数据文件创建临时表的方法临时表创建好之后我们就可以使用SparkSession的sql API来提交SQL查询语句。连接到Hive Metastore之后咱们就可以绕过第一步直接使用sql API去访问Hive中现有的表是不是很方便
更重要的是createTempView函数创建的临时表它的生命周期仅限于Spark作业内部这意味着一旦作业执行完毕临时表也就不复存在没有办法被其他应用复用。Hive表则不同它们的元信息已经持久化到Hive Metastore中不同的作业、应用、甚至是计算引擎如Spark、Presto、Impala等等都可以通过Hive Metastore来访问Hive表。
总结下来在SparkSession + Hive Metastore这种集成方式中Spark对于Hive的访问仅仅涉及到Metastore这一环节对于Hive架构中的其他组件Spark并未触及。换句话说在这种集成方式中Spark仅仅是“白嫖”了Hive的Metastore拿到数据集的元信息之后Spark SQL自行加载数据、自行处理如下图所示。
在第一种集成方式下通过sql API你可以直接提交复杂的SQL语句也可以在创建DataFrame之后再使用第16讲提到的各种算子去实现业务逻辑。
spark-sql CLI + Hive Metastore
不过你可能会说“既然是搭建数仓那么能不能像使用普通数据库那样直接输入SQL查询绕过SparkSession的sql API呢
答案自然是肯定的接下来我们就来说说Spark with Hive的第二种集成方式spark-sql CLI + Hive Metastore。与spark-shell、spark-submit类似spark-sql也是Spark内置的系统命令。将配置好hive.metastore.uris参数的hive-site.xml文件放到Spark安装目录的conf下我们即可在spark-sql中直接使用SQL语句来查询或是处理Hive表。
显然在这种集成模式下Spark和Hive的关系与刚刚讲的SparkSession + Hive Metastore一样本质上都是Spark通过Hive Metastore来扩充数据源。
不过相比前者spark-sql CLI的集成方式多了一层限制那就是在部署上spark-sql CLI与Hive Metastore必须安装在同一个计算节点。换句话说spark-sql CLI只能在本地访问Hive Metastore而没有办法通过远程的方式来做到这一点。
在绝大多数的工业级生产系统中不同的大数据组件往往是单独部署的Hive与Spark也不例外。由于Hive Metastore可用于服务不同的计算引擎如前面提到的Presto、Impala因此为了减轻节点的工作负载Hive Metastore往往会部署到一台相对独立的计算节点。
在这样的背景下不得不说spark-sql CLI本地访问的限制极大地削弱了它的适用场景这也是spark-sql CLI + Hive Metastore这种集成方式几乎无人问津的根本原因。不过这并不妨碍我们学习并了解它这有助于我们对Spark与Hive之间的关系加深理解。
Beeline + Spark Thrift Server
说到这里你可能会追问“既然spark-sql CLI有这样那样的限制那么还有没有其他集成方式既能够部署到生产系统又能让开发者写SQL查询呢”答案自然是“有”Spark with Hive集成的第三种途径就是使用Beeline客户端去连接Spark Thrift Server从而完成Hive表的访问与处理。
Beeline原本是Hive客户端通过JDBC接入Hive Server 2。Hive Server 2可以同时服务多个客户端从而提供多租户的Hive查询服务。由于Hive Server 2的实现采用了Thrift RPC协议框架因此很多时候我们又把Hive Server 2称为“Hive Thrift Server 2”。
通过Hive Server 2接入的查询请求经由Hive Driver的解析、规划与优化交给Hive搭载的计算引擎付诸执行。相应地查询结果再由Hiver Server 2返还给Beeline客户端如下图右侧的虚线框所示。
Spark Thrift Server脱胎于Hive Server 2在接收查询、多租户服务、权限管理等方面这两个服务端的实现逻辑几乎一模一样。它们最大的不同在于SQL查询接入之后的解析、规划、优化与执行。
我们刚刚说过Hive Server 2的“后台”是Hive的那套基础架构。而SQL查询在接入到Spark Thrift Server之后它首先会交由Spark SQL优化引擎进行一系列的优化。
在第14讲我们提过借助于Catalyst与Tungsten这对“左膀右臂”Spark SQL对SQL查询语句先后进行语法解析、语法树构建、逻辑优化、物理优化、数据结构优化、以及执行代码优化等等。然后Spark SQL将优化过后的执行计划交付给Spark Core执行引擎付诸运行。
不难发现SQL查询在接入Spark Thrift Server之后的执行路径与DataFrame在Spark中的执行路径是完全一致的。
理清了Spark Thrift Server与Hive Server 2之间的区别与联系之后接下来我们来说说Spark Thrift Server的启动与Beeline的具体用法。要启动Spark Thrift Server我们只需调用Spark提供的start-thriftserver.sh脚本即可。
// SPARK_HOME环境变量指向Spark安装目录
cd $SPARK_HOME/sbin
// 启动Spark Thrift Server
./start-thriftserver.sh
脚本执行成功之后Spark Thrift Server默认在10000端口监听JDBC/ODBC的连接请求。有意思的是关于监听端口的设置Spark复用了Hive的hive.server2.thrift.port参数。与其他的Hive参数一样hive.server2.thrift.port同样要在hive-site.xml配置文件中设置。
一旦Spark Thrift Server启动成功我们就可以在任意节点上通过Beeline客户端来访问该服务。在客户端与服务端之间成功建立连接Connections之后咱们就能在Beeline客户端使用SQL语句处理Hive表了。需要注意的是在这种集成模式下SQL语句背后的优化与计算引擎是Spark。
/**
用Beeline客户端连接Spark Thrift Server
其中hostname是Spark Thrift Server服务所在节点
*/
beeline -u “jdbc:hive2://hostname:10000”
好啦到此为止Spark with Hive这类集成方式我们就讲完了。
为了巩固刚刚学过的内容咱们趁热打铁一起来做个简单的小结。不论是SparkSession + Hive Metastore、spark-sql CLI + Hive Metastore还是Beeline + Spark Thrift ServerSpark扮演的角色都是执行引擎而Hive的作用主要在于通过Metastore提供底层数据集的元数据。不难发现在这类集成方式中Spark唱“主角”而Hive唱“配角”。
Hive on Spark
说到这里你可能会好奇“对于Hive社区与Spark社区来说大家都是平等的那么有没有Hive唱主角而Spark唱配角的时候呢”还真有这就是Spark与Hive集成的另一种形式Hive on Spark。
基本原理
在这一讲的开头我们简单介绍了Hive的基础架构。Hive的松耦合设计使得它的Metastore、底层文件系统、以及执行引擎都是可插拔、可替换的。
在执行引擎方面Hive默认搭载的是Hadoop MapReduce但它同时也支持Tez和Spark。所谓的“Hive on Spark”实际上指的就是Hive采用Spark作为其后端的分布式执行引擎如下图所示。
从用户的视角来看使用Hive on MapReduce或是Hive on Tez与使用Hive on Spark没有任何区别执行引擎的切换对用户来说是完全透明的。不论Hive选择哪一种执行引擎引擎仅仅负责任务的分布式计算SQL语句的解析、规划与优化通通由Hive的Driver来完成。
为了搭载不同的执行引擎Hive还需要做一些简单的适配从而把优化过的执行计划“翻译”成底层计算引擎的语义。
举例来说在Hive on Spark的集成方式中Hive在将SQL语句转换为执行计划之后还需要把执行计划“翻译”成RDD语义下的DAG然后再把DAG交付给Spark Core付诸执行。从第14讲到现在我们一直在强调Spark SQL除了扮演数据分析子框架的角色之外还是Spark新一代的优化引擎。
在Hive on Spark这种集成模式下Hive与Spark衔接的部分是Spark Core而不是Spark SQL这一点需要我们特别注意。这也是为什么相比Hive on SparkSpark with Hive的集成在执行性能上会更胜一筹。毕竟Spark SQL + Spark Core这种原装组合相比Hive Driver + Spark Core这种适配组合在契合度上要更高一些。
集成实现
分析完原理之后接下来我们再来说说Hive on Spark的集成到底该怎么实现。
首先既然我们想让Hive搭载Spark那么我们事先得准备好一套完备的Spark部署。对于Spark的部署模式Hive不做任何限定Spark on Standalone、Spark on Yarn或是Spark on Kubernetes都是可以的。
Spark集群准备好之后我们就可以通过修改hive-site.xml中相关的配置项来轻松地完成Hive on Spark的集成如下表所示。
其中hive.execution.engine用于指定Hive后端执行引擎可选值有“mapreduce”、“tez”和“spark”显然将该参数设置为“spark”即表示采用Hive on Spark的集成方式。
确定了执行引擎之后接下来我们自然要告诉Hive“Spark集群部署在哪里”spark.master正是为了实现这个目的。另外为了方便Hive调用Spark的相关脚本与Jar包我们还需要通过spark.home参数来指定Spark的安装目录。
配置好这3个参数之后我们就可以用Hive SQL向Hive提交查询请求而Hive则是先通过访问Metastore在Driver端完成执行计划的制定与优化然后再将其“翻译”为RDD语义下的DAG最后把DAG交给后端的Spark去执行分布式计算。
当你在终端看到“Hive on Spark”的字样时就证明Hive后台的执行引擎确实是Spark如下图所示。
当然除了上述3个配置项以外Hive还提供了更多的参数用于微调它与Spark之间的交互。对于这些参数你可以通过访问Hive on Spark配置项列表来查看。不仅如此在第12讲我们详细介绍了Spark自身的基础配置项这些配置项都可以配置到hive-site.xml中方便你更细粒度地控制Hive与Spark之间的集成。
重点回顾
好啦,到此为止,今天的内容就全部讲完啦!内容有点多,我们一起来做个总结。
今天这一讲你需要了解Spark与Hive常见的两类集成方式Spark with Hive和Hive on Spark。前者由Spark社区主导以Spark为主、Hive为辅后者则由Hive社区主导以Hive为主、Spark为辅。两类集成方式各有千秋适用场景各有不同。
在Spark with Hive这类集成方式中Spark主要是利用Hive Metastore来扩充数据源从而降低分布式文件的管理与维护成本如路径管理、分区管理、Schema维护等等。
对于Spark with Hive我们至少有3种途径来实现Spark与Hive的集成分别是SparkSession + Hive Metastorespark-sql CLI + Hive Metastore和Beeline + Spark Thrift Server。对于这3种集成方式我把整理了表格供你随时查看。
与Spark with Hive相对另一类集成方式是Hive on Spark。这种集成方式本质上是Hive社区为Hive用户提供了一种新的选项这个选项就是在执行引擎方面除了原有的MapReduce与Tez开发者还可以选择执行性能更佳的Spark。
因此在Spark大行其道的当下习惯使用Hive的团队与开发者更愿意去尝试和采用Spark作为后端的执行引擎。
熟悉了不同集成方式的区别与适用场景之后在日后的工作中当你需要将Spark与Hive做集成的时候就可以做到有的放矢、有章可循加油。
每课一练
在Hive on Spark的部署模式下用另外一套Spark部署去访问Hive Metastore比如通过创建SparkSession并访问Hive Metastore来扩充数据源。那么在这种情况下你能大概说一说用户代码的执行路径吗
尽管咱们专栏的主题是Spark但我强烈建议你学习并牢记Hive的架构设计。松耦合的设计理念让Hive本身非常轻量的同时还给予了Hive极大的扩展能力。也正因如此Hive才能一直牢牢占据开源数仓霸主的地位。Hive的设计思想是非常值得我们好好学习的这样的设计思想可以推而广之应用到任何需要考虑架构设计的地方不论是前端、后端还是大数据与机器学习。
欢迎你在留言区跟我交流互动,也欢迎把这一讲的内容分享给更多同事、朋友。

View File

@ -0,0 +1,186 @@
因收到Google相关通知网站将会择期关闭。相关通知内容
32 Window操作&Watermark流处理引擎提供了哪些优秀机制
你好,我是吴磊。
在上一讲我们从原理的角度出发学习了Structured Streaming的计算模型与容错机制。深入理解这些基本原理会帮我们开发流处理应用打下坚实的基础。
在“流动的Word Count”[那一讲]我们演示了在Structured Streaming框架下如何做流处理开发的一般流程。基于readStream API与writeStream API我们可以像读写DataFrame那样轻松地从Source获取数据流并把处理过的数据写入Sink。
今天这一讲咱们从功能的视角出发继续来聊一聊Structured Streaming流处理引擎都为开发者都提供了哪些特性与能力让你更灵活地设计并实现流处理应用。
Structured Streaming怎样坐享其成
学习过计算模型之后我们知道不管是Batch mode的多个Micro-batch、多个作业的执行方式还是Continuous mode下的一个Long running job这些作业的执行计划最终都会交付给Spark SQL与Spark Core付诸优化与执行。
而这会带来两个方面的收益。一方面凡是Spark SQL支持的开发能力不论是丰富的DataFrame算子还是灵活的SQL查询Structured Streaming引擎都可以拿来即用。基于之前学过的内容我们可以像处理普通的DataFrame那样对基于流数据构建的DataFrame做各式各样的转换与聚合。
另一方面既然开发入口同为DataFrame那么流处理应用同样能够享有Spark SQL提供的“性能红利”。在Spark SQL学习模块我们学习过Catalyst优化器与Tungsten这两个组件会对用户代码做高度优化从而提升应用的执行性能。
因此就框架的功能来说我们可以简单地概括为Spark SQL所拥有的能力Structured Streaming都有。不过除了基本的数据处理能力以外为了更好地支持流计算场景Structured Streaming引擎还提供了一些专门针对流处理的计算能力比如说Window操作、Watermark与延迟数据处理等等。
Window操作
我们先来说说Window操作它指的是Structured Streaming引擎会基于一定的时间窗口对数据流中的消息进行消费并处理。这是什么意思呢首先我们需要了解两个基本概念Event Time和Processing Time也即事件时间和处理时间。
所谓事件时间它指的是消息生成的时间比如我们在netcat中敲入“Apache Spark”的时间戳是“2021-10-01 09:30:00”那么这个时间就是消息“Apache Spark”的事件时间。
而处理时间它指的是这个消息到达Structured Streaming引擎的时间因此也有人把处理时间称作是到达时间Arrival Time也即消息到达流处理系统的时间。显然处理时间要滞后于事件时间。
所谓Window操作实际上就是Structured Streaming引擎基于事件时间或是处理时间以固定间隔划定时间窗口然后以窗口为粒度处理消息。在窗口的划分上Structured Streaming支持两种划分方式一种叫做Tumbling Window另一种叫做Sliding Window。
我们可以用一句话来记住二者之间的区别Tumbling Window划分出来的时间窗口“不重不漏”而Sliding Window划分出来的窗口可能会重叠、也可能会有遗漏如下图所示。
不难发现Sliding Window划分出来的窗口是否存在“重、漏”取决于窗口间隔Interval与窗口大小Size之间的关系。Tumbling Window与Sliding Window并无优劣之分完全取决于应用场景与业务需要。
干讲理论总是枯燥无趣接下来咱们对之前的“流动的Word Count”稍作调整来演示Structured Streaming中的Window操作。为了让演示的过程更加清晰明了这里我们采用Tumbling Window的划分方式Sliding Window留给你作为课后作业。
为了完成实验我们还是需要准备好两个终端。第一个终端用于启动spark-shell并提交流处理代码而第二个终端用于启动netcat、输入数据流。要基于窗口去统计单词我们仅需调整数据处理部分的代码readStream与writeStreamUpdate Mode部分的代码不需要任何改动。因此为了聚焦Window操作的学习我这里仅贴出了有所变动的部分。
df = df.withColumn("inputs", split($"value", ","))
// 提取事件时间
.withColumn("eventTime", element_at(col("inputs"),1).cast("timestamp"))
// 提取单词序列
.withColumn("words", split(element_at(col("inputs"),2), " "))
// 拆分单词
.withColumn("word", explode($"words"))
// 按照Tumbling Window与单词做分组
.groupBy(window(col("eventTime"), "5 minute"), col("word"))
// 统计计数
.count()
为了模拟事件时间我们在netcat终端输入的消息会同时包含时间戳和单词序列。两者之间以逗号分隔而单词与单词之间还是用空格分隔如下表所示。
因此,对于输入数据的处理,我们首先要分别提取出时间戳和单词序列,然后再把单词序列展开为单词。接下来,我们按照时间窗口与单词做分组,这里需要我们特别关注这行代码:
// 按照Tumbling Window与单词做分组
.groupBy(window(col("eventTime"), "5 minute"), col("word"))
其中window(col(“eventTime”), “5 minute”)的含义就是以事件时间为准以5分钟为间隔创建Tumbling时间窗口。显然window函数的第一个参数就是创建窗口所依赖的时间轴而第二个参数则指定了窗口大小Size。说到这里你可能会问“如果我想创建Sliding Window该怎么做呢
其实非常简单只需要在window函数的调用中再添加第三个参数即可也就是窗口间隔Interval。比如说我们还是想创建大小为5分钟的窗口但是使用以3分钟为间隔进行滑动的方式去创建那么我们就可以这样来实现window(col(“eventTime”), “5 minute”, “3 minute”)。是不是很简单?
完成基于窗口和单词的分组之后我们就可以继续调用count来完成计数了。不难发现代码中的大多数转换操作实际上都是我们常见的DataFrame算子这也印证了这讲开头说的Structured Streaming先天优势就是能坐享其成享有Spark SQL提供的“性能红利”。
代码准备好之后我们就可以把它们陆续敲入到spark-shell并等待来自netcat的数据流。切换到netcat终端并陆续注意是陆续输入刚刚的文本内容我们就可以在spark-shell终端看到如下的计算结果。
可以看到与“流动的Word Count”不同这里的统计计数是以窗口5分钟为粒度的。对于每一个时间窗口来说Structured Streaming引擎都会把事件时间落入该窗口的单词统计在内。不难推断随着时间向前推进已经计算过的窗口将不会再有状态上的更新。
比方说当引擎处理到“2021-10-01 09:39:00,Spark Streaming”这条消息记作消息39理论上前一个窗口“{2021-10-01 09:30:00, 2021-10-01 09:35:00}”记作窗口30-35的状态也就是不同单词的统计计数应该不会再有变化。
说到这里你可能会有这样的疑问“那不见得啊如果在消息39之后引擎又接收到一条事件时间落在窗口30-35的消息那该怎么办呢”要回答这个问题我们还得从Late data和Structured Streaming的Watermark机制说起。
Late data与Watermark
我们先来说Late data所谓Late data它指的是那些事件时间与处理时间不一致的消息。虽然听上去有点绕但通过下面的图解我们就能瞬间理解Late data的含义。
通常来说,消息生成的时间,与消息到达流处理引擎的时间,应该是一致的。也即先生成的消息先到达,而后生成的消息后到达,就像上图中灰色部分消息所示意的那样。
不过在现实情况中总会有一些消息因为网络延迟或者这样那样的一些原因它们的处理时间与事件时间存在着比较大的偏差。这些消息到达引擎的时间甚至晚于那些在它们之后才生成的消息。像这样的消息我们统称为“Late data”如图中红色部分的消息所示。
由于有Late data的存在流处理引擎就需要一个机制来判定Late data的有效性从而决定是否让晚到的消息参与到之前窗口的计算。
就拿红色的“Spark is cool”消息来说在它到达Structured Streaming引擎的时候属于它的事件时间窗口“{2021-10-01 09:30:00, 2021-10-01 09:35:00}”已经关闭了。那么在这种情况下Structured Streaming到底要不要用消息“Spark is cool”中的单词去更新窗口30-35的状态单词计数
为了解决Late data的问题Structured Streaming采用了一种叫作Watermark的机制来应对。为了让你能够更容易地理解Watermark机制的原理在去探讨它之前我们先来澄清两个极其相似但是又完全不同的概念水印和水位线。
要说清楚水印和水位线,咱们不妨来做个思想实验。假设桌子上有一盒鲜牛奶、一个吸管、还有一个玻璃杯。我们把盒子开个口,把牛奶全部倒入玻璃杯,接着,把吸管插入玻璃杯,然后通过吸管喝一口新鲜美味的牛奶。好啦,实验做完了,接下来,我们用它来帮我们澄清概念。
如图所示,最开始的时候,我们把牛奶倒到水印标示出来的高度,然后用吸管喝牛奶。不过,不论我们通过吸管喝多少牛奶,水印位置的牛奶痕迹都不会消失,也就是说,水印的位置是相对固定的。而水位线则不同,我们喝得越多,水位线下降得就越快,直到把牛奶喝光,水位线降低到玻璃杯底部。
好啦澄清了水印与水位线的概念之后我们还需要把这两个概念与流处理中的概念对应上。毕竟“倒牛奶”的思想实验是用来辅助我们学习Watermark机制的。
首先水印与水位线对标的都是消息的事件时间。水印相当于系统当前接收到的所有消息中最大的事件时间。而水位线指的是水印对应的事件时间减去用户设置的容忍值。为了叙述方便我们把这个容忍值记作T。在Structured Streaming中我们把水位线对应的事件时间称作Watermark如下图所示。
显然在流处理引擎不停地接收消息的过程中水印与水位线也会相应地跟着变化。这个过程跟我们刚刚操作的“倒牛奶、喝牛奶”的过程很像。每当新到消息的事件时间大于当前水印的时候系统就会更新水印这就好比我们往玻璃杯里倒牛奶一直倒到最大事件时间的位置。然后我们用吸管喝牛奶吸掉深度为T的牛奶让水位线下降到Watermark的位置。
把不同的概念关联上之后接下来我们来正式地介绍Structured Streaming的Watermark机制。我们刚刚说过Watermark机制是用来决定哪些Late data可以参与过往窗口状态的更新而哪些Late data则惨遭抛弃。
如果用文字去解释Watermark机制很容易把人说得云里雾里因此咱们不妨用一张流程图来阐释这个过程。
可以看到当有新消息到达系统后Structured Streaming首先判断它的事件时间是否大于水印。如果事件时间大于水印的话Watermark机制则相应地更新水印与水位线也就是最大事件时间与Watermark。
相反假设新到消息的事件时间在当前水印以下那么系统进一步判断消息的事件时间与“Watermark时间窗口下沿”的关系。所谓“Watermark时间窗口下沿”它指的是Watermark所属时间窗口的起始时间。
咱们来举例说明假设Watermark为“2021-10-01 09:34:00”且事件时间窗口大小为5分钟那么Watermark所在时间窗口就是[“2021-10-01 09:30:00”“2021-10-01 09:35:00”]也即窗口30-35。这个时候“Watermark时间窗口下沿”就是窗口30-35的起始时间也就是“2021-10-01 09:30:00”如下图所示。
对于最新到达的消息如果其事件时间大于“Watermark时间窗口下沿”则消息可以参与过往窗口的状态更新否则消息将被系统抛弃不再参与计算。换句话说凡是事件时间小于“Watermark时间窗口下沿”的消息系统都认为这样的消息来得太迟了没有资格再去更新以往计算过的窗口。
不难发现在这个过程中延迟容忍度T是Watermark机制中的决定性因素它决定了“多迟”的消息可以被系统容忍并接受。那么问题来了既然T是由用户设定的那么用户通过什么途径来设定这个T呢再者在Structured Streaming的开发框架下Watermark机制要如何生效呢
其实要开启Watermark机制、并设置容忍度T我们只需一行代码即可搞定。接下来我们就以刚刚“带窗口的流动Word Count”为例演示并说明Watermark机制的具体用法。
df = df.withColumn("inputs", split($"value", ","))
// 提取事件时间
.withColumn("eventTime", element_at(col("inputs"),1).cast("timestamp"))
// 提取单词序列
.withColumn("words", split(element_at(col("inputs"),2), " "))
// 拆分单词
.withColumn("word", explode($"words"))
// 启用Watermark机制指定容忍度T为10分钟
.withWatermark("eventTime", "10 minute")
// 按照Tumbling Window与单词做分组
.groupBy(window(col("eventTime"), "5 minute"), col("word"))
// 统计计数
.count()
可以看到,除了“.withWatermark(“eventTime”, “10 minute”)”这一句代码其他部分与“带窗口的流动Word Count”都是一样的。这里我们用withWatermark函数来启用Watermark机制该函数有两个参数第一个参数是事件时间而第二个参数就是由用户指定的容忍度T。
为了演示Watermark机制产生的效果接下来咱们对netcat输入的数据流做一些调整如下表所示。注意消息7“Test Test”和消息8“Spark is cool”都是Late data。
基于我们刚刚对于Watermark机制的分析在容忍度T为10分钟的情况下Late data消息8“Spark is cool”会被系统接受并消费而消息7“Test Test”则将惨遭抛弃。你不妨先花点时间自行推断出这一结论然后再来看后面的结果演示。
上图中左侧是输入消息7“Test Test”时spark-shell端的输出可以看到消息7被系统丢弃没能参与计算。而右侧是消息8“Spark is cool”对应的执行结果可以看到“Spark”、“is”、“cool”这3个单词成功地更新了之前窗口30-35的状态注意这里的“Spark”计数为3而不是1
重点回顾
好啦今天的内容到这里就讲完了我们一起来做个总结。首先我们需要知道在数据处理方面Structured Streaming完全可以复用Spark SQL现有的功能与性能优势。因此开发者完全可以“坐享其成”使用DataFrame算子或是SQL语句来完成流数据的处理。
再者我们需要特别关注并掌握Structured Streaming的Window操作与Watermark机制。Structured Streaming支持两类窗口一个是“不重不漏”的Tumbling Window另一个是“可重可漏”的Sliding Window。二者并无高下之分作为开发者我们可以使用window函数结合事件时间、窗口大小、窗口间隔等多个参数来灵活地在两种窗口之间进行取舍。
对于Late data的处理Structured Streaming使用Watermark机制来决定其是否参与过往窗口的计算与更新。关于Watermark机制的工作原理我把它整理到了下面的流程图中供你随时查看。
每课一练
请你结合Tumbling Window的代码把Tumbling Window改为Sliding Window。-
对于Watermark机制中的示例请你分析一下为什么消息8“Spark is cool”会被系统接受并处理而消息7“Test Test”却惨遭抛弃
欢迎你在留言区跟我交流讨论,也推荐你把这一讲分享给更多同事、朋友。

View File

@ -0,0 +1,421 @@
因收到Google相关通知网站将会择期关闭。相关通知内容
34 Spark + Kafka流计算中的“万金油”
你好,我是吴磊。
在前面的几讲中咱们不止一次提到就数据源来说Kafka是Structured Streaming最重要的Source之一。在工业级的生产系统中Kafka与Spark这对组合最为常见。因此掌握Kafka与Spark的集成对于想从事流计算方向的同学来说是至关重要的。
今天这一讲咱们就来结合实例说一说Spark与Kafka这对“万金油”组合如何使用。随着业务飞速发展各家公司的集群规模都是有增无减。在集群规模暴涨的情况下资源利用率逐渐成为大家越来越关注的焦点。毕竟不管是自建的Data center还是公有云每台机器都是真金白银的投入。
实例:资源利用率实时计算
咱们今天的实例就和资源利用率的实时计算有关。具体来说我们首先需要搜集集群中每台机器的资源CPU、内存利用率并将其写入Kafka。然后我们使用Spark的Structured Streaming来消费Kafka数据流并对资源利用率数据做初步的分析与聚合。最后再通过Structured Streaming将聚合结果打印到Console、并写回到Kafka如下图所示。
一般来说在工业级应用中上图中的每一个圆角矩形在部署上都是独立的。绿色矩形代表待监测的服务器集群蓝色矩形表示独立部署的Kafka集群而红色的Spark集群也是独立部署的。所谓独立部署它指的是集群之间不共享机器资源如下图所示。
如果你手头上没有这样的部署环境,也不用担心。要完成资源利用率实时计算的实例,咱们不必非要依赖独立部署的分布式集群。实际上,仅在单机环境中,你就可以复现今天的实例。
课程安排
今天这一讲涉及的内容比较多,在正式开始课程之前,咱们不妨先梳理一下课程内容,让你做到心中有数。
对于上图的1、2、3、4这四个步骤我们会结合代码实现分别讲解如下这四个环节
生成CPU与内存消耗数据流写入Kafka-
Structured Streaming消费Kafka数据并做初步聚合-
Structured Streaming将计算结果打印到终端-
Structured Streaming将计算结果写回Kafka以备后用。
除此之外为了照顾不熟悉Kafka的同学咱们还会对Kafka的安装、Topic创建与消费、以及Kafka的基本概念做一个简单的梳理。
速读Kafka的架构与运行机制
在完成前面交代的计算环节之前我们需要了解Kafka都提供了哪些核心功能。
在大数据的流计算生态中Kafka是应用最为广泛的消息中间件Messaging Queue。消息中间件的核心功能有以下三点。
连接消息生产者与消息消费者;-
缓存生产者生产的消息(或者说事件);-
有能力让消费者以最低延迟访问到消息。
所谓消息生产者它指的是事件或消息的来源与渠道。在我们的例子中待监测集群就是生产者。集群中的机器源源不断地生产资源利用率消息。相应地消息的消费者它指的是访问并处理消息的系统。显然在这一讲的例子中消费者是Spark。Structured Streaming读取并处理Kafka中的资源利用率消息对其进行聚合、汇总。
经过前面的分析,我们不难发现,消息中间件的存在,让生产者与消费者这两个系统之间,天然地享有如下三方面的收益。
解耦:双方无需感知对方的存在,二者除了消息本身以外,再无交集;
异步:双方都可以按照自己的“节奏”和“步调”,来生产或是消费消息,而不必受制于对方的处理能力;
削峰:当消费者订阅了多个生产者的消息,且多个生产者同时生成大量消息时,得益于异步模式,消费者可以灵活地消费并处理消息,从而避免计算资源被撑爆的隐患。
好啦了解了Kafka的核心功能与特性之后接下来我们说一说Kafka的系统架构。与大多数主从架构的大数据组件如HDFS、YARN、Spark、Presto、Flink等等不同Kafka为无主架构。也就是说在Kafka集群中没有Master这样一个角色来维护全局的数据状态。
集群中的每台Server被称为Kafka BrokerBroker的职责在于存储生产者生产的消息并为消费者提供数据访问。Broker与Broker之间都是相互独立的彼此不存在任何的依赖关系。
如果就这么平铺直叙去介绍Kafka架构的话难免让你昏昏欲睡所以我们上图解。配合示意图解释Kafka中的关键概念会更加直观易懂。
刚刚说过Kafka为无主架构它依赖ZooKeeper来存储并维护全局元信息。所谓元信息它指的是消息在Kafka集群中的分布与状态。在逻辑上消息隶属于一个又一个的Topic也就是消息的话题或是主题。在上面的示意图中蓝色圆角矩形所代表的消息全部隶属于Topic A而绿色圆角矩形则隶属于Topic B。
而在资源利用率的实例中我们会创建两个Topic一个是CPU利用率cpu-monitor另一个是内存利用率mem-monitor。生产者在向Kafka写入消息的时候需要明确指明消息隶属于哪一个Topic。比方说关于CPU的监控数据应当发往cpu-monitor而对于内存的监控数据则应该发往mem-monitor。
为了平衡不同Broker之间的工作负载在物理上同一个Topic中的消息以分区、也就是Partition为粒度进行存储示意图中的圆角矩形代表的正是一个个数据分区。在Kafka中一个分区实际上就是磁盘上的一个文件目录。而消息则依序存储在分区目录的文件中。
为了提供数据访问的高可用HAHigh Availability在生产者把消息写入主分区Leader之后Kafka会把消息同步到多个分区副本Follower示意图中的步骤1与步骤2演示了这个过程。
一般来说消费者默认会从主分区拉取并消费数据如图中的步骤3所示。而当主分区出现故障、导致数据不可用时Kafka就会从剩余的分区副本中选拔出一个新的主分区来对外提供服务这个过程又称作“选主”。
好啦到此为止Kafka的基础功能和运行机制我们就讲完了尽管这些介绍不足以覆盖Kafka的全貌但是对于初学者来说这些概念足以帮我们进军实战做好Kafka与Spark的集成。
Kafka与Spark集成
接下来咱们就来围绕着“资源利用率实时计算”这个例子手把手地带你实现Kafka与Spark的集成过程。首先第一步我们先来准备Kafka环境。
Kafka环境准备
要配置Kafka环境我们只需要简单的三个步骤即可
安装ZooKeeper、安装Kafka启动ZooKeeper-
修改Kafka配置文件server.properties设置ZooKeeper相关配置项-
启动Kafka创建Topic。
首先,咱们从 ZooKeeper官网与 Kafka官网分别下载二者的安装包。然后依次解压安装包、并配置相关环境变量即可如下表所示。
// 下载ZooKeeper安装包
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
// 下载Kafka安装包
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz
// 把ZooKeeper解压并安装到指定目录
tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz -C /opt/zookeeper
// 把Kafka解压并安装到指定目录
tar -zxvf kafka_2.12-2.8.0.tgz -C /opt/kafka
// 编辑环境变量
vi ~/.bash_profile
/** 输入如下内容到文件中
export ZOOKEEPER_HOME=/opt/zookeeper/apache-zookeeper-3.7.0-bin
export KAFKA_HOME=/opt/kafka/kafka_2.12-2.8.0
export PATH=$PATH:$ZOOKEEPER_HOME/bin:$KAFKA_HOME/bin
*/
// 启动ZooKeeper
zkServer.sh start
接下来我们打开Kafka配置目录下也即$KAFKA_HOME/config的server.properties文件将其中的配置项zookeeper.connect设置为“hostname:2181”也就是主机名加端口号。
如果你把ZooKeeper和Kafka安装到同一个节点那么hostname可以写localhost。而如果是分布式部署hostname要写ZooKeeper所在的安装节点。一般来说ZooKeeper默认使用2181端口来提供服务这里我们使用默认端口即可。
配置文件设置完毕之后我们就可以使用如下命令在多个节点启动Kafka Broker。
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Kafka启动之后咱们就来创建刚刚提到的两个Topiccpu-monitor和mem-monitor它们分别用来存储CPU利用率消息与内存利用率消息。
kafka-topics.sh --zookeeper hostname:2181/kafka --create
--topic cpu-monitor
--replication-factor 3
--partitions 1
kafka-topics.sh --zookeeper hostname:2181/kafka --create
--topic mem-monitor
--replication-factor 3
--partitions 1
怎么样是不是很简单要创建Topic只要指定ZooKeeper服务地址、Topic名字和副本数量即可。不过这里需要特别注意的是副本数量也就是replication-factor不能超过集群中的Broker数量。所以如果你是本地部署的话也就是所有服务都部署到一台节点那么这里的replication-factor应该设置为1。
好啦到此为止Kafka环境安装、配置完毕。下一步我们就该让生产者去生产资源利用率消息并把消息源源不断地注入Kafka集群了。
消息的生产
在咱们的实例中我们要做的是监测集群中每台机器的资源利用率。因此我们需要这些机器每隔一段时间就把CPU和内存利用率发送出来。而要做到这一点咱们只需要完成一下两个两个必要步骤
每台节点从本机收集CPU与内存使用数据-
把收集到的数据按照固定间隔发送给Kafka集群。-
由于消息生产这部分代码比较长而我们的重点是学习Kafka与Spark的集成因此这里咱们只给出这两个步骤所涉及的关键代码片段。完整的代码实现你可以从这里进行下载。
import java.lang.management.ManagementFactory
import java.lang.reflect.Modifier
def getUsage(mothedName: String): Any = {
// 获取操作系统Java Bean
val operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean
// 获取操作系统对象中声明过的方法
for (method <- operatingSystemMXBean.getClass.getDeclaredMethods) {
method.setAccessible(true)
// 判断是否为我们需要的方法名
if (method.getName.startsWith(mothedName) && Modifier.isPublic(method.getModifiers)) {
// 调用并执行方法获取指定资源CPU或内存的利用率
return method.invoke(operatingSystemMXBean)
}
}
throw new Exception(s"Can not reflect method: ${mothedName}")
}
// 获取CPU利用率
def getCPUUsage(): String = {
var usage = 0.0
try{
// 调用getUsage方法传入getSystemCpuLoad参数获取CPU利用率
usage = getUsage("getSystemCpuLoad").asInstanceOf[Double] * 100
} catch {
case e: Exception => throw e
}
usage.toString
}
// 获取内存利用率
def getMemoryUsage(): String = {
var freeMemory = 0L
var totalMemory = 0L
var usage = 0.0
try{
// 调用getUsage方法传入相关内存参数获取内存利用率
freeMemory = getUsage("getFreePhysicalMemorySize").asInstanceOf[Long]
totalMemory = getUsage("getTotalPhysicalMemorySize").asInstanceOf[Long]
// 用总内存减去空闲内存获取当前内存用量
usage = (totalMemory - freeMemory.doubleValue) / totalMemory * 100
} catch {
case e: Exception => throw e
}
usage.toString
}
利用Java的反射机制获取资源利用率
上面的代码用来获取CPU与内存利用率在这段代码中最核心的部分是利用Java的反射机制来获取操作系统对象的各个公有方法然后通过调用这些公有方法来完成资源利用率的获取
不过看到这你可能会说:“我并不了解Java的反射机制上面的代码看不太懂。”这也没关系只要你能结合注释把上述代码的计算逻辑搞清楚即可获取到资源利用率的数据之后接下来我们就可以把它们发送给Kafka了
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
// 初始化属性信息
def initConfig(clientID: String): Properties = {
val props = new Properties
val brokerList = "localhost:9092"
// 指定Kafka集群Broker列表
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientID)
props
}
val clientID = "usage.monitor.client"
val cpuTopic = "cpu-monitor"
val memTopic = "mem-monitor"
// 定义属性其中包括Kafka集群信息序列化方法等等
val props = initConfig(clientID)
// 定义Kafka Producer对象用于发送消息
val producer = new KafkaProducer[String, String](props)
// 回调函数可暂时忽略
val usageCallback = _
while (true) {
var cpuUsage = new String
var memoryUsage = new String
// 调用之前定义的函数获取CPU内存利用率
cpuUsage = getCPUUsage()
memoryUsage = getMemoryUsage()
// 为CPU Topic生成Kafka消息
val cpuRecord = new ProducerRecord[String, String](cpuTopic, clientID, cpuUsage)
// 为Memory Topic生成Kafka消息
val memRecord = new ProducerRecord[String, String](memTopic, clientID, memoryUsage)
// 向Kafka集群发送CPU利用率消息
producer.send(cpuRecord, usageCallback)
// 向Kafka集群发送内存利用率消息
producer.send(memRecord, usageCallback)
// 设置发送间隔2秒
Thread.sleep(2000)
}
从上面的代码中我们不难发现其中的关键步骤有三步
定义Kafka Producer对象其中需要我们在属性信息中指明Kafka集群相关信息
调用之前定义的函数getCPUUsagegetMemoryUsage获取CPU与内存资源利用率
把资源利用率封装为消息并发送给对应的Topic-
好啦到此为止生产端的事情我们就全部做完啦在待监测的集群中每隔两秒钟每台机器都会向Kafka集群的cpu-monitor和mem-monitor这两个Topic发送即时消息Kafka接收到这些消息之后会把它们落盘到相应的分区中等待着下游也就是Spark的消费
消息的消费
接下来终于要轮到Structured Streaming闪亮登场了在流计算模块的[第一讲]我们就提到Structured Streaming支持多种SourceSocketFileKafka而在这些Source中Kafka的应用最为广泛在用法上相比其他Source从Kafka接收并消费数据并没有什么两样咱们依然是依赖万能的readStream API如下表所示
import org.apache.spark.sql.DataFrame
// 依然是依赖readStream API
val dfCPU:DataFrame = spark.readStream
// format要明确指定Kafka
.format("kafka")
// 指定Kafka集群Broker地址多个Broker用逗号隔开
.option("kafka.bootstrap.servers", "hostname1:9092,hostname2:9092,hostname3:9092")
// 订阅相关的Topic这里以cpu-monitor为例
.option("subscribe", "cpu-monitor")
.load()
对于readStream API的用法想必你早已烂熟于心了上面的代码你应该会觉得看上去很眼熟这里需要我们特别注意的主要有三点
format中需要明确指定Kafka
为kafka.bootstrap.servers键值指定Kafka集群Broker多个Broker之间以逗号分隔
为subscribe键值指定需要消费的Topic名明确Structured Streaming要消费的Topic
挥完上面的三板斧之后我们就得到了用于承载CPU利用率消息的DataFrame有了DataFrame我们就可以利用Spark SQL提供的能力去做各式各样的数据处理再者结合Structured Streaming框架特有的Window和Watermark机制我们还能以时间窗口为粒度做计数统计同时决定多迟的消息我们将不再处理
不过在此之前咱们不妨先来直观看下代码感受一下存在Kafka中的消息长什么样子
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
dfCPU.writeStream
.outputMode("Complete")
// 以Console为Sink
.format("console")
// 每10秒钟触发一次Micro-batch
.trigger(Trigger.ProcessingTime(10.seconds))
.start()
.awaitTermination()
利用上述代码通过终端我们可以直接观察到Structured Streaming获取的Kafka消息从而对亟待处理的消息建立一个感性的认知如下图所示
在上面的数据中除了KeyValue以外其他信息都是消息的元信息也即消息所属Topic所在分区消息的偏移地址录入Kafka的时间等等
在咱们的实例中Key对应的是发送资源利用率数据的服务器节点而Value则是具体的CPU或是内存利用率初步熟悉了消息的Schema与构成之后接下来咱们就可以有的放矢地去处理这些实时的数据流了
对于这些每两秒钟就产生的资源利用率数据假设我们仅关心它们在一定时间内比如10秒钟的平均值那么我们就可以结合Trigger与聚合计算来做到这一点代码如下所示
import org.apache.spark.sql.types.StringType
dfCPU
.withColumn("clientName", $"key".cast(StringType))
.withColumn("cpuUsage", $"value".cast(StringType))
// 按照服务器做分组
.groupBy($"clientName")
// 求取均值
.agg(avg($"cpuUsage").cast(StringType).alias("avgCPUUsage"))
.writeStream
.outputMode("Complete")
// 以Console为Sink
.format("console")
// 每10秒触发一次Micro-batch
.trigger(Trigger.ProcessingTime(10.seconds))
.start()
.awaitTermination()
可以看到我们利用Fixed interval trigger每隔10秒创建一个Micro-batch然后在一个Micro-batch中我们按照发送消息的服务器做分组并计算CPU利用率平均值最后将统计结果打印到终端如下图所示
再次写入Kafka
实际上除了把结果打印到终端外我们还可以把它写回Kafka我们知道Structured Streaming支持种类丰富的Sink除了常用于测试的Console以外还支持FileKafkaForeach(Batch)等等要把数据写回Kafka也不难我们只需在writeStream API中指定format为Kafka并设置相关选项即可如下表所示
dfCPU
.withColumn("key", $"key".cast(StringType))
.withColumn("value", $"value".cast(StringType))
.groupBy($"key")
.agg(avg($"value").cast(StringType).alias("value"))
.writeStream
.outputMode("Complete")
// 指定Sink为Kafka
.format("kafka")
// 设置Kafka集群信息本例中只有localhost一个Kafka Broker
.option("kafka.bootstrap.servers", "localhost:9092")
// 指定待写入的Kafka Topic需事先创建好Topiccpu-monitor-agg-result
.option("topic", "cpu-monitor-agg-result")
// 指定WAL Checkpoint目录地址
.option("checkpointLocation", "/tmp/checkpoint")
.trigger(Trigger.ProcessingTime(10.seconds))
.start()
.awaitTermination()
我们首先指定Sink为Kafka然后通过option选项分别设置Kafka集群信息待写入的Topic名字以及WAL Checkpoint目录将上述代码敲入spark-shellStructured Streaming会每隔10秒钟就从Kafka拉取原始的利用率信息Topiccpu-monitor然后按照服务器做分组聚合最终再把聚合结果写回到KafkaTopiccpu-monitor-agg-result)。
这里有两点需要特别注意一个是读取与写入的Topic要分开以免造成逻辑与数据上的混乱再者细心的你可能已经发现写回Kafka的数据在Schema上必须用keyvalue这两个固定的字段而不能再像写入Console时可以灵活地定义类似于clientNameavgCPUUsage这样的字段名关于这一点还需要你特别关注
重点回顾
好啦到此为止我手把手地带你实现了Kafka与Spark的集成完成了图中涉及的每一个环节也即从消息的生产到写入Kafka再到消息的消费与处理并最终写回Kafka
今天的内容比较多你除了需要掌握集成中的每一个环节与用法外还需要了解一些有关Kafka的基本概念与特性Kafka是应用最为广泛的消息中间件Messaging Queue它的核心功能有三个
连接消息生产者与消息消费者-
缓存生产者生产的消息或者说事件-
有能力让消费者以最低延迟访问到消息-
对于Kafka的一些基本概念你无需死记硬背在需要的时候回顾后面这张架构图即可这张图中清楚地标记了Kafka的基础概念以及消息生产缓存与消费的简易流程
而对于Kafka与Spark两者的集成不管是Structured Streaming通过readStream API消费Kafka消息还是使用writeStream API将计算结果写入Kafka你只需要记住如下几点即可轻松地搭建这对万金油组合
在format函数中指定Kafka为Source或Sink
在option选项中为kafka.bootstrap.servers键值指定Kafka集群Broker
在option选项中设置subscribe或是topic指定读取或是写入的Kafka Topic
每课一练
请你结合本讲中CPU利用率的代码针对内存利用率完成示意图中的各个环节也即内存利用率消息的生产写入Kafka步骤1)、消息的消费与计算步骤23聚合结果再次写入Kafka步骤4)。
欢迎你把今天这讲内容转发给更多同事朋友跟他一起动手试验一下Spark + Kafka的实例我再留言区等你分享