皮皮网
皮皮网

【c 2048源码】【await指标源码】【redis源码驱动】spark调度源码_spark 调度

来源:JavaSprit网页源码 发表时间:2024-12-22 16:50:37

1.大数据面试题:Spark的调度源调度任务执行流程
2.Spark repartition和coalesce的区别
3.Spark Yarn 调度器Scheduler详解
4.数据底座-Rundeck任务调度系统使用手册
5.Job的基本概念和实现原理

spark调度源码_spark 调度

大数据面试题:Spark的任务执行流程

       面试题来源:

       主要探讨Spark的工作机制,包括工作流程、调度源调度调度流程、调度源调度任务调度原理、调度源调度任务提交和执行流程,调度源调度以及Spark在YARN环境下的调度源调度c 2048源码任务调度流程。此外,调度源调度还会涉及Spark job提交过程、调度源调度Spark On YARN流程中的调度源调度Client与Cluster模式,以及Spark的调度源调度执行机制。

       参考答案:

       Spark运行流程以SparkContext为总入口。调度源调度在SparkContext初始化时,调度源调度Spark创建DAGScheduler和TaskScheduler以进行作业和任务调度。调度源调度await指标源码

       运行流程概览如下:

       1)当程序提交后,调度源调度SparkSubmit进程与Master通信,调度源调度构建运行环境并启动SparkContext。SparkContext向资源管理器(如Standalone、Mesos或YARN)注册并申请执行资源。

       2)资源管理器分配Executor资源,Standalone模式下通过StandaloneExecutorBackend启动Executor。Executor运行状态会定期上报给资源管理器。

       3)SparkContext将程序构建为DAG图,将DAG分解为Stage,并将Taskset发送给TaskScheduler。Executor从SparkContext申请Task,redis源码驱动TaskScheduler将Task分发给Executor执行。同时,应用程序代码也发送至Executor。

       4)Task在Executor上执行完毕后释放资源。

       总结:

       Spark的运行架构具有以下特点:

       1)高效的数据并行处理能力,通过DAGScheduler和TaskScheduler进行任务分解和调度。

       2)灵活的资源管理,通过与资源管理器的交互,实现资源的高效分配和利用。

       3)动态的资源调度机制,确保任务能够被迅速、有效地执行。源码常用文件

       4)简洁的API和编程模型,使得开发者可以快速实现并行计算任务。

       通过这些流程和特点,Spark提供了一种高效、灵活和易于使用的并行计算框架,适用于大数据处理和分析场景。

