​Spark Core基礎面試題總結(上)

2021-03-02 知了小巷

點擊關註上方「知了小」,

設為「置頂或星標」,第一時間送達乾貨。

Spark Core基礎面試題總結(上)

1. Spark的幾種部署模式及其特點

SparkSubmit#prepareSubmitEnvironment

// Set the cluster manager
val clusterManager: Int = args.master match {
  case "yarn" => YARN
  case m if m.startsWith("spark") => STANDALONE
  case m if m.startsWith("mesos") => MESOS
  case m if m.startsWith("k8s") => KUBERNETES
  case m if m.startsWith("local") => LOCAL
  case _ =>
    error("Master must either be yarn or start with spark, mesos, k8s, or local")
    -1
}

Spark不一定非要跑在Hadoop集群(主要是YARN),可以在本地,起多個線程的方式來指定。將Spark應用以多線程的方式直接運行在本地,一般都是為了方便調試,本地模式分為三類:

local[*] 啟動和CPU數目相同的Executor

分布式部署集群,自帶完整的服務,資源管理和任務監控是Spark自己監控,這個模式也是其他模式的基礎。

分布式部署集群,資源和任務監控交給YARN管理,Spark客戶端直接連接YARN,不需要額外構建Spark集群。有yarn-client和yarn-cluster兩種模式,主要區別在於:Driver程序的運行節點。

cluster適合生產,Driver運行在集群子節點,具有容錯能力client適合調試,Driver運行在客戶端節點

最新文檔:
http://spark.apache.org/docs/latest/running-on-kubernetes.html

Spark 運行在 Kubernetes 集群上的第一種可行方式是將 Spark 以 Standalone 模式運行,但是很快社區就提出使用 Kubernetes 原生 Scheduler 的運行模式,也就是 Native 的模式。

Native 模式簡而言之就是將 Driver 和 Executor Pod 化,用戶將之前向 YARN 提交 Spark 作業的方式提交給 Kubernetes 的 apiserver,提交命令如下:

$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar

其中 master 就是 kubernetes 的 apiserver 地址。提交之後整個作業的運行方式如下,先將 Driver 通過 Pod 啟動起來,然後 Driver 會啟動 Executor 的 Pod。

Google 雲平臺,也就是 GCP 在 github 上面開源了 Spark 的 Operator,repo 地址:
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
這裡有詳細的使用文檔:
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/quick-start-guide.md

2. Driver端程序的功能是什麼?

一個Spark作業運行時包括一個Driver進程,也是作業的主進程,具有main函數,並且有SparkContext的實例,是程序的入口點。
功能:負責向集群申請資源,向Master註冊信息,負責作業的調度,負責作業的解析、生成Stage並調度Task到Executor上。包括DAGScheduler和TaskScheduler。

3. Hadoop MapReduce和Spark都是並行計算,那麼他們有什麼相同點和區別?

兩者都是用MR模型進行並行計算,Hadoop的一個作業稱為Job(在YARN上也是Application),Job裡面分為MapTask和ReduceTask,每個Task都是在自己的進程中進行的,當Task結束時,進程也會結束。
Spark用戶提交的任務被稱為Application,一個Application對應一個SparkContext,App中存在多個Job,沒觸發一次Action操作就會產生一個Job。這些Job可以並行或串行執行,每個Job中有多個Stage,Stage是Shuffle過程中DAGScheduler通過RDD之間的依賴關係劃分Job而來的,每個Stage裡面有多個Task,組成TaskSet由TaskScheduler分發到各個Executor中執行,Executor的生命周期是和App一樣的,即使沒有Job運行也是存在的,所以Task可以快速啟動讀取內存進行計算,Spark的迭代計算都是在內存中進行的,API中提供了大量的RDD操作如join,group by等,而且通過DAG圖可以實現良好的容錯。
Hadoop的job只有map和reduce操作,表達能力比較欠缺而且在MR過程中會重複的讀寫HDFS,造成大量IO操作,多個Job需要自己管理依賴關係。

4. Spark中的RDD

RDD:Resilient Distributed DataSet,彈性分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、裡面的元素可並行計算的集合。

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
 * pairs, such as `groupByKey` and `join`;
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
 * Doubles; and
 * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
 * can be saved as SequenceFiles.
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
 * through implicit.
 *
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
 * for more details on RDD internals.
 */
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {...}

RDD五大特性:

A list of partitions 一個分區列表,RDD中的數據都存在一個分區列表裡面A function for computing each split 作用在每一個分區中的函數A list of dependencies on other RDDs 一個RDD依賴於其他多個RDD(RDD容錯機制)Optionally, a Partitioner for key-value RDDs KV類型的RDDOptionally, a list of preferred locations to compute each split on 數據本地性,最近的數據位置5. 寬依賴和窄依賴,groupByKey、reduceByKey、map、filter、union五種算子的寬窄依賴

窄依賴
父RDD的每一個分區最多被一個子RDD的分區所使用,表現為一個父RDD的分區對應於一個子RDD的分區,和兩個父RDD的分區對應於一個子RDD的分區。map和filter、union屬於第一類,對輸入進行協同劃分(co-partitioned)的join屬於第二類。

寬依賴
子RDD的分區依賴於父RDD的所有分區,Shuffle類操作的結果。

Shuffle的本質就是group by,把相同類型或相同規則的數據放在一起(磁碟或網絡IO進行分類)。

算子的寬窄依賴
對RDD進行map、filter、union等transformation一般是窄依賴。
寬依賴一般是對RDD進行groupByKey、reduceByKey等操作,就是對RDD中的partition中的數據進行重分區(Shuffle)。
join操作既可能是寬依賴,也可能是窄依賴,當要對RDD進行join操作時,如果RDD進行過重分區則為窄依賴,否則為寬依賴。6. Spark如何防止內存溢出?

可以增加Driver的內存參數:

# 默認1G
spark.driver.memory

這個參數用來設置Driver端的內存。在Spark程序中,SparkContext、DAGScheduler都是運行在Driver端的。對應RDD的Stage切分也是在Driver端運行,如果用戶自己寫的程序有過多的步驟,切分出過多的Stage,這部分信息消耗的是Driver的內存,這個時候就需要調大Driver的內存。

這種溢出的原因是在單個map中產生了大量的對象導致的,比如:rdd.map(x => for(i <-1 to 10000) yield i.toString),這個操作在RDD中,每個對象都產生了10000個對象,這肯定很容易產生內存溢出的問題。針對這種問題,在不增加內存的情況下,可以通過減少每個Task的大小,以便達到每個Task即使產生大量的對象Executor的內存也能夠裝得下。具體做法可以在會產生大量對象的map操作之前調用repartition方法,分區成更小的塊傳入map,例如:

rdd.repartition(10000).map(x => for(i <-1 to 10000) yield i.toString)

注意,不能使用rdd.coalesce,這個方法只能減少分區,不能增加分區,不會有Shuffle的過程。

數據不均衡導致內存溢出
數據不均衡,除了有可能導致內存溢出外,也有可能導致性能問題,解決方法和上面類似,就是調用repartition重新分區。

Shuffle後內存溢出
Shuffle內存溢出的情況可以說基本上都是Shuffle後,單個文件過大導致的。在Spark中,join、reduceByKey這一類的操作,都會有Shuffle的過程,在Shuffle的時候,需要傳入一個分區器Partitioner,大部分Spark中的Shuffle操作,默認的Partitioner都是HashPartitioner,默認值是父RDD中最大的分區數,這個參數通過

# 只對HashPartitioner有效
spark.default.parallelism
# Spark SQL使用下面參數
spark.sql.shuffle.partitions

控制。

如果是別的Partitioner導致的Shuffle內存溢出,就需要從Partitioner的代碼增加partitions數量。

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()

內存不足時,rdd.cache會丟失數據,再次使用的時候會重算,StorageLevel.MEMORY_AND_DISK_SER內存不足時會存儲在磁碟,避免重新計算,只是會消耗一點IO時間。

7. Stage、Task和Job的區別與劃分方式Job:一個由多個任務組成的並行計算,當需要執行一個RDD的Action操作的時候,會生成一個JobStage:每個Job被拆分成更小的被稱作Stage(階段)的Task(任務)組,Stage彼此之間是相互依賴的,各個Stage會按照執行順序依次執行(Pipeline)Task:一個將要發送到Executor中的工作單元。是Stage的一個任務執行單元,一般來說,一個RDD有多少個Partition,就會有多少個Task,因為每一個Task只是處理一個Partition上的數據。

Stage

