1.为ä»ä¹Sparkåå±ä¸å¦Hadoop
2.关于笛卡尔积CartesianProduct
3.有什么关于 Spark 的码分书推荐?
4.Spark Core读取ES的分区问题分析
为ä»ä¹Sparkåå±ä¸å¦Hadoop
Sparkæ¯ä¸ä¸ªåºäºRAM计ç®çå¼æºç ComputerClusterè¿ç®ç³»ç»ï¼ç®çæ¯æ´å¿«éå°è¿è¡æ°æ®åæãSparkæ©æçæ ¸å¿é¨å代ç åªæ3ä¸è¡ãSparkæä¾äºä¸HadoopMap/Reduceç¸ä¼¼çåæ£å¼è¿ç®æ¡æ¶ï¼ä½åºäºRAMåä¼å设计ï¼å æ¤å¨äº¤æ¢å¼æ°æ®åæådataminingçWorkloadä¸è¡¨ç°ä¸éã
è¿å ¥å¹´ä»¥åï¼Sparkå¼æºç çæç³»ç»å¤§å¹ å¢é¿ï¼å·²æ为大æ°æ®èç´ææ´»è·çå¼æºç 项ç®ä¹ä¸ãSparkä¹æ以æå¦æ¤å¤çå ³æ³¨ï¼åå 主è¦æ¯å 为Sparkå ·æçé«æ§è½ãé«çµæ´»æ§ãä¸Hadoopçæç³»ç»å®ç¾èåçä¸æ¹é¢çç¹ç¹ã
é¦å ï¼Spark对åæ£çæ°æ®éè¿è¡æ½æ ·ï¼åæ°å°æåºRDD(ResilientDistributedDataset)çæ¦å¿µï¼ææçç»è®¡åæä»»å¡è¢«ç¿»è¯æ对RDDçåºæ¬æä½ç»æçæåæ ç¯å¾(DAG)ãRDDå¯ä»¥è¢«é©»çå¨RAMä¸ï¼å¾åçä»»å¡å¯ä»¥ç´æ¥è¯»åRAMä¸çæ°æ®;åæ¶åæDAGä¸ä»»å¡ä¹é´çä¾èµæ§å¯ä»¥æç¸é»çä»»å¡å并ï¼ä»èåå°äºå¤§éä¸åç¡®çç»æè¾åºï¼æ大åå°äºHarddiskI/Oï¼ä½¿å¤ææ°æ®åæä»»å¡æ´é«æãä»è¿ä¸ªæ¨ç®ï¼å¦æä»»å¡å¤å¤æï¼Sparkæ¯Map/Reduceå¿«ä¸å°ä¸¤åã
å ¶æ¬¡ï¼Sparkæ¯ä¸ä¸ªçµæ´»çè¿ç®æ¡æ¶ï¼éååæ¹æ¬¡å¤çãå·¥ä½æµã交äºå¼åæãæµéå¤ççä¸åç±»åçåºç¨ï¼å æ¤Sparkä¹å¯ä»¥æ为ä¸ä¸ªç¨é广æ³çè¿ç®å¼æï¼å¹¶å¨æªæ¥å代Map/Reduceçå°ä½ã
æåï¼Sparkå¯ä»¥ä¸Hadoopçæç³»ç»çå¾å¤ç»ä»¶äºç¸æä½ãSparkå¯ä»¥è¿è¡å¨æ°ä¸ä»£èµæºç®¡çæ¡æ¶YARNä¸ï¼å®è¿å¯ä»¥è¯»åå·²æ并åæ¾å¨Hadoopä¸çæ°æ®ï¼è¿æ¯ä¸ªé常大çä¼å¿ã
è½ç¶Sparkå ·æ以ä¸ä¸å¤§ä¼ç¹ï¼ä½ä»ç®åSparkçåå±ååºç¨ç°ç¶æ¥çï¼Sparkæ¬èº«ä¹åå¨å¾å¤ç¼ºé·ï¼ä¸»è¦å æ¬ä»¥ä¸å 个æ¹é¢ï¼
â稳å®æ§æ¹é¢ï¼ç±äºä»£ç è´¨éé®é¢ï¼Sparké¿æ¶é´è¿è¡ä¼ç»å¸¸åºéï¼å¨æ¶ææ¹é¢ï¼ç±äºå¤§éæ°æ®è¢«ç¼åå¨RAMä¸ï¼Javaåæ¶åå¾ç¼æ ¢çæ åµä¸¥éï¼å¯¼è´Sparkæ§è½ä¸ç¨³å®ï¼å¨å¤æåºæ¯ä¸SQLçæ§è½çè³ä¸å¦ç°æçMap/Reduceã
âä¸è½å¤ç大æ°æ®ï¼åç¬æºå¨å¤çæ°æ®è¿å¤§ï¼æè ç±äºæ°æ®åºç°é®é¢å¯¼è´ä¸é´ç»æè¶ è¿RAMç大å°æ¶ï¼å¸¸å¸¸åºç°RAM空é´ä¸è¶³ææ æ³å¾åºç»æãç¶èï¼Map/Reduceè¿ç®æ¡æ¶å¯ä»¥å¤ç大æ°æ®ï¼å¨è¿æ¹é¢ï¼Sparkä¸å¦Map/Reduceè¿ç®æ¡æ¶ææã
âä¸è½æ¯æå¤æçSQLç»è®¡;ç®åSparkæ¯æçSQLè¯æ³å®æ´ç¨åº¦è¿ä¸è½åºç¨å¨å¤ææ°æ®åæä¸ãå¨å¯ç®¡çæ§æ¹é¢ï¼SparkYARNçç»åä¸å®åï¼è¿å°±ä¸ºä½¿ç¨è¿ç¨ä¸åä¸é忧ï¼å®¹æåºç°åç§é¾é¢ã
è½ç¶Sparkæ´»è·å¨ClouderaãMapRãHortonworksçä¼å¤ç¥å大æ°æ®å ¬å¸ï¼ä½æ¯å¦æSparkæ¬èº«ç缺é·å¾ä¸å°åæ¶å¤çï¼å°ä¼ä¸¥éå½±åSparkçæ®åååå±ã
关于笛卡尔积CartesianProduct
关于笛卡尔积与shuffle的关系,结论是码分笛卡尔积不会产生shuffle。在分析笛卡尔积的码分源码后,我们发现其运行原理在map端执行,码分并未涉及shuffle过程。码分因此,码分nsis 源码编译从结果中得出,码分笛卡尔积操作不会引发数据重组现象。码分
至于窄依赖的码分定义,网上多数描述存在一定的码分混淆。窄依赖实际上指的码分是子RDD的每个分区依赖于父RDD的部分分区。在理解上,码分可以将窄依赖理解为一个父RDD的码分分区被多个子RDD的分区共享使用,但这些子RDD的码分分区仅依赖于父RDD的特定部分,而非整个分区。码分这种定义在Spark 1.0版本的注释中有所体现,强调了依赖的局部性。
关于join操作是filedisk 源码否一定会产生shuffle,答案并非绝对。在某些特定场景下,如数据量较小、数据分布均匀且内存充足时,join操作可能不会导致数据shuffle。重要的是在实际编程和优化过程中积累经验,学习如何根据不同情况选择合适的join策略,从而提高效率。精读源码是一种有效的技能培养方式,能够帮助深入理解数据处理过程,提升解决问题的能力。
对于希望在职场中脱颖而出的个人,培养自己的优势是关键。无论是通过技术专长、项目管理能力还是团队协作,构建自己的独特竞争力是至关重要的。如果您对此有所兴趣,vcooline 源码欢迎加入硬核源码学习社群(付费)。
社群提供每周六的直播课程,包含历史录屏资源,学员可以随到随学,并且有长期的指导陪伴。如果您对这个社群感兴趣,欢迎了解详情。
有什么关于 Spark 的书推荐?
《大数据Spark企业级实战》本书共包括章,每章的主要内容如下。
第一章回答了为什么大型数据处理平台都要选择SPARK。为什么spark如此之快?星火的理论基础是什么?spark如何使用专门的技术堆栈来解决大规模数据处理的需要?第二章回答了如何从头构建Hadoop集群的问题。如何构建基于Hadoop集群的星火集群?如何测试火星的质量?第三章是如何在一个集成开发环境中开发和运行星火计划。如何开发和测试IDA中的spark代码?
在这4章中,RDD、RDD和spark集成战斗用例API的作用类型将用于实际的战斗RDD。
第四章分析了星火独立模式的设计与实现、星火集群模型和星火客户端模式。ntscan源码
第五章首先介绍了spark core,然后通过对源代码的分析,分析了spark的源代码和源代码,仔细分析了spark工作的整个生命周期,最后分享了spark性能优化的内容。这说明了一步一步的火花的特点是使用了大约个实际案例,并分析了spark GraphX的源代码。
第八章,在星火SQL实践编程实践的基础上,详细介绍了星火SQL的内容。第九章讲了从快速启动机器学习前9章,MLlib的分析框架,基于线性回归、聚类,并解决协同过滤算法,源代码分析和案例启示MLlib一步一步,最后由基本MLlib意味着静态和朴素贝叶斯算法,决策树分析和实践,进一步提高的主要引发机器学习技巧。第十章详细描述了分布式存储文件系统、超轻粒子和超轻粒子的设计、实现、部署和使用。第十一章主要介绍了火花流的softether 源码原理、源代码和实际情况。第十二章介绍了spark多语种编程的特点,并通过实例介绍了spark多语言编程。最后,将一个综合的例子应用到spark多语言编程的实践中。第十三章首先介绍了R语言的基本介绍和实践操作,介绍了使用sparkr和编码的火花,并帮助您快速使用R语言和数据处理能力。在第十四章中,详细介绍了电火花放电的常见问题及其调谐方法。首先介绍了个问题,并对它们的解决方案进行了优化。然后,从内存优化、RDD分区、对象和操作性能优化等方面对常见性能优化问题进行了阐述,最后阐述了火花的最佳实践。附录从spark的角度解释了Scala,并详细解释了Scala函数编程和面向对象编程。
Spark Core读取ES的分区问题分析
撰写本文的初衷是因近期一位星球球友面试时,面试官询问了Spark分析ES数据时,生成的RDD分区数与哪些因素相关。
初步推测,这与分片数有关,但具体关系是什么呢?以下是两种可能的关系:
1).类似于KafkaRDD的分区与kafka topic分区数的关系,一对一。
2).ES支持游标查询,那么是否可以对较大的ES索引分片进行拆分,形成多个RDD分区呢?
下面,我将与大家共同探讨源码,了解具体情况。
1.Spark Core读取ES
ES官网提供了elasticsearch-hadoop插件,对于ES 7.x,hadoop和Spark版本的支持如下:
在此,我使用的ES版本为7.1.1,测试用的Spark版本为2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:
a,导入整个elasticsearch-hadoop包
b,仅导入spark模块的包
为了方便测试,我在本机启动了一个单节点的ES实例,简单的测试代码如下:
可以看到,Spark Core读取RDD主要有两种形式的API:
a,esRDD。这种返回的是一个tuple2类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。
b,esJsonRDD。这种返回的也是一个tuple2类型的RDD,第一个元素依然是id,第二个是json字符串。
尽管这两种RDD的类型不同,但它们都是ScalaEsRDD类型。
要分析Spark Core读取ES的并行度,只需分析ScalaEsRDD的getPartitions函数。
2.源码分析
首先,导入源码github.com/elastic/elasticsearch-hadoop这个gradle工程,可以直接导入idea,然后切换到7.x版本。
接下来,找到ScalaEsRDD,发现getPartitions方法是在其父类中实现的,方法内容如下:
esPartitions是一个lazy型的变量:
这种声明的原因是什么呢?
lazy+transient的原因大家可以思考一下。
RestService.findPartitions方法只是创建客户端获取分片等信息,然后调用,分两种情况调用两个方法:
a).findSlicePartitions
这个方法实际上是在5.x及以后的ES版本,同时配置了
之后,才会执行。实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,然后计算出拆分的分区数,最后生成分区信息。具体代码如下:
实际上,分片就是通过游标方式,对_doc进行排序,然后按照分片计算得到的分区偏移进行数据读取,组装过程是通过SearchRequestBuilder.assemble方法实现的。
这个实际上会浪费一定的性能,如果真的要将ES与Spark结合,建议合理设置分片数。
b).findShardPartitions方法
这个方法没有疑问,一个RDD分区对应于ES index的一个分片。
3.总结
以上就是Spark Core读取ES数据时,分片和RDD分区的对应关系分析。默认情况下,一个ES索引分片对应Spark RDD的一个分区。如果分片数过大,且ES版本在5.x及以上,可以配置参数
进行拆分。