在印象中spark的速度和hive on mr模式比特別快,但實際上SparkSQL操作insert overwrite table到hive特別慢。針對此情況,說下自己收集和使用的方案:
解決方案如下:
1. 將dataframe重新分區,dataFreame.repartition(1),重新分區為1,原來是 job 花了大量的時間在寫文檔上。來源連結:https://www.dazhuanlan.com/2019/08/21/5d5d11a2893ea/
monthDF.repartition(1).createOrReplaceTempView("tempMonth")
val insertMonthSQL =
"""
| INSERT OVERWRITE TABLE app.table_name PARTITION (dt)
| SELECT
| id,
| name,
| class,
| dt
| FROM tempMonth
""".stripMargin
sparkSession.sql(insertMonthSQL)
dataframe.registerTempTable("result")
sql(s"""INSERT OVERWRITE Table $outputTable PARTITION (dt ='$outputDate') select * from result""")
而整個結果數據的產生只需要4分鐘左右的時間,比如以下方式:將結果以textfile存入hdfs:
result.rdd.saveAsTextFile(output_tmp_dir)
由此可見,對hive的寫入操作耗用了大量的時間。
對此現象的優化可以是,將文件存為符合hive table文件的格式,然後使用hive load將產生的結果文件直接move到指定目錄下。代碼如下:
result.rdd.map { r => r.mkString("\001") }.repartition(partitions).saveAsTextFile(output_tmp_dir)
sql(s"""load data inpath '$output_tmp_dir' overwrite into table $output partition (dt='$dt')""")
詳解:
hive column默認分隔符在scala/java中的表示為「/001」,r.mkString("/001")既是將column以分隔符/001進行分割,hive在導入時會自動識別。
repartition(partitions)是為了防止hdfs中產生大量小文件。partitions的設定與最終結果大小有關,一般是result_size/hdfs_block_size。
此處使用hive load data命令,將hdfs文件load到hive表中。後臺操作為直接將目錄下的文件移到hive table所在目錄,所以只是hdfs move數據的過程,執行非常快。
sql(s"""load data inpath '$output_tmp_dir' overwrite into table $tmp_table partition (dt='$dt')""")
sql(s"""INSERT OVERWRITE Table $outputTable PARTITION (dt ='$outputDate') select * from $tmp_table where dt='$dt'""")
在資源配置為--num-executors 20 --executor-cores 4,結果數據為1.8g的情況下,需要額外耗時50s。好處是結果數據使用列式、壓縮方式存儲,壓縮比12.7左右。
使用優化後的方式,原有test case的耗時從半小時降到4分鐘,效率提升明顯。