皮皮网
皮皮网

【租赁源码】【最全源码网站】【sendto内核源码】spark源码 视频

来源:知识竞赛答题系统源码 发表时间:2024-12-22 16:40:16

1.Spark Core读取ES的码视分区问题分析
2.Spark ML系列RandomForestClassifier RandomForestClassificationModel随机森林原理示例源码分析
3.Spark-Submit 源码剖析
4.Spark源码解析2-YarnCluster模式启动

spark源码 视频

Spark Core读取ES的分区问题分析

       撰写本文的初衷是因近期一位星球球友面试时,面试官询问了Spark分析ES数据时,码视生成的码视RDD分区数与哪些因素相关。

       初步推测,码视这与分片数有关,码视但具体关系是码视租赁源码什么呢?以下是两种可能的关系:

       1).类似于KafkaRDD的分区与kafka topic分区数的关系,一对一。码视

       2).ES支持游标查询,码视那么是码视否可以对较大的ES索引分片进行拆分,形成多个RDD分区呢?

       下面,码视我将与大家共同探讨源码,码视了解具体情况。码视

       1.Spark Core读取ES

       ES官网提供了elasticsearch-hadoop插件,码视对于ES 7.x,码视hadoop和Spark版本的码视支持如下:

       在此,我使用的ES版本为7.1.1,测试用的Spark版本为2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:

       a,导入整个elasticsearch-hadoop包

       b,仅导入spark模块的最全源码网站

       为了方便测试,我在本机启动了一个单节点的ES实例,简单的测试代码如下:

       可以看到,Spark Core读取RDD主要有两种形式的API:

       a,esRDD。这种返回的是一个tuple2类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。

       b,esJsonRDD。这种返回的也是一个tuple2类型的RDD,第一个元素依然是id,第二个是json字符串。

       尽管这两种RDD的类型不同,但它们都是ScalaEsRDD类型。

       要分析Spark Core读取ES的并行度,只需分析ScalaEsRDD的getPartitions函数。

       2.源码分析

       首先,导入源码github.com/elastic/elasticsearch-hadoop这个gradle工程,可以直接导入idea,然后切换到7.x版本。sendto内核源码

       接下来,找到ScalaEsRDD,发现getPartitions方法是在其父类中实现的,方法内容如下:

       esPartitions是一个lazy型的变量:

       这种声明的原因是什么呢?

       lazy+transient的原因大家可以思考一下。

       RestService.findPartitions方法只是创建客户端获取分片等信息,然后调用,分两种情况调用两个方法:

       a).findSlicePartitions

       这个方法实际上是在5.x及以后的ES版本,同时配置了

       之后,才会执行。实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,然后计算出拆分的分区数,最后生成分区信息。具体代码如下:

       实际上,分片就是通过游标方式,对_doc进行排序,然后按照分片计算得到的分区偏移进行数据读取,组装过程是通过SearchRequestBuilder.assemble方法实现的。

       这个实际上会浪费一定的性能,如果真的要将ES与Spark结合,建议合理设置分片数。昆仑app源码

       b).findShardPartitions方法

       这个方法没有疑问,一个RDD分区对应于ES index的一个分片。

       3.总结

       以上就是Spark Core读取ES数据时,分片和RDD分区的对应关系分析。默认情况下,一个ES索引分片对应Spark RDD的一个分区。如果分片数过大,且ES版本在5.x及以上,可以配置参数

       进行拆分。

Spark ML系列RandomForestClassifier RandomForestClassificationModel随机森林原理示例源码分析

       Spark ML中的随机森林分类器(RandomForestClassifier)是一个集成学习方法的分类模型。通过使用多个决策树,它进行自助采样与特征随机选择来构建预测模型。其优势在于能够高效处理大量高维数据,对缺失值和噪声具有鲁棒性,并能评估特征重要性,同时训练过程可并行执行提高速度。参数设置如决策树数量、深度和特征选择策略直接影响模型性能和泛化能力,需根据具体问题和数据集调优以获得最佳效果。

       RandomForestClassifier用于Spark ML分类任务,封装在特定类中,obs源码demo支持数据处理与模型训练过程的关键方法。可调整参数优化模型表现,例如特征选择与决策树设置。模型通过构建包含数据转换与训练的Pipeline流程实现自动训练。

       以下为基本示例代码:

        1. 加载数据集并构建特征向量和标签索引。

        2. 将数据集划分为训练集与测试集。

        3. 创建RandomForestClassifier实例,并设定关键参数。

        4. 构建Pipeline并训练模型。

        5. 对测试集进行预测,并评估模型性能,常用指标如多分类准确率。

       代码示例中包含实现RandomForestClassifier类的构造与基本用法,如类成员、常量声明和模型对象定义等。此部分源码用于构造随机森林模型的抽象概念与实现基础。