Spark repartition和coalesce的区别

       æœ‰äº›æ—¶å€™ï¼Œåœ¨å¾ˆå¤špartition的时候,我们想减少点partition的数量,不然写到HDFS上的文件数量也会很多很多。

        我们使用reparation呢,还是coalesce。所以我们得了解这两个算子的内在区别。

        要知道,repartition是一个消耗比较昂贵的操作算子,Spark出了一个优化版的repartition叫做coalesce,它可以尽量避免数据迁移,

        但是你只能减少RDD的partition.

        举个例子,有如下数据节点分布:

        用coalesce,将partition减少到2个:

        注意,Node1 和 Node3 不需要移动原始的数据

        The repartition algorithm does a full shuffle and creates new partitions with data that’s distributed evenly.

        Let’s create a DataFrame with the numbers from 1 to .

        repartition 算法会做一个full shuffle然后均匀分布地创建新的partition。我们创建一个1-数字的DataFrame测试一下。

        刚开始数据是这样分布的:

        我们做一个full shuffle,将其repartition为2个。

        这是在我机器上数据分布的情况:

        Partition A: 1, 3, 4, 6, 7, 9, ,

        Partition B: 2, 5, 8,

        The repartition method makes new partitions and evenly distributes the data in the new partitions (the data distribution is more even for larger data sets).

        repartition方法让新的partition均匀地分布了数据(数据量大的情况下其实会更均匀)

        coalesce用已有的partition去尽量减少数据shuffle。

        repartition创建新的partition并且使用 full shuffle。

        coalesce会使得每个partition不同数量的数据分布(有些时候各个partition会有不同的size)

        然而,repartition使得每个partition的数据大小都粗略地相等。

        coalesce 与 repartition的区别(我们下面说的coalesce都默认shuffle参数为false的情况)

        repartition(numPartitions:Int):RDD[T]和coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T] repartition只是coalesce接口中shuffle为true的实现

        有1w的小文件,资源也为--executor-memory 2g --executor-cores 2 --num-executors 5。

        repartition(4):产生shuffle。这时会启动5个executor像之前介绍的那样依次读取1w个分区的文件,然后按照某个规则%4,写到4个文件中,这样分区的4个文件基本毫无规律,比较均匀。

        coalesce(4):这个coalesce不会产生shuffle。那启动5个executor在不发生shuffle的时候是如何生成4个文件呢,其实会有1个或2个或3个甚至更多的executor在空跑(具体几个executor空跑与spark调度有关,与数据本地性有关,与spark集群负载有关),他并没有读取任何数据!

        1.如果结果产生的文件数要比源RDD partition少,用coalesce是实现不了的,例如有4个小文件(4个partition),你要生成5个文件用coalesce实现不了,也就是说不产生shuffle,无法实现文件数变多。

        2.如果你只有1个executor(1个core),源RDD partition有5个,你要用coalesce产生2个文件。那么他是预分partition到executor上的,例如0-2号分区在先executor上执行完毕,3-4号分区再次在同一个executor执行。其实都是同一个executor但是前后要串行读不同数据。与用repartition(2)在读partition上有较大不同(串行依次读0-4号partition 做%2处理)。

        T表有G数据 有个partition 资源也为--executor-memory 2g --executor-cores 2 --num-executors 5。我们想要结果文件只有一个

