A Summary of Basic Spark Core Interview Questions (Part 1)
1. Spark's deployment modes and their characteristics
SparkSubmit#prepareSubmitEnvironment:
// Set the cluster manager
val clusterManager: Int = args.master match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
error("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}
Spark does not have to run on a Hadoop cluster (that mainly means YARN); it can also run locally by specifying a number of threads. Running a Spark application directly as multiple threads on the local machine is usually done for easier debugging. Local mode comes in three flavors:
local: start a single Executor thread
local[K]: start K Executor threads
local[*]: start as many Executor threads as there are CPU cores
(A minimal local-mode sketch follows the list of modes below.)
Standalone: a distributed cluster deployment with its own complete set of services; resource management and task monitoring are handled by Spark itself. This mode is also the foundation of the other modes.
Spark on YARN: a distributed cluster deployment in which resource management and task monitoring are delegated to YARN. The Spark client connects directly to YARN, so no separate Spark cluster has to be built. There are two modes, yarn-client and yarn-cluster; the main difference is which node the Driver program runs on.
cluster mode is suited to production: the Driver runs on a node inside the cluster and is fault tolerant. client mode is suited to debugging: the Driver runs on the client node.
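To illustrate the local mode mentioned above, here is a minimal sketch that runs the whole application in one JVM with local[*] (the app name is arbitrary):
import org.apache.spark.{SparkConf, SparkContext}

// local[*]: one JVM, with as many task threads as there are CPU cores -- handy for debugging.
val conf = new SparkConf().setMaster("local[*]").setAppName("local-debug")
val sc = new SparkContext(conf)
println(sc.parallelize(1 to 100).sum())  // 5050.0
sc.stop()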
Spark on Kubernetes. The latest documentation:
http://spark.apache.org/docs/latest/running-on-kubernetes.html
The first viable way to run Spark on a Kubernetes cluster was to run Spark in Standalone mode, but the community soon proposed a mode that uses the Kubernetes native scheduler, i.e. the Native mode.
In short, Native mode runs the Driver and the Executors as Pods. Users submit Spark jobs to the Kubernetes apiserver in the same way they used to submit them to YARN. The submit command looks like this:
$ bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=<spark-image> \
local:///path/to/examples.jar
Here master is the address of the Kubernetes apiserver. After submission the job runs as follows: the Driver is first started as a Pod, and the Driver then launches the Executor Pods.
Google Cloud Platform (GCP) has also open-sourced a Spark Operator on GitHub; the repo address is:
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
Detailed usage documentation is available here:
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/quick-start-guide.md
2. What does the Driver program do?
A running Spark job includes a Driver process, the main process of the job. It contains the main function and a SparkContext instance, and it is the entry point of the program.
Responsibilities: request resources from the cluster, register with the Master, schedule the job, parse the job, build the Stages, and dispatch Tasks to the Executors. It contains the DAGScheduler and the TaskScheduler.
3. Hadoop MapReduce and Spark are both parallel computing frameworks. What do they have in common, and how do they differ?
Both use the MapReduce model for parallel computation. A Hadoop job is called a Job (on YARN it is also an Application). A Job is split into MapTasks and ReduceTasks; each Task runs in its own process, and when the Task finishes its process exits as well.
A task submitted by a Spark user is called an Application, and one Application corresponds to one SparkContext. An App contains multiple Jobs; every time an Action is triggered, a Job is produced. These Jobs can run in parallel or serially. Each Job contains multiple Stages; Stages are carved out of the Job by the DAGScheduler, based on the dependencies between RDDs, during the shuffle process. Each Stage contains multiple Tasks, which form a TaskSet that the TaskScheduler distributes to the Executors. The lifetime of an Executor matches that of the App: it stays alive even when no Job is running, so Tasks can start quickly and read data from memory. Spark's iterative computation happens in memory, the API offers a rich set of RDD operations such as join and groupBy, and the DAG enables good fault tolerance.
A Hadoop job has only map and reduce operations, so its expressiveness is limited; the MR process repeatedly reads and writes HDFS, causing heavy IO; and the dependencies between multiple Jobs have to be managed by the user.
4. RDDs in Spark
RDD: Resilient Distributed Dataset, the most basic data abstraction in Spark. It represents an immutable, partitioned collection of elements that can be computed in parallel.
/**
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
* [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
* pairs, such as `groupByKey` and `join`;
* [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
* Doubles; and
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
* can be saved as SequenceFiles.
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
* through implicit.
*
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
*
* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
* to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
* reading data from a new storage system) by overriding these functions. Please refer to the
* <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
* for more details on RDD internals.
*/
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {...}
The five core properties of an RDD:
A list of partitions: the data in an RDD lives in a list of partitions.
A function for computing each split: a function that is applied to every partition.
A list of dependencies on other RDDs: an RDD can depend on several other RDDs (the basis of RDD fault tolerance).
Optionally, a Partitioner for key-value RDDs: for KV-type RDDs.
Optionally, a list of preferred locations to compute each split on: data locality, i.e. the closest location of the data.
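A small sketch for inspecting these properties on a toy RDD (assumes an existing SparkContext sc; the values in the comments are what this particular example yields):
val rdd = sc.parallelize(1 to 100, numSlices = 4).map(_ * 2)
rdd.partitions.length                      // 1) the list of partitions -> 4 here
// 2) compute(split, context) is the per-partition function the scheduler invokes
rdd.dependencies                           // 3) dependencies on the parent -> OneToOneDependency
rdd.partitioner                            // 4) optional Partitioner -> None for a non-keyed RDD
rdd.preferredLocations(rdd.partitions(0))  // 5) preferred locations -> empty for parallelize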
5. Wide vs. narrow dependencies, and which of groupByKey, reduceByKey, map, filter, and union produce which
Narrow dependency
Each partition of the parent RDD is used by at most one partition of the child RDD. This shows up either as one parent partition mapping to one child partition, or as partitions of two parent RDDs mapping to one child partition. map, filter, and union belong to the first kind; a join over co-partitioned inputs belongs to the second.
Wide dependency
A partition of the child RDD depends on all partitions of the parent RDD; this is the result of shuffle-type operations. The essence of a shuffle is a group-by: data of the same type, or matching the same rule, is brought together (classified via disk or network IO).
Wide/narrow dependencies of the five operators
Transformations such as map, filter, and union on an RDD are generally narrow dependencies.
Wide dependencies generally come from operations such as groupByKey and reduceByKey, which repartition (shuffle) the data in the RDD's partitions.
A join can be either a wide or a narrow dependency: if the RDDs being joined have already been co-partitioned, the join is a narrow dependency; otherwise it is a wide dependency. (A quick runtime check of wide vs. narrow dependencies is sketched below.)
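A small sketch of checking dependency types at runtime through RDD.dependencies (assumes an existing SparkContext sc):
val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))
pairs.map(identity).dependencies   // OneToOneDependency          -> narrow
pairs.union(pairs).dependencies    // one RangeDependency per parent -> narrow
pairs.groupByKey().dependencies    // ShuffleDependency            -> wide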
6. How does Spark prevent out-of-memory errors?
Driver-side OOM: increase the Driver memory parameter:
# default 1G
spark.driver.memory
This parameter sets the memory of the Driver. In a Spark program, the SparkContext and the DAGScheduler run on the Driver, and the splitting of RDDs into Stages is also done on the Driver. If the user program has too many steps and produces too many Stages, that bookkeeping consumes Driver memory, and the Driver memory has to be increased.
OOM caused by a map creating a large number of objects: this kind of overflow happens when a single map produces a huge number of objects, for example rdd.map(x => for (i <- 1 to 10000) yield i.toString), where every element produces 10,000 objects. It is very easy to run out of memory this way. Without adding memory, the fix is to make each Task smaller, so that even when a Task creates many objects they still fit in Executor memory. Concretely, call repartition before the map that creates the objects, so that smaller partitions are fed into the map, for example:
rdd.repartition(10000).map(x => for (i <- 1 to 10000) yield i.toString)
Note that rdd.coalesce cannot be used here: without a shuffle it can only decrease the number of partitions, not increase it.
OOM caused by data skew
Besides possibly causing OOM, skewed data can also cause performance problems. The fix is similar to the above: call repartition to redistribute the data.
OOM after a shuffle
Shuffle OOM is almost always caused by a single file becoming too large after the shuffle. In Spark, operations such as join and reduceByKey involve a shuffle, and the shuffle requires a Partitioner. For most shuffle operations in Spark the default Partitioner is the HashPartitioner, and its default number of partitions is the largest partition count among the parent RDDs. It is controlled by:
# only effective for the HashPartitioner
spark.default.parallelism
# Spark SQL uses this parameter instead
spark.sql.shuffle.partitions
If the shuffle OOM is caused by a different Partitioner, the number of partitions has to be increased in that Partitioner's code.
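A sketch of raising shuffle parallelism so that each post-shuffle partition is smaller (the parameter values here are purely illustrative):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-tuning")
  .set("spark.default.parallelism", "400")      // RDD shuffles that use the HashPartitioner
  .set("spark.sql.shuffle.partitions", "400")   // Spark SQL shuffles
val sc = new SparkContext(conf)

// Or pass an explicit partition count to the shuffle operator itself:
// rdd.reduceByKey(_ + _, numPartitions = 400)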
Use rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) instead of rdd.cache(): when memory runs out, rdd.cache() drops the data and it has to be recomputed the next time it is used, whereas with StorageLevel.MEMORY_AND_DISK_SER the data that does not fit in memory is spilled to disk, avoiding the recomputation at the cost of a little IO.
7. Stage, Task, and Job: differences and how they are divided
Job: a parallel computation made up of multiple tasks; a Job is created whenever an Action is executed on an RDD.
Stage: each Job is split into smaller groups of Tasks called Stages; Stages depend on one another and are executed in order (pipelined).
Task: a unit of work sent to an Executor; it is the execution unit of a Stage. In general an RDD has as many Tasks as it has Partitions, because each Task processes the data of exactly one Partition.
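A sketch of how a simple program maps onto Jobs, Stages, and Tasks (assumes an existing SparkContext sc; the HDFS path is a hypothetical placeholder):
val words = sc.textFile("hdfs://namenode:8020/input/words.txt")  // hypothetical path
  .flatMap(_.split(" "))
  .map((_, 1))                          // narrow transformations -> same Stage
val counts = words.reduceByKey(_ + _)   // shuffle boundary -> a new Stage
counts.collect()                        // Action -> triggers one Job with two Stages
// Each Stage runs one Task per partition; the Tasks of a Stage form a TaskSet.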
Stage
private[scheduler] abstract class Stage(
val id: Int,
val rdd: RDD[_],
val numTasks: Int,
val parents: List[Stage],
val firstJobId: Int,
val callSite: CallSite,
val resourceProfileId: Int)
extends Logging {
val numPartitions = rdd.partitions.length
/** Set of jobs that this stage belongs to. */
val jobIds = new HashSet[Int]
/** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0
val name: String = callSite.shortForm
val details: String = callSite.longForm
/**
* Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
* StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
* have been created).
*/
private var _latestInfo: StageInfo =
StageInfo.fromStage(this, nextAttemptId, resourceProfileId = resourceProfileId)
/**
* Set of stage attempt IDs that have failed. We keep track of these failures in order to avoid
* endless retries if a stage keeps failing.
* We keep track of each attempt ID that has failed to avoid recording duplicate failures if
* multiple tasks from the same stage attempt fail (SPARK-5945).
*/
val failedAttemptIds = new HashSet[Int]
private[scheduler] def clearFailures() : Unit = {
failedAttemptIds.clear()
}
/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
def makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
val metrics = new TaskMetrics
metrics.register(rdd.sparkContext)
_latestInfo = StageInfo.fromStage(
this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences,
resourceProfileId = resourceProfileId)
nextAttemptId += 1
}
/** Returns the StageInfo for the most recent attempt for this stage. */
def latestInfo: StageInfo = _latestInfo
override final def hashCode(): Int = id
override final def equals(other: Any): Boolean = other match {
case stage: Stage => stage != null && stage.id == id
case _ => false
}
/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
def findMissingPartitions(): Seq[Int]
def isIndeterminate: Boolean = {
rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}
}
Task
private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
@transient var localProperties: Properties = new Properties,
// The default value is only used in tests.
serializedTaskMetrics: Array[Byte] =
SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(),
val jobId: Option[Int] = None,
val appId: Option[String] = None,
val appAttemptId: Option[String] = None,
val isBarrier: Boolean = false) extends Serializable {
@transient lazy val metrics: TaskMetrics =
SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics))
/**
* Called by [[org.apache.spark.executor.Executor]] to run this task.
*
* @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
* @param attemptNumber how many times this task has been attempted (0 for the first attempt)
* @param resources other host resources (like gpus) that this task attempt can access
* @return the result of the task along with updates of Accumulators.
*/
final def run(
taskAttemptId: Long,
attemptNumber: Int,
metricsSystem: MetricsSystem,
resources: Map[String, ResourceInformation]): T = {
SparkEnv.get.blockManager.registerTask(taskAttemptId)
...
}
...
}
TaskSet
/**
* A set of tasks submitted together to the low-level TaskScheduler, usually representing
* missing partitions of a particular stage.
*/
private[spark] class TaskSet(
val tasks: Array[Task[_]],
val stageId: Int,
val stageAttemptId: Int,
val priority: Int,
val properties: Properties,
val resourceProfileId: Int) {
val id: String = stageId + "." + stageAttemptId
override def toString: String = "TaskSet " + id
}
8. Spark job submission parameters
The source is in spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala.
Several important parameters when submitting a job:
executor-cores: number of cores per executor, default 1; 2 to 5 is recommended, and 4 is a common choice.
Spark standalone, YARN and Kubernetes only:
--executor-cores NUM Number of cores used by each executor. (Default: 1 in YARN and K8S modes, or all available cores on the worker in standalone mode).
The source tree also has a test class, spark/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala, which uses "--executor-cores", "5".
num-executors: number of executors to launch, default 2.
Spark on YARN and Kubernetes only:
--num-executors NUM Number of executors to launch (Default: 2).
If dynamic allocation is enabled, the initial number of executors will be at least NUM.
executor-memory: memory per executor, default 1 GB.
--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
driver-cores: number of cores used by the Driver, default 1.
Cluster deploy mode only:
--driver-cores NUM Number of cores used by the driver, only in cluster mode
(Default: 1).
driver-memory: memory for the Driver, default 512 MB.
--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: ${mem_mb}M).
For example, submitting a job on YARN:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-cores 2 \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 2 \
--num-executors 2 \
--queue thequeue \
examples/jars/spark-examples*.jar \
10
9. reduceByKey vs. groupByKey in Spark: differences and usage
reduceByKey merges the multiple values of each key. Most importantly, it can perform the merge locally first, and the merge function is user-defined.
JavaPairRDD#reduceByKey ("similarly to a "combiner" in MapReduce"):
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.reduceByKey(partitioner, func))
groupByKey also operates on the multiple values of each key, but it only gathers them into a Sequence. It cannot take a custom function itself; any further processing has to go through an extra map(func).
/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*
* @note If you are grouping in order to perform an aggregation (such as a sum or average) over
* each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
* will provide much better performance.
*/
def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions.
*
* @note If you are grouping in order to perform an aggregation (such as a sum or average) over
* each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
* will provide much better performance.
*/
def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
On large datasets, reduceByKey(func) performs better than groupByKey(), because reduceByKey() combines the data before the shuffle, so less data has to travel over the network than with groupByKey.
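A small word-count sketch contrasting the two (assumes an existing SparkContext sc):
val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a")).map((_, 1))

// reduceByKey: partial sums are computed on each mapper first, then shuffled
val counts1 = words.reduceByKey(_ + _)

// groupByKey: every (word, 1) pair is shuffled, and an extra map does the aggregation
val counts2 = words.groupByKey().map { case (w, ones) => (w, ones.sum) }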
10. foreach vs. map
Let's look at the source first. foreach is the first method in the Actions section of RDD:
// Actions (launch a job to return a value to the user program)
/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
map is the first method in the Transformations section of RDD:
// Transformations (return a new RDD)
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
What the two methods have in common: both traverse the collection and apply the given function to each element.
foreach has no return value (strictly speaking it returns Unit), while map returns a new collection.
foreach is used to traverse a collection, while map maps (transforms) a collection into another collection.
foreach executes eagerly for its side effects, while map is lazy and only describes the transformation until an Action runs.
map is a Transformation operator; foreach is an Action operator.
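A tiny sketch of the difference (assumes an existing SparkContext sc):
val nums = sc.parallelize(1 to 5)
val doubled = nums.map(_ * 2)      // Transformation: returns a new RDD, nothing runs yet
doubled.foreach(x => println(x))   // Action: returns Unit and launches a job;
                                   // the println happens on the executors, not on the driver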
11. map vs. mapPartitions
mapPartitions:
/**
* Return a new RDD by applying a function to each partition of this RDD.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
What they have in common: both map and mapPartitions are Transformation operators.
The differences between the two:
map operates on each individual element, while mapPartitions operates on the iterator of each partition of the RDD.
map has lower performance: if a partition contains 10,000 records, the function is executed and evaluated 10,000 times for that partition.
mapPartitions has higher performance: a Task executes the function only once, and the function receives the whole partition's data in one go, so a single invocation per partition is enough.
The downside is the case where each partition holds a very large amount of data, say 1,000,000 records per partition: handing the whole partition to the function at once may not fit in memory and cause an OOM.
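A sketch of the typical mapPartitions pattern, doing the expensive setup once per partition instead of once per element (assumes lines is an RDD[String] of dates in yyyy-MM-dd form):
val parsed = lines.mapPartitions { iter =>
  // The (non-trivial to construct, non-thread-safe) formatter is created once per partition,
  // instead of once per element as a plain map would do.
  val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd")
  iter.map(line => fmt.parse(line).getTime)   // the iterator is still consumed lazily
}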
12. foreach vs. foreachPartition
Similarities: both foreach and foreachPartition are Action operators.
Differences: foreach processes the RDD one record at a time, while foreachPartition processes the data of one partition's iterator at a time.
RDD#foreachPartition:
/**
* Applies a function f to each partition of this RDD.
*/
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
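A sketch of the typical foreachPartition pattern, opening one connection per partition rather than one per record (the JDBC URL, credentials, and table are hypothetical placeholders):
rdd.foreachPartition { iter =>
  // One connection and one prepared statement per partition.
  val conn = java.sql.DriverManager.getConnection("jdbc:postgresql://db-host/mydb", "user", "pass")
  val stmt = conn.prepareStatement("INSERT INTO events(payload) VALUES (?)")
  iter.foreach { record =>
    stmt.setString(1, record.toString)
    stmt.executeUpdate()
  }
  conn.close()
}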
13. groupByKey, reduceByKey, and combineByKey
Question 9 above already covered the differences and usage of reduceByKey vs. groupByKey.
groupByKey operates on each key and gathers the values into a Sequence.
reduceByKey merges the multiple values of each key.
Under the hood, reduceByKey uses combineByKey, or more precisely combineByKeyWithClassTag.
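A sketch of combineByKey itself, computing the average value per key (assumes an existing SparkContext sc):
val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2)))
val sumCount = pairs.combineByKey(
  (v: Int) => (v, 1),                                          // createCombiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),       // mergeValue, within a partition
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners, across partitions
)
val avgByKey = sumCount.mapValues { case (sum, count) => sum.toDouble / count }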
14. Is sortByKey a global sort?
sortByKey is a global sort. RDD#sortBy:
/**
* Return this RDD sorted by the given key function.
*/
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
}
OrderedRDDFunctions#sortByKey:
/**
* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
* `collect` or `save` on the resulting RDD will return or output an ordered list of records
* (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
* order of the keys).
*/
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
Before sorting, sortByKey partitions the data by key range using a RangePartitioner (the keys come from keyBy in sortBy above), so that all the data in partition p1 is smaller than the data in p2, all the data in p2 is smaller than the data in p3, and so on (p1 through pn are partition identifiers). Each partition is then sorted internally, so the data as a whole ends up globally sorted.
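A small sketch showing the globally ordered result (assumes an existing SparkContext sc):
val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")), numSlices = 2)
val sorted = pairs.sortByKey()   // RangePartitioner assigns key ranges to partitions
sorted.collect()                 // Array((1,a), (2,b), (3,c)) -- globally ordered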
15. coalesce vs. repartition in Spark
First, look at the source. coalesce (shuffle: Boolean = false):
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions. If a larger number
* of partitions is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* @note With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner. The optional partition coalescer
* passed in must be serializable.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
repartition (shuffle = true):
/**
* Return a new RDD that has exactly numPartitions partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
coalesce is commonly assumed to be more efficient than repartition because it does not shuffle, but in practice it depends on the specific case: coalesce is not always more efficient and can hide serious pitfalls, so use it with care.
Both operators re-divide the partitions of an RDD; repartition simply calls coalesce with the shuffle parameter, which defaults to false, set to true.
Suppose an RDD has N partitions and needs to be re-divided into M partitions:
If N < M: the N partitions usually hold unevenly distributed data; use the HashPartitioner to repartition the data into M partitions, which requires shuffle = true.
If N > M and N and M are of a similar order (say N is 1000 and M is 100): several of the N partitions can be merged into a new partition, ending up with M partitions; shuffle can be set to false. If M > N, coalesce does not help: without a shuffle the parent and child RDDs have a narrow dependency, so the number of partitions cannot grow. In other words, with shuffle = false, if the requested number is larger than the current number of partitions, the partition count stays the same; you cannot increase an RDD's partition count without a shuffle.
If N > M and N and M differ greatly: consider the relationship between the number of executors and the target number of partitions. If the number of executors is less than or equal to the target number of partitions, coalesce is efficient; otherwise coalesce may leave (number of executors - target number of partitions) executors idle and reduce efficiency.
If M is 1: to keep better parallelism for the operations before the coalesce, set shuffle to true.
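A small sketch of the behaviour described above (assumes an existing SparkContext sc):
val rdd = sc.parallelize(1 to 1000, numSlices = 100)
rdd.coalesce(10).getNumPartitions        // 10: partitions merged via a narrow dependency, no shuffle
rdd.coalesce(1000).getNumPartitions      // stays 100: cannot grow without a shuffle
rdd.repartition(1000).getNumPartitions   // 1000: repartition = coalesce(..., shuffle = true)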