国产精品免费嫩草研究院|无遮羞动漫在线观看AV|国产麻豆精品传媒AV国产在线|村在线观看|寂寞情人1正版|韩国床震韩国床震古|精品系列专区久久

我的Spark學習筆記

一、架構(gòu)設(shè)計

我的Spark學習筆記

文章插圖
  • Driver根據(jù)用戶代碼構(gòu)建計算流圖,拆解出分布式任務(wù)并分發(fā)到 Executors 中去;每個Executors收到任務(wù),然后處理這個 RDD 的一個數(shù)據(jù)分片子集
  • DAGScheduler根據(jù)用戶代碼構(gòu)建 DAG;以 Shuffle 為邊界切割 Stages;基于 Stages 創(chuàng)建 TaskSets,并將 TaskSets 提交給 TaskScheduler 請求調(diào)度
  • TaskScheduler 在初始化的過程中,會創(chuàng)建任務(wù)調(diào)度隊列,任務(wù)調(diào)度隊列用于緩存 DAGScheduler 提交的 TaskSets 。TaskScheduler 結(jié)合 SchedulerBackend 提供的 WorkerOffer,按照預先設(shè)置的調(diào)度策略依次對隊列中的任務(wù)進行調(diào)度,也就是把任務(wù)分發(fā)給SchedulerBackend
  • SchedulerBackend 用一個叫做 ExecutorDataMap 的數(shù)據(jù)結(jié)構(gòu),來記錄每一個計算節(jié)點中 Executors 的資源狀態(tài) 。會與集群內(nèi)所有 Executors 中的 ExecutorBackend 保持周期性通信 。SchedulerBackend收到TaskScheduler過來的任務(wù),會把任務(wù)分發(fā)給ExecutorBackend去具體執(zhí)行
  • ExecutorBackend收到任務(wù)后多線程執(zhí)行(一個線程處理一個Task) 。處理完畢后反饋StatusUpdate給SchedulerBackend,再返回給TaskScheduler,最終給DAGScheduler

我的Spark學習筆記

文章插圖
二、常用算子2.1、RDD概念Spark 主要以一個 彈性分布式數(shù)據(jù)集_(RDD)的概念為中心,它是一個容錯且可以執(zhí)行并行操作的元素的集合 。有兩種方法可以創(chuàng)建 RDD:在你的 driver program(驅(qū)動程序)中 _parallelizing 一個已存在的集合,或者在外部存儲系統(tǒng)中引用一個數(shù)據(jù)集,例如,一個共享文件系統(tǒng),HDFS,HBase,或者提供 Hadoop InputFormat 的任何數(shù)據(jù)源 。
從內(nèi)存創(chuàng)建RDDimport org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// 從內(nèi)存創(chuàng)建RDDobject MakeRDDFromMemory {def main(args: Array[String]): Unit = {// 準備環(huán)境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")// 并行度,如果不設(shè)置則默認當前運行環(huán)境的最大可用核數(shù)sparkConf.set("spark.default.parallelism", "2")val sc = new SparkContext(sparkConf)// 從內(nèi)存中創(chuàng)建RDD,將內(nèi)存中集合的數(shù)據(jù)作為處理的數(shù)據(jù)源val seq = Seq[Int](1, 2, 3, 4, 5, 6)val rdd: RDD[Int] = sc.makeRDD(seq)rdd.collect().foreach(println)// numSlices表示分區(qū)的數(shù)量,不傳默認spark.default.parallelismval rdd2: RDD[Int] = sc.makeRDD(seq, 3)// 將處理的數(shù)據(jù)保存成分區(qū)文件rdd2.saveAsTextFile("output")sc.stop()}}從文件中創(chuàng)建RDDimport org.apache.spark.{SparkConf, SparkContext}// 從文件中創(chuàng)建RDD(本地文件、HDFS文件)object MakeRDDFromTextFile {def main(args: Array[String]): Unit = {// 準備環(huán)境val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")val sc = new SparkContext(sparkConf)// 從文件中創(chuàng)建RDD,將文件中的數(shù)據(jù)作為處理的數(shù)據(jù)源// path路徑默認以當前環(huán)境的根路徑為基準 。可以寫絕對路徑,也可以寫相對路徑//val rdd: RDD[String] = sc.textFile("datas/1.txt")// path路徑可以是文件的具體路徑,也可以目錄名稱//val rdd = sc.textFile("datas")// path路徑還可以使用通配符 *//val rdd = sc.textFile("datas/1*.txt")// path還可以是分布式存儲系統(tǒng)路徑:HDFSval rdd = sc.textFile("hdfs://localhost:8020/test.txt")rdd.collect().foreach(println)sc.stop()}}2.2、常用算子map算子:數(shù)據(jù)轉(zhuǎn)換import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}// map算子object map {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4))// 轉(zhuǎn)換函數(shù)def mapFunction(num: Int): Int = {num * 2}// 多種方式如下//val mapRDD: RDD[Int] = rdd.map(mapFunction)//val mapRDD: RDD[Int] = rdd.map((num: Int) => {//num * 2//})//val mapRDD: RDD[Int] = rdd.map((num: Int) => num * 2)//val mapRDD: RDD[Int] = rdd.map((num) => num * 2)//val mapRDD: RDD[Int] = rdd.map(num => num * 2)val mapRDD: RDD[Int] = rdd.map(_ * 2)mapRDD.collect().foreach(println)sc.stop()}}

經(jīng)驗總結(jié)擴展閱讀