Spark Yarn 调度器Scheduler详解

       ä¸€ã€è°ƒåº¦å™¨çš„选择

       åœ¨Yarn中有三种调度器可以选择:FIFO Scheduler,Capacity Scheduler,FairS cheduler。

        FIFO Scheduler把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。

        FIFO Scheduler是最简单也是最容易理解的调度器,也不需要任何配置,但它并不适用于共享集群。大的应用可能会占用所有集群资源,这就导致其它应用被阻塞。在共享集群中,更适合采用Capacity Scheduler或Fair Scheduler,这两个调度器都允许大任务和小任务在提交的同时获得一定的系统资源。

        下面 “Yarn调度器对比图” 展示了这几个调度器的区别,从图中可以看出,在FIFO 调度器中,小任务会被大任务阻塞。

        而对于Capacity调度器,有一个专门的队列用来运行小任务,但是为小任务专门设置一个队列会预先占用一定的集群资源,这就导致大任务的执行时间会落后于使用FIFO调度器时的时间。

        在Fair调度器中,我们不需要预先占用一定的系统资源,Fair调度器会为所有运行的job动态的调整系统资源。如下图所示,当第一个大job提交时,只有这一个job在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。

        需要注意的是,在下图Fair调度器中,从第二个任务提交到获得资源会有一定的延迟,因为它需要等待第一个任务释放占用的Container。小任务执行完成之后也会释放自己占用的资源,大任务又获得了全部的系统资源。最终的效果就是Fair调度器即得到了高的资源利用率又能保证小任务及时完成。

        Yarn调度器对比图:

        二、Capacity Scheduler(容器调度器)的配置

        2.1 容器调度介绍

        Capacity 调度器允许多个组织共享整个集群,每个组织可以获得集群的一部分计算能力。通过为每个组织分配专门的队列,然后再为每个队列分配一定的集群资源,这样整个集群就可以通过设置多个队列的方式给多个组织提供服务了。除此之外,队列内部又可以垂直划分,这样一个组织内部的多个成员就可以共享这个队列资源了,在一个队列内部,资源的调度是采用的是先进先出(FIFO)策略。

        通过上面那幅图,我们已经知道一个job可能使用不了整个队列的资源。然而如果这个队列中运行多个job,如果这个队列的资源够用,那么就分配给这些job,如果这个队列的资源不够用了呢?其实Capacity调度器仍可能分配额外的资源给这个队列,这就是 “弹性队列”(queue elasticity) 的概念。

        在正常的操作中,Capacity调度器不会强制释放Container,当一个队列资源不够用时,这个队列只能获得其它队列释放后的Container资源。当然,我们可以为队列设置一个最大资源使用量,以免这个队列过多的占用空闲资源,导致其它队列无法使用这些空闲资源,这就是”弹性队列”需要权衡的地方。

        2.2 容器调度的配置

        假设我们有如下层次的队列:

        root

        ├── prod

        └── dev

        ├── eng

        └── science

        下面是一个简单的Capacity调度器的配置文件,文件名为capacity-scheduler.xml。在这个配置中,在root队列下面定义了两个子队列prod和dev,分别占%和%的容量。需要注意,一个队列的配置是通过属性yarn.sheduler.capacity..指定的,代表的是队列的继承树,如root.prod队列,一般指capacity和maximum-capacity。

        我们可以看到,dev队列又被分成了eng和science两个相同容量的子队列。dev的maximum-capacity属性被设置成了%,所以即使prod队列完全空闲dev也不会占用全部集群资源,也就是说,prod队列仍有%的可用资源用来应急。我们注意到,eng和science两个队列没有设置maximum-capacity属性,也就是说eng或science队列中的job可能会用到整个dev队列的所有资源(最多为集群的%)。而类似的,prod由于没有设置maximum-capacity属性,它有可能会占用集群全部资源。

        Capacity容器除了可以配置队列及其容量外,我们还可以配置一个用户或应用可以分配的最大资源数量、可以同时运行多少应用、队列的ACL认证等。

        2.3 队列的设置

        关于队列的设置,这取决于我们具体的应用。比如,在MapReduce中,我们可以通过mapreduce.job.queuename属性指定要用的队列。如果队列不存在,我们在提交任务时就会收到错误。如果我们没有定义任何队列,所有的应用将会放在一个default队列中。

        注意:对于Capacity调度器,我们的队列名必须是队列树中的最后一部分,如果我们使用队列树则不会被识别。比如,在上面配置中,我们使用prod和eng作为队列名是可以的,但是如果我们用root.dev.eng或者dev.eng是无效的。

        三、Fair Scheduler(公平调度器)的配置

        3.1 公平调度

        Fair调度器的设计目标是为所有的应用分配公平的资源(对公平的定义可以通过参数来设置)。在上面的 “Yarn调度器对比图” 展示了一个队列中两个应用的公平调度;当然,公平调度在也可以在多个队列间工作。举个例子,假设有两个用户A和B,他们分别拥有一个队列。当A启动一个job而B没有任务时,A会获得全部集群资源;当B启动一个job后,A的job会继续运行,不过一会儿之后两个任务会各自获得一半的集群资源。如果此时B再启动第二个job并且其它job还在运行,则它将会和B的第一个job共享B这个队列的资源,也就是B的两个job会用于四分之一的集群资源,而A的job仍然用于集群一半的资源,结果就是资源最终在两个用户之间平等的共享。过程如下图所示:

        3.2 启用Fair Scheduler

        调度器的使用是通过yarn-site.xml配置文件中的yarn.resourcemanager.scheduler.class参数进行配置的,默认采用Capacity Scheduler调度器。如果我们要使用Fair调度器,需要在这个参数上配置FairScheduler类的全限定名:org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler。

        3.3 队列的配置

        Fair调度器的配置文件位于类路径下的fair-scheduler.xml文件中,这个路径可以通过yarn.scheduler.fair.allocation.file属性进行修改。若没有这个配置文件,Fair调度器采用的分配策略,这个策略和3.1节介绍的类似:调度器会在用户提交第一个应用时为其自动创建一个队列,队列的名字就是用户名,所有的应用都会被分配到相应的用户队列中。

        我们可以在配置文件中配置每一个队列,并且可以像Capacity 调度器一样分层次配置队列。比如,参考capacity-scheduler.xml来配置fair-scheduler:

        队列的层次是通过嵌套元素实现的。所有的队列都是root队列的孩子,即使我们没有配到元素里。在这个配置中,我们把dev队列有分成了eng和science两个队列。

        Fair调度器中的队列有一个权重属性(这个权重就是对公平的定义),并把这个属性作为公平调度的依据。在这个例子中,当调度器分配集群:资源给prod和dev时便视作公平,eng和science队列没有定义权重,则会被平均分配。这里的权重并不是百分比,我们把上面的和分别替换成2和3,效果也是一样的。注意,对于在没有配置文件时按用户自动创建的队列,它们仍有权重并且权重值为1。

        每个队列内部仍可以有不同的调度策略。队列的默认调度策略可以通过顶级元素进行配置,如果没有配置,默认采用公平调度。

        尽管是Fair调度器,其仍支持在队列级别进行FIFO调度。每个队列的调度策略可以被其内部的元素覆盖,在上面这个例子中,prod队列就被指定采用FIFO进行调度,所以,对于提交到prod队列的任务就可以按照FIFO规则顺序的执行了。需要注意,prod和dev之间的调度仍然是公平调度,同样eng和science也是公平调度。

        尽管上面的配置中没有展示,每个队列仍可配置最大、最小资源占用数和最大可运行的应用的数量。

        3.4 队列的设置

        Fair调度器采用了一套基于规则的系统来确定应用应该放到哪个队列。在上面的例子中,元素定义了一个规则列表,其中的每个规则会被逐个尝试直到匹配成功。例如,上例第一个规则specified,则会把应用放到它指定的队列中,若这个应用没有指定队列名或队列名不存在,则说明不匹配这个规则,然后尝试下一个规则。primaryGroup规则会尝试把应用放在以 用户所在的Unix组名 命名的队列中,如果没有这个队列,不创建队列转而尝试下一个规则。当前面所有规则不满足时,则触发default规则,把应用放在dev.eng队列中。

        当然,我们可以不配置queuePlacementPolicy规则,调度器则默认采用如下规则:

       ä¸Šé¢è§„则可以归结成一句话,除非队列被准确的定义,否则会以用户名为队列名创建队列。

        还有一个简单的配置策略可以使得所有的应用放入同一个队列(default),这样就可以让所有应用之间平等共享集群而不是在用户之间。这个配置的定义如下:

       å®žçŽ°ä¸Šé¢åŠŸèƒ½æˆ‘们还可以不使用配置文件,直接设置yarn.scheduler.fair.user-as-default-queue=false,这样应用便会被放入default 队列,而不是各个用户名队列。另外,我们还可以设置yarn.scheduler.fair.allow-undeclared-pools=false,这样用户就无法创建队列了。

        3.5 抢占(Preemption)

        当一个job提交到一个繁忙集群中的空队列时,job并不会马上执行,而是阻塞直到正在运行的job释放系统资源。为了使提交job的执行时间更具预测性(可以设置等待的超时时间),Fair调度器支持抢占。

        抢占就是允许调度器杀掉占用超过其应占份额资源队列的containers,这些containers资源便可被分配到应该享有这些份额资源的队列中。需要注意抢占会降低集群的执行效率,因为被终止的containers需要被重新执行。

        可以通过设置一个全局的参数yarn.scheduler.fair.preemption=true来启用抢占功能。此外,还有两个参数用来控制抢占的过期时间(这两个参数默认没有配置,需要至少配置一个来允许抢占Container):

        - minimum share preemption timeout

        - fair share preemption timeout

        如果队列在minimum share preemption timeout指定的时间内未获得最小的资源保障,调度器就会抢占containers。我们可以通过配置文件中的顶级元素为所有队列配置这个超时时间;我们还可以在元素内配置元素来为某个队列指定超时时间。

        与之类似,如果队列在fair share preemption timeout指定时间内未获得平等的资源的一半(这个比例可以配置),调度器则会进行抢占containers。这个超时时间可以通过顶级元素和元素级元素分别配置所有队列和某个队列的超时时间。上面提到的比例可以通过(配置所有队列)和(配置某个队列)进行配置,默认是0.5。