Spark-Submit 源码剖析

       直奔主题吧:

       常规Spark提交任务脚本如下:

       其中几个关键的参数:

       再看下cluster.conf配置参数,如下:

       spark-submit提交一个job到spark集群中,大致的经历三个过程:

       代码总Main入口如下:

       Main支持两种模式CLI:SparkSubmit;SparkClass

       首先是checkArgument做参数校验

       而sparksubmit则是通过buildCommand来创建

       buildCommand核心是AbstractCommandBuilder类

       继续往下剥洋葱AbstractCommandBuilder如下:

       定义Spark命令创建的方法一个抽象类,SparkSubmitCommandBuilder刚好是实现类如下

       SparkSubmit种类可以分为以上6种。SparkSubmitCommandBuilder有两个构造方法有参数和无参数:

       有参数中根据参数传入拆分三种方式,然后通过OptionParser解析Args,构造参数创建对象后核心方法是通过buildCommand,而buildCommand又是通过buildSparkSubmitCommand来生成具体提交。

       buildSparkSubmitCommand会返回List的命令集合,分为两个部分去创建此List,

       第一个如下加入Driver_memory参数

       第二个是通过buildSparkSubmitArgs方法构建的具体参数是MASTER,DEPLOY_MODE,FILES,CLASS等等,这些就和我们上面截图中是对应上的。是通过OptionParser方式获取到。

       那么到这里的话buildCommand就生成了一个完成sparksubmit参数的命令List

       而生成命令之后执行的任务开启点在org.apache.spark.deploy.SparkSubmit.scala

       继续往下剥洋葱SparkSubmit.scala代码入口如下:

       SparkSubmit,kill,request都支持,后两个方法知识支持standalone和Mesos集群方式下。dosubmit作为函数入口,其中第一步是初始化LOG,然后初始化解析参数涉及到类

       SparkSubmitArguments作为参数初始化类,继承SparkSubmitArgumentsParser类

       其中env是测试用的,参数解析如下,parse方法继承了SparkSubmitArgumentsParser解析函数查找 args 中设置的--选项和值并解析为 name 和 value ,如 --master yarn-client 会被解析为值为 --master 的 name 和值为 yarn-client 的 value 。

       这之后调用SparkSubmitArguments#handle(MASTER, "yarn-client")进行处理。

       这个函数也很简单,根据参数 opt 及 value,设置各个成员的值。接上例,parse 中调用 handle("--master", "yarn-client")后,在 handle 函数中,master 成员将被赋值为 yarn-client。

       回到SparkSubmit.scala通过SparkSubmitArguments生成了args,然后调用action来匹配动作是submit,kill,request_status,print_version。

       直接看submit的action,doRunMain执行入口

       其中prepareSubmitEnvironment初始化环境变量该方法返回一个四元 Tuple ,分别表示子进程参数、子进程 classpath 列表、系统属性 map 、子进程 main 方法。完成了提交环境的准备工作之后,接下来就将启动子进程。

       runMain则是执行入口,入参则是执行参数SparkSubmitArguments

       Main执行非常的简单:几个核心步骤

       先是打印一串日志(可忽略),然后是创建了loader是把依赖包jar全部导入到项目中

       然后是MainClass的生成,异常处理是ClassNotFoundException和NoClassDeffoundError

       再者是生成Application,根据MainClass生成APP,最后调用start执行

       具体执行是SparkApplication.scala,那么继续往下剥~

       仔细阅读下SparkApplication还是挺深的,所以打算另外写篇继续深入研读~

Spark源码解析2-YarnCluster模式启动

       YARN 模式运行机制主要体现在Yarn Cluster 模式和Yarn Client 模式上。在Yarn Cluster模式下,SparkSubmit、ApplicationMaster 和 CoarseGrainedExecutorBackend 是独立的进程,而Driver 是独立的线程;Executor 和 YarnClusterApplication 是对象。在Yarn Client模式下,SparkSubmit、ApplicationMaster 和 YarnCoarseGrainedExecutorBackend 也是独立的进程,而Executor和Driver是对象。

       在源码中,SparkSubmit阶段首先执行Spark提交命令,底层执行的是开启SparkSubmit进程的命令。代码中,SparkSubmit从main()开始,根据运行模式获取后续要反射调用的类名赋给元组中的ChildMainClass。如果是Yarn Cluster模式,则为YarnClusterApplication;如果是Yarn Client模式,则为主类用户自定义的类。接下来,获取ChildMainClass后,通过反射调用main方法的过程,反射获取类然后通过构造器获取一个示例并多态为SparkApplication,再调用它的start方法。随后调用YarnClusterApplication的start方法。在YarnClient中,new一个Client对象,其中包含了yarnClient = YarnClient.createYarnClient属性,这是Yarn在SparkSubmit中的客户端,yarnClient在第行初始化和开始,即连接Yarn集群或RM。之后就可以通过这个客户端与Yarn的RM进行通信和提交应用,即调用run方法。

       ApplicationMaster阶段主要涉及开启一个Driver新线程、AM向RM注册、AM向RM申请资源并处理、封装ExecutorBackend启动命令以及AM向NM通信提交命令由NM启动ExecutorBackend。在ApplicationMaster进程中,首先开启Driver线程,开始运行用户自定义代码,创建Spark程序入口SparkContext,接着创建RDD,生成job,划分阶段提交Task等操作。

       在申请资源之前,AM主线程创建了Driver的终端引用,作为参数传入createAllocator(),因为Executor启动后需要向Driver反向注册,所以启动过程必须封装Driver的EndpointRef。AM主线程向RM申请获取可用资源Container,并处理这些资源。ExecutorBackend阶段尚未完成,后续内容待补充。

相关栏目:热点