private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],
    val firstJobId: Int,
    val callSite: CallSite,
    val resourceProfileId: Int)
  extends Logging {

  val numPartitions = rdd.partitions.length

  /** Set of jobs that this stage belongs to. */
  val jobIds = new HashSet[Int]

  /** The ID to use for the next new attempt for this stage. */
  private var nextAttemptId: Int = 0

  val name: String = callSite.shortForm
  val details: String = callSite.longForm

  /**
   * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
   * here, before any attempts have actually been created, because the DAGScheduler uses this
   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
   * have been created).
   */
  private var _latestInfo: StageInfo =
    StageInfo.fromStage(this, nextAttemptId, resourceProfileId = resourceProfileId)

  /**
   * Set of stage attempt IDs that have failed. We keep track of these failures in order to avoid
   * endless retries if a stage keeps failing.
   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
   * multiple tasks from the same stage attempt fail (SPARK-5945).
   */
  val failedAttemptIds = new HashSet[Int]

  private[scheduler] def clearFailures() : Unit = {
    failedAttemptIds.clear()
  }

  /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
  def makeNewStageAttempt(
      numPartitionsToCompute: Int,
      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
    val metrics = new TaskMetrics
    metrics.register(rdd.sparkContext)
    _latestInfo = StageInfo.fromStage(
      this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences,
      resourceProfileId = resourceProfileId)
    nextAttemptId += 1
  }

  /** Returns the StageInfo for the most recent attempt for this stage. */
  def latestInfo: StageInfo = _latestInfo

  override final def hashCode(): Int = id

  override final def equals(other: Any): Boolean = other match {
    case stage: Stage => stage != null && stage.id == id
    case _ => false
  }

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  def findMissingPartitions(): Seq[Int]

  def isIndeterminate: Boolean = {
    rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
  }
}

Task

private[spark] abstract class Task[T](
    val stageId: Int,
    val stageAttemptId: Int,
    val partitionId: Int,
    @transient var localProperties: Properties = new Properties,
    // The default value is only used in tests.
    serializedTaskMetrics: Array[Byte] =
      SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(),
    val jobId: Option[Int] = None,
    val appId: Option[String] = None,
    val appAttemptId: Option[String] = None,
    val isBarrier: Boolean = false) extends Serializable {

  @transient lazy val metrics: TaskMetrics =
    SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics))

  /**
   * Called by [[org.apache.spark.executor.Executor]] to run this task.
   *
   * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
   * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
   * @param resources other host resources (like gpus) that this task attempt can access
   * @return the result of the task along with updates of Accumulators.
   */
  final def run(
      taskAttemptId: Long,
      attemptNumber: Int,
      metricsSystem: MetricsSystem,
      resources: Map[String, ResourceInformation]): T = {
    SparkEnv.get.blockManager.registerTask(taskAttemptId)
    ...
  }
  ...
}

TaskSet

/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,
    val properties: Properties,
    val resourceProfileId: Int) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}

8. Spark提交作業參數

源碼在spark/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

在提交任務時的幾個重要參數:

executor-cores 每個executor使用的內核數,默認為1 建議2-5個;一般可以設置4個
Spark standalone, YARN and Kubernetes only:
--executor-cores NUM  Number of cores used by each executor. (Default: 1 in YARN and K8S modes, or all available cores on the worker in standalone mode).

源碼裡有個測試類:
spark/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

"--executor-cores", "5",

num-executors 啟動executors的數量,默認為2
Spark on YARN and Kubernetes only:
--num-executors NUM  Number of executors to launch (Default: 2).
If dynamic allocation is enabled, the initial number of executors will be at least NUM.

executor-memory executor的內存大小,默認1GB
--executor-memory MEM  Memory per executor (e.g. 1000M, 2G) (Default: 1G).

driver-cores Driver端使用的內核數,默認為1
Cluster deploy mode only:
--driver-cores NUM  Number of cores used by the driver, only in cluster mode
(Default: 1).

driver-memory Driver端使用的內存大小,默認512MB
--driver-memory MEM  Memory for driver (e.g. 1000M, 2G) (Default: ${mem_mb}M).

例如提交任務On YARN:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-cores 2 \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 2 \
    --num-executors 2 \
    --queue thequeue \
    examples/jars/spark-examples*.jar \
    10

9. Spark中reduceByKey VS groupByKey區別與用法

reduceByKey用於對每個Key對應的多個Value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函數自定義。
JavaPairRDD#reduceByKey#similarly to a "combiner" in MapReduce.

/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce.
 */
def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
  fromRDD(rdd.reduceByKey(partitioner, func))

groupByKey也是對每個Key對應的多個Value進行操作,但是只是匯總生成一個Sequence,本身不能自定義函數,只能通過額外的map(func)來實現。

/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 *
 * @note If you are grouping in order to perform an aggregation (such as a sum or average) over
 * each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
 * will provide much better performance.
 */
def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
  fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))

/**
 * Group the values for each key in the RDD into a single sequence. Hash-partitions the
 * resulting RDD with into `numPartitions` partitions.
 *
 * @note If you are grouping in order to perform an aggregation (such as a sum or average) over
 * each key, using `JavaPairRDD.reduceByKey` or `JavaPairRDD.combineByKey`
 * will provide much better performance.
 */
def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
  fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))

在大的數據集上,reduceByKey(func)的效果比groupByKey()的效果更好一些。因為reduceByKey()會在Shuffle之前對數據進行合併,傳輸速度優於groupByKey(網絡IO)。

10. foreach和map的區別

先看源碼,foreach是RDD中Actions裡的第一個方法:
Actions (launch a job to return a value to the user program)

// Actions (launch a job to return a value to the user program)

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

map則是RDD中Transformations裡的第一個方法:
Transformations (return a new RDD)

// Transformations (return a new RDD)

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

兩個方法的共同點:都是用於遍歷集合對象,並對每一項執行指定的方法。
兩者的差異:

foreach沒有返回值(準確的說是返回void),map返回集合對象。foreach用於遍歷集合,而map用於映射(轉換)集合到另一個集合。foreach中的處理邏輯是串行的,map中的處理邏輯是並行的。map是Transformation算子,foreach是Action算子。11. map與mapPartitions的區別

mapPartitions:

/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}

相同點:map與mapPartitions都屬於Transformation算子
區別:

mapPartitions則是對RDD中的每個分區的迭代器進行操作map操作性能低下。比如一個partition中有一萬條數據,那麼在分析每個分區時,function要執行和計算1萬次。mapPartitions性能高。使用mapPartitions操作之後,一個Task僅僅會執行一次function,function一次接收所有的partition數據。只要執行一次就可以了,性能比較高。RDD中的每個分區數據量超大的情形,比如一個Partition有100萬條數據。mapPartitions一次傳入一個function後,可能一下子內存不夠用,造成OOM(內存溢出)。12. foreach和foreachPartition的區別

相同:foreach和foreachPartition都屬於Action算子
區別:

foreachPartition每次處理RDD中每個分區的迭代器中的數據
/**
 * Applies a function f to each partition of this RDD.
 */
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

13. groupByKey、reduceByKey、combineByKey的區別

上面第9已經對Spark中reduceByKey VS groupByKey區別與用法做了說明。

用於對每個Key進行操作,將結果生成一個Sequence用於對每個Key對應的多個Value進行merge操作reduceByKey底層就是使用了combineByKey,準確一點是combineByKeyWithClassTag14. sortByKey這個算子是全局排序嗎?

sortByKey是全局排序。RDD#sortBy

/**
 * Return this RDD sorted by the given key function.
 */
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}

OrderedRDDFunctions#sortByKey

/**
 * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
 * `collect` or `save` on the resulting RDD will return or output an ordered list of records
 * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
 * order of the keys).
 */
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

在sortByKey之前將數據使用Partitioner根據數據範圍來分(keyBy)。使得p1分區所有的數據小於p2,p2分區所有的數據小於p3,依次類推。(p1~pn是分區標識)。然後,使用sortByKey算子針對每一個Partition進行排序,這樣全局的數據就被排序了。15. Spark中coalesce VS repartition

先看源碼:
coalesce shuffle: Boolean = false

/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions. If a larger number
 * of partitions is requested, it will stay at the current number of partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](
        mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}

repartition shuffle = true

/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

通常認為coalesce不產生Shuffle會比repartition產生Shuffle效率高,而實際情況往往要根據具體問題具體分析,coalesce效率不一定高,有時還可能有大坑,所以還是要慎用。
兩個算子都是對RDD的分區進行重新劃分,repartition調用了coalesce,把默認為false的shuffle參數置為了true。
假設有一個RDD有N個分區,需要重新劃分為M個分區:

如果N<M。一般情況下N個分區有數據分布不均勻的狀況,利用HashPartitioner函數將數據重新分區為M個,這時需要將shuffle設置為true。如果N>M,並且N和M相差不多,(假如N是1000,M是100),那麼就可以將N個分區中的若干個分區合併成一個新的分區,最終合併為M個分區,這時可以將shuffle設置為false,如果M>N時,coalesce是無效的,不進行Shuffle過程,父RDD和子RDD之間是窄依賴關係,無法使文件數partitions變多。總之如果shuffle為false時,傳入的參數大於現有的分區數目,RDD的分區數將保持不變。也就是說不經過Shuffle,是無法將RDD的分區數變多的。如果N>M,並且N和M相差很大很大,這是要看executor數量與要生成的partition的關係。如果executor數 <= 要生成的partition數,coalesce效率高,反之如果用coalesce可能會導致(executor數 - 要生成的partition數)個executor空跑從而降低效率。如果在M為1的時候,為了使得coalesce之前的操作有更好的並行度,可以將shuffle設置為true。

相關焦點

  • SparkCore——專業術語及流程圖
    隨後這些具體的Task每個都會被分配到集群上的某個節點的某個Executor去執行。·每個節點可以起一個或多個Executor。·每個Executor由若干core組成,每個Executor的每個core一次只能執行一個Task。·每個Task執行的結果就是生成了目標RDD的一個partiton。
  • 當MongoDB遇見Spark
    作為文檔資料庫則表現得更加細顆粒化MongoDB支持HDFS所沒有的索引的概念, 所以在讀取上更加快MongoDB支持的增刪改功能比HDFS更加易於修改寫入後的數據HDFS的響應級別為分鐘, 而MongoDB通常是毫秒級別如果現有資料庫已經是MongoDB的話, 那就不用再轉存一份到HDFS上了可以利用MongoDB強大的Aggregate
  • 【大數據嗶嗶集20210117】Spark面試題靈魂40問
    spark並行度,每個core承載24個partition,如,32個core,那麼64128之間的並行度,也就是設置64~128個partion,並行讀和數據規模無關, 只和內存使用量和cpu使用時間有關。29、collect功能是什麼,其底層是怎麼實現的?
  • 大數據分析工程師面試集錦3-SQL/SparkSql/HiveQL
    面試題庫01SQL基礎知識考察對於面試初級數據分析師來說,SQL的面試重點會放在基礎知識的考察,如果最基本的基礎概念和語法都不能熟練回答出來的話,通過面試的機率就會很低。下面兩張圖是SQL基礎概念和基礎語法的考題大綱圖,接下來圍繞圖中提到的概念來列舉幾個常見面試題。圖1 基礎概念圖2 基礎語法考題模擬題1:你覺得SQL是一種什麼樣的語言,說說你對它的認識。
  • Spark【面試】
    Spark【面試】 存的是和hdfs的映射關係,hive是邏輯上的數據倉庫,實際操作的都是hdfs上的文件,HQL就是用sql語法來寫的mr程序。8、Hive與關係型資料庫的關係?沒有關係,hive是數據倉庫,不能和資料庫一樣進行實時的CURD操作。
  • 數據分析工程師面試集錦5——Spark面試指南
    導語本篇文章為大家帶來spark面試指南,文內會有兩種題型,問答題和代碼題,題目大部分來自於網絡上,有小部分是來自於工作中的總結,每個題目會給出一個參考答案。為什麼考察Spark?Spark作為大數據組件中的執行引擎,具備以下優勢特性。
  • Spark面試高頻考點,必知必會!
    千萬不要被冗長的步驟嚇到,一定要學會總結差異,發現規律,通過圖形去增強記憶。        RDD 可是Spark中最基本的數據抽象,我想就算面試不被問到,那自己是不是也應該非常清楚呢!        這道題就已經開始摻和有「源碼」的味道了,為什麼呢?
  • 百度大數據三面題:shuffle過程+HBase+Spark優化+kmeans算法
    剛剛收到百度的offer內心激動的同時也挺欣慰,經過幾輪的面試下來心裡其實已經有個底了,不過沒有拿到offer的時候內心還是挺慌的,最後終於如願以償的拿到了offer,也算是對得起這麼久的準備。hadoop和spark的都是並行計算,那麼他們有什麼相同和區別呢?說一說Spark Streaming和Storm有何區別?kafka的數據存在內存還是磁碟Hive與關係型資料庫的關係?
  • 黑馬程式設計師:技術筆記大數據面試題之spark相關(二)
    昨天分享了大數據面試題之spark相關一,看到有很大的反響,今天就分享接下來的二,希望能更好的幫助到大家!spark的迭代計算都是在內存中進行的,API中提供了大量的RDD操作如join,groupby等,而且通過DAG圖可以實現良好的容錯。13.RDD機制? 答:rdd分布式彈性數據集,簡單的理解成一種數據結構,是spark框架上的通用貨幣。
  • Spark SQL重點知識總結
    除此之外提供了以樣例類為Schema模型的強類型5、DataFrame=DataSet[Row]6、DataFrame和DataSet都有可控的內存管理機制,所有數據都保存在非堆上,都使用了catalyst進行SQL的優化。
  • 使用Golang實現Spark UDF
    注意在java源文件中的名稱需要和這些代碼和路徑匹配得上。組合即將大功告成,但我們需要一個spark jar包用來做編譯。我寫了一個Makefile,它會下載spark jar包、存儲到./spark_jars目錄下。
  • 詭異 | Spark使用get_json_object函數
    使用spark命令:/opt/software/spark-2.2.0-bin-hadoop2.6/bin/spark-sql \--master yarn-client \--driver-memory
  • Java 基礎知識面試題與知識點總結!(100題)
    Java 基礎知識面試題與知識點知識點:Java基礎知識JavaJDK 監控和故障處理工具Java類文件結構 Java類加載過程 Java類加載器 Java雙親委派模型 自定義類加載器 程序計數器 虛擬機棧 本地方法棧 jvm-堆Java IO BIO NIO AIO面試題
  • 『 Spark 』2. spark 基本概念解析
    Application用戶在 spark 上構建的程序,包含了 driver 程序以及在集群上運行的程序代碼,物理機器上涉及了 driver,master,worker 三個節點.2.Executor在每個 Worker Node 上為某應用啟動的一個進程,該進程負責運行任務,並且負責將數據存在內存或者磁碟上,每個任務都有各自獨立的 Executor。 Executor 是一個執行 Task 的容器。它的主要職責是:總結:Executor 是一個應用程式運行的監控和執行容器。6.
  • RDD和SparkSQL綜合應用
    import findspark#指定spark_home為剛才的解壓路徑,指定python路徑spark_home = "/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"python_path = "/Users/liangyun/anaconda3
  • Spark 1.6.0 新手快速入門
    Spark交互式Shell的使用 基礎 Spark的交互式Shell提供了一個簡單的方式來學習Spark的API,同時也提供了強大的交互式數據處理能力。Spark Shell支持Scala和Python兩種語言。啟動支持Scala的Spark Shell方式為 .
  • SparkSQL 50道練習題
    為了鞏固大家的基礎,提升實戰的能力,本期又備下了一道關於SparkSQL,綜合性比較全面的基礎訓練題,希望大家能夠受用學習進步。準備數據查詢結果排序36.查詢「男」教師及其所上的課程37.查詢最高分同學的Sno、Cno和Degree列38.查詢和「李軍」同性別的所有同學的Sname39.查詢和「李軍」同性別並同班的同學Sname40.查詢所有選修「計算機導論」課程的「男」同學的成績表41.查詢Student表中的所有記錄的Sname、Ssex和Class列42.查詢教師所有的單位即不重複的Depart列43.查詢Student表的所有記錄
  • 2020 前端面試 | 第一波面試題總結
    年底由於種種原因想換一份工作,但由於太忙,沒認真搞簡歷,也沒怎麼複習基礎,導致很多本來會的都沒敢往簡歷上寫。於是寫了一個簡版的簡歷掛在Boss直聘上,準備年過完再認真籌備這件事情。 令我意外的是,近一個月收到多家公司的面試邀請。
  • 2020大數據面試題真題總結(附答案)
    (智雲健康)v1.22020-08-08朋友面試數據專家提供的數據驅動,spark及flink方面面試題(華為,阿里,小影,拼便宜)v1.32020-08-22朋友面試數據開發提供的關於hive及數倉方面的題目(美團)v1.42020-09-06老徐提供螞蟻阿里微店面試題(數倉方向)及朋友提供數據開發面試題(離線+實時)及軟通面試題
  • SparkSQL基礎及實戰練習
    Spark的入門測試        首先讓我們準備好該題所需的數據 test.txt        數據結構如下依次是:班級 姓名 年齡 性別 科目 成績12 宋江 25 男 chinese 5012 宋江