数据底座-Rundeck任务调度系统使用手册

       在进行项目管理和自动化任务调度时,Rundeck 是一个强大且灵活的工具。它允许用户根据业务范围划分项目,通过创建 Job 来实现自动化任务的执行。

       具体而言,创建一个项目时,feign调用源码需要考虑业务范围,将相似或相关的任务归类,以提高管理效率和执行效率。创建 Job 时,应详细定义任务的参数、执行逻辑、依赖关系等关键信息,确保任务能够准确、高效地执行。

       在配置 Spark 相关任务时,首先在指定路径下创建一个 shell 脚本,例如:/opt/maintain/scripts/bms/income-detail/BmsIncomeDetailIceberg.sh。此脚本应包含执行 Spark 任务所需的命令和参数,确保任务能够正确启动并执行。

       为便于复用和管理,可以创建 Spark submit 模板。这个模板应包含通用的 Spark 配置、任务执行路径和所需的环境变量等信息。通过使用模板,可以快速配置和执行 Spark 任务,无需每次都从零开始。

       在配置 Apache Druid 时,关键在于构建合适的表结构和提供正确的执行脚本。json 表结构文件(例如:druid bms-income-detail.json)和脚本文件(druidbms-income-detail.sh)应放在指定的目录下,如 /opt/maintain/druid/bms/income-detail,以确保 Druid 能够正确解析和执行。

       针对 Kerberos 小时过期问题,解决方法在于调整 Kerberos 的相关参数。通过修改 Kerberos 凭证的生命周期设置,可以实现凭证的自动刷新,从而解决过期问题。具体参数调整步骤和策略需根据实际环境和安全策略进行。

       总之,Rundeck 提供了丰富的功能和配置选项,使其成为执行自动化任务和项目管理的理想选择。通过合理利用 Rundeck 的特性,可以极大地提高工作效率和任务执行的可靠性。

Job的基本概念和实现原理

       在Spark任务调度的框架中,Job扮演着核心角色。它在RDD执行Action操作时生成,是任务计算的顶层单元。Job主要分为两种类型:Result Job和Map-Stage Job。Result Job负责Action操作的执行和相关数据计算,而Map-Stage Job则在SQL自适应查询计划中发挥作用,预估map操作的输出以优化后续Stage的调度。

       Job的创建始于RDD的Action操作,通过SparkContext的runJob函数调用DAGScheduler的runJob,以ActiveJob类实现。Job的创建过程包括生成jobId(递增整数)和finalStage(区分Job类型的标识),如ResultStage或ShuffleMapStage。每个Job在划分成多个Stage后,按shuffle依赖进行任务调度。

       当Job执行完毕,JobWaiter在DAGScheduler中监控状态,一旦任务完成,状态信息会被保存并传递给Driver。通过JobId,可以识别和调度不同任务,FIFO调度算法会优先考虑JobId较小的任务。

       ActiveJob类是Job的实现,其内部包含了jobId、finalStage等关键信息,如已完成分区数,这决定了Job是否完成。Job的完成状态是通过依赖Stage的执行状态来判断的,因为Stage的TaskSet提交会确保其依赖的Stage已完成。

       总的来说,本文深入剖析了Job的创建原理、状态获取机制以及其核心实现类ActiveJob的工作原理。

相关栏目:焦点

.重点关注