Spark實戰第二版(涵蓋Spark3.0)-第十章 通過結構化流接入數據

2021-02-23 登峰大數據
關注公眾號:登峰大數據,閱讀Spark實戰第二版(完整中文版),系統學習Spark3.0大數據框架!如果您覺得作者翻譯的內容有幫助,請分享給更多人。您的分享,是作者翻譯的動力!

本章涵蓋

從總覽的角度看你的數據,專注於數據生成部分。您看到的是生成批量數據的系統,還是連續生成數據的系統?提供數據流(也稱為streams)的系統幾年前不太流行。streams肯定會得到更多的關注,理解streams是本章的重點。

例如,您的手機會定期ping手機發射塔。如果是智慧型手機(本書的讀者群,很有可能),它將同時檢查電子郵件和其他內容。

通過(智能)城市的巴士會發送其GPS坐標。

超市收銀臺的收銀機在收銀員(或你)將商品從掃描儀前掃描時生成數據。交易是在你付款時進行的。

當你把車開進車庫時,大量的信息被收集、存儲並發送給其他各種接收者,比如製造商、保險公司或報告公司。

在美國,當病人進入醫療設施時,會生成包含原子信息的信息,稱為入院、出院和轉院(ADTs)。

像流數據這樣的業務;這使他們對發生的事情的時效性更敏銳。在我居住的北卡羅來納州,可能還有美國其他地方,一有災難宣布,人們就會奔向商店購買牛奶、水和鬆軟的麵包(不要問我他們會怎麼處理這些東西)。在商業方面,獲得正在出售的商品的實時feed實際上可以觸發配送中心更快的響應,向銷售最多的雜貨店供應更多的商品。在這種情況下,我不是在談論災難發生時,甚至是災難恢復時,而是能夠對市場需求作出更快的反應。2019年,數據更應該被視為一種流,而不是被填滿的煙囪。

在本章中,你將會發現什麼是流以及它與批處理模式的區別(如此之小)。然後您將構建第一個流處理應用程式。

為了讓流更容易模擬,我在本章的原始碼庫中構建並添加了一個流數據生成器,它對於模擬流非常有用。您可以根據需要調整這個生成器。附錄O涵蓋了數據生成器的詳細信息,這不是本章的先決條件。然而,我們將在本章的實驗中使用生成器。

從本章開始,示例和實驗將使用更多的日誌記錄,而不是簡單的屏幕列印。日誌簡化了讀取,並且是一種行業標準,它更符合您在日常工作中的期望(而println是一種糟糕的開發實踐)。日誌設置仍然會在控制臺上轉儲信息;不需要在一個偏僻的地方尋找日誌文件!

您還將使用一個更複雜的示例進行實驗,在該示例中,您將使用兩個流。最後,作為本章的總結,您將學習結構化流(從Apache Spark v2開始)和離散流(從Apache Spark v1開始)之間的區別。附錄P提供了關於流處理的額外資源。

圖10.1顯示了您在Apache Spark中接入數據的位置。好消息!這是關於數據接入的最後一章。

圖10.1 您獲取數據的旅程即將結束;這一章涵蓋結構化流數據接入。

實驗

本章的例子可以在GitHub上找到:https://github .com/jgperrin/net.jgp.books.spark.ch10。附錄L是接入參考。附錄P還提供了關於流處理的額外資源。

10.1 流處理是什麼?

在本節中,讓我們在Apache Spark的上下文中看看流計算。這將為您提供必要的基礎知識,以理解示例並將流集成到您的項目中。

通過流處理數據並不是一個新奇的想法。然而,近年來它越來越受歡迎。沒人願意再等什麼了。作為一個社會,我們一直都在期待直接的結果。你去看醫生,你希望回家時在醫療保健提供者的門戶網站上看到你的索賠。你把意外購買的電視退回到好市多(Costco),你希望馬上看到信用卡帳單上的信用記錄。你完成了Lyft之旅,希望馬上就能看到你的SkyMiles獎金。在一個數據加速發展的世界裡,流處理絕對有它的一席之地:沒有人願意等待通宵批處理。

使用流的另一個原因是,隨著數據量的增加,將其分割成小塊以減少尖峰時間的負載也是一個不錯的主意。

總的來說,流計算比傳統的批處理計算更自然,因為它發生在一個流中。然而,因為這是一個不同於你可能習慣的範式,你可能不得不改變你的思維方式。圖10.2說明了該系統。

圖10.2 在這個典型的流場景中,數據被生成、作為原子元素髮送、動態轉換,最後以處理後的形式提供實時報告、存儲等。

流通常有兩種形式:文件流和網絡流。在文件流場景中,文件被放置在一個目錄中(有時稱為著陸區或暫存區;(見第1章),Spark從這個目錄中獲取文件。在第二個場景中,數據通過網絡發送。

Apache Spark處理流的方式是在一小段時間內重新分組操作。這被稱為微批量。

10.2 創建第一個流處理程序

對於您的第一個流處理,您將使用文件。文件在文件夾中生成。Spark在文件生成時,處理這些文件。這種更簡單的場景避免了潛在的網絡問題,並將說明流的核心原則,即在數據可用時立即使用數據。

文件流在醫療保健行業是一個常見的用例。醫院(提供者)可以將文件轉儲到FTP伺服器上,然後由保險公司(支付方)獲取這些文件。

在這個場景中,您將運行兩個應用程式。對於文件流,啟動應用程式的順序無關緊要。一個應用程式將生成流,其中包含描述人員的記錄。另一個應用程式將使用Spark來使用生成的流。您將從數據生成器開始,然後構建使用者。圖10.3說明了這個過程。

圖10.3 在這個流場景中,一個應用程式(生成器)在一個文件中生成記錄,並將這些文件放在一個文件夾中。Spark應用程式(消費者)讀取該文件夾中的文件並顯示數據。

10.2.1 生成文件流

要模擬穩定的數據流,首先要啟動記錄生成器。在本節中,您將查看生成器的輸出,學習如何編譯和運行它,然後簡要查看它的代碼。

為了使事情更容易一些,我在本章的存儲庫中添加了一個記錄生成器。記錄生成器的設計目的是創建帶有描述人名的隨機記錄的文件:他們有名字、中間名、姓氏、年齡和美國社會安全號碼。嘿,這是假數據,所以不要以為你可以用它來冒充任何人!

在本節中,您將運行生成器,但不會對其進行太多修改。附錄O描述了如何修改生成器或基於生成器所基於的easy API構建自己的生成器。

運行時,RecordsInFilesGeneratorApp的輸出如下所示。

#清單10.1流生成器的輸出[INFO] Scanning for projects.[INFO] --- exec-maven-plugin:1.6.0:java (write-file-in-stream) @sparkInAction-chapter10 ---2020-11-13 12:14:12.496 -DEBUG --- [treamApp.main()] ure.getRecords(RecordStructure.java:131): Generated data:Aubriella,Silas,Gillet,62,373-69-4505Reese,Clayton,Kochan,2,130-00-2393Trinity,Sloan,Vieth,107,202-34-4161Daphne,Forrest,Huffman,77,250-50-6797Emmett,Heath,Golston,41,133-17-2450Alex,Orlando,Courtier,32,290-51-1937Titan,Deborah,Mckissack,89,073-83-0162
2020-11-13 12:14:12.498 - INFO --- [treamApp.main()] erUtils.write(RecordWriterUtils.java:21): Writing in: /var/folders/v7/ 3jv0[...]/T/streaming/in/contact_1542129252485.txt ...

在Eclipse這樣的IDE中同時運行兩個應用程式並不容易。因此,我確保您可以使用Maven從命令行運行所有的實驗。如果您不熟悉Maven,附錄B介紹了Maven的安裝,附錄H提供了一些使用Maven的技巧。

在本地複製了存儲庫之後,轉到存放項目的pom.xml文件的文件夾。在本例中,示例如下:

$ cd /Users/jgp/Workspaces/Books/net.jgp.books.spark.ch10

然後清理和編譯您的數據生成應用程式。首先,您不會修改它,而只是編譯並運行它。清理將確保不留下已編譯的工件。第一次肯定不需要,因為沒有什麼要清洗的。編譯將簡單地構建應用程式:

如果Maven開始下載很多包(或者沒有),不要驚慌,除非您是根據您所使用的數據量為internet服務付費。然後運行這個:

$ mvn exec:java@generate-records-in-files

這個命令將在由generate -records-in-files ID定義的pom.xml文件中執行應用程式,如清單10.2所示。本章中的示例依賴於您的pom.xml中的id來區分不同的應用程式。當然,您也可以在IDE中運行所有的應用程式。

#清單10.2 pom.xml的摘錄...<build>     <plugins>     <plugin>       <groupId> org.codehaus.mojo </groupId>       <artifactId> exec-maven-plugin </artifactId>       <version> 1.6.0 </version>       <executions>         <execution>         <id> generate-records-in-files </id>//定義被調用的「塊」的唯一標識符         <goals>           <goal> java </goal>         </goals>         <configuration>           <mainClass> net.jgp.books.spark.ch10.x.utils.streaming. app.RecordsInFilesGeneratorApp </mainClass> #2         </configuration>          </execution>   ...

下面的清單顯示了生成器代碼。附錄O詳細介紹了生成器、生成器的API以及它的可擴展性。

#清單10.3 RecordsInFilesGeneratorApp.javapackage net.jgp.books.spark.ch10.x.utils.streaming.app;
import net.jgp.books.spark.ch10.x.utils.streaming.lib.*; public class RecordsInFilesGeneratorApp { public int streamDuration = 60; public int batchSize = 10; public int waitTime = 5;
public static void main(String[] args ) { RecordStructure rs = new RecordStructure( "contact" ) .add( "fname" , FieldType. FIRST_NAME ) .add( "mname" , FieldType. FIRST_NAME ) .add( "lname" , FieldType. LAST_NAME ) .add( "age" , FieldType. AGE ) .add( "ssn ", FieldType. SSN );
RecordsInFilesGeneratorApp app = new RecordsInFilesGeneratorApp(); app .start( rs ); }
private void start(RecordStructure rs ) { long start = System.currentTimeMillis(); while ( start + streamDuration * 1000 > System.currentTimeMillis()) { int maxRecord = RecordGeneratorUtils.getRandomInt( batchSize ) + 1; RecordWriterUtils.write( rs .getRecordName() + "_" + System.currentTimeMillis() + ".txt" , rs .getRecords( maxRecord , false )); try { Thread.sleep(RecordGeneratorUtils.getRandomInt( waitTime * 1000) + waitTime * 1000 / 2);... } catch (InterruptedException e ) { } } } }

你可以修改參數(streamDuration, batchSize和waitTime)和記錄的結構來研究各種行為:

streamDuration以秒為單位定義流的持續時間。預設值為60秒(1分鐘)。

batchSize定義單個事件中記錄的最大數量。默認值為10意味著生成器最多生成10條記錄。

waitTime是生成器在兩個事件之間等待的時間。這個值有一些隨機性:默認值5毫秒意味著應用程式將在2.5毫秒(= 5 / 2)到7.5毫秒(= 5 × 1.5)之間等待。

10.2.2 消費記錄

現在文件夾已經被文件中的記錄填滿了,您可以使用Spark來消費它們。您將首先查看記錄顯示的方式,然後深入研究代碼。

實驗

這裡是200號實驗。它可以在GitHub上訪問https://github.com/jgperrin/ net.jgp.books.spark.ch10。這個應用程式是net.jgp.books.spark.ch10.lab200_read_stream包中的ReadLinesFromFileStreamApp.java類。

實驗只需要接入這些記錄,將它們存儲在一個dataframe中(是的,與您之前使用的dataframe相同),然後在控制臺上顯示結果。圖10.4說明了這個過程。

圖10.4從流中獲取數據的過程:初始化Spark和流之後,您的自定義代碼將在獲取的數據上執行。

下面的清單顯示了應用程式的輸出。

#清單10.4 ReadLinesFromFileStreamApp的輸出2020-11-16 14:13:54.924 -DEBUG --- [ main]➥ tart(ReadLinesFromFileStreamApp.java:29): -> start()---Batch: 0---+----+|value                             |+----+|Mara,Jamison,Acy,52,492-23-4955   ||Ariel,Raegan,Abend,104,007-31-2841||Kynlee,Ari,Bevier,106,439-70-9576 |+----+only showing top 3 rows
---Batch: 1---+----+|value |+----+|Conrad,Alex,Gutwein,34,998-04-4584||Aldo,Adam,Ballard,6,996-95-8983 |+----+...2020-11-16 14:14:59.648 -DEBUG --- [ main] tart(ReadLinesFromFileStreamApp.java:58): <- start()

要啟動接入應用程式,可以直接在IDE中(本例中為Eclipse)或通過Maven運行它。在複製項目的同一目錄下,在另一個終端中,運行以下命令:

$ cd /Users/jgp/Workspaces/Book/net.jgp.books.spark.ch10$ mvn clean install $ mvn exec:exec@lab200

注意,這裡使用的是exec:exec,而不是exec:java。通過使用exec:exec, Maven將啟動一個新的JVM來運行您的應用程式。通過這種方式,您可以向JVM傳遞參數。下面的清單顯示了負責執行應用程式的pom.xml部分。

#清單10.5執行實驗#200的pom.xml部分,ReadLinesFromFileStreamApp...         <execution>         <id> lab200 </id>         <configuration>           <executable> java </executable>           <arguments>             <argument> -classpath </argument>             <classpath />             <argument> net.jgp.books.spark.ch10.lab200_read_stream. ReadLinesFromFileStreamApp </argument>           </arguments>         </configuration>         </execution> ...

讓我們分析一下net.jgp.books.spark.ch10中的ReadLinesFromFileStreamApp應用程式中的代碼。清單10.6中的lab200_read_stream包。我知道在原始碼的開頭有這麼一大塊導入並不總是吸引人的,但是隨著底層框架(這裡是Apache Spark)的各種發展,我希望確保您使用正確的包。

從本書的這一點開始,我將使用日誌記錄(SLF4J包)而不是println。日誌記錄是行業標準,而println可能會嚇到我們中的一些人(比如將您不希望用戶看到的信息轉儲到控制臺)。我不會在書中描述每個實驗中日誌的初始化,以保持代碼清晰。然而,在存儲庫中,您將找到每個示例的初始化(否則,它將不起作用,對嗎?)

無論您計劃使用流處理還是批處理的數據處理方式,創建Spark會話(Spark session)都沒有區別。

有了會話之後,可以使用readStream()方法請求會話從流中讀取數據。根據流的類型,它將需要額外的參數。這裡從一個目錄(由load()方法定義)讀取一個文本文件(由format()方法指定)。注意,format()的參數是一個String,而不是Enum,但是沒有什麼可以阻止您在某處使用一個小的實用程序類(例如,使用常量)。

到目前為止,還算簡單,不是嗎?你啟動一個會話,構建一個dataframe,從一個流讀取它。然而,在流中,數據可能存在,也可能不存在,也可能尚未到來。因此,應用程式需要等待數據的到來,有點像伺服器等待請求。數據寫入是通過dataframe的writeStream()方法和StreamingQuery對象完成的。

首先從作為流使用的dataframe定義流查詢對象。查詢將開始填充結果表,如圖10.5所示。結果表隨著輸入的數據而增長。

要構建你的查詢,你需要指定以下內容:

輸出模式outputMode(參見附錄P的輸出模式列表)。在這個實驗裡,你只顯示兩個接收信號之間的更新。格式format,它基本上說明了如何處理接收到的數據。在這個實驗中,將在控制臺上顯示它(而不是通過日誌記錄)。在文獻中,當提到輸出格式時,經常會讀到關於sink的內容。還可以參考附錄P了解不同的接收及其描述。選項option。在顯示到控制臺的情況下,truncate選項設置為false意味著記錄不會被截斷到某個長度,numRows指定最多顯示三條記錄。這相當於在非流(批處理)模式下在dataframe上調用show(3, false)。

指定輸出模式、格式和選項之後,就可以開始查詢了。

當然,現在應用程式需要等待數據進入。它是通過查詢的awaitterminate()方法實現的。awaitterminate()是一個阻塞方法。它可以有參數,也可以沒有參數。如果沒有參數,該方法將永遠等待。通過一個參數,可以指定方法將等待的時間。在那些實驗裡,我一直用一分鐘。

圖10.5 當數據流接收到數據時,數據被添加到結果表中。結果表是一個無界的表,它可以像dataframe一樣增長(當然是根據集群的物理存儲容量)。

這是你第一次從流中獲取數據。在下一節中,您將從流中提取完整的記錄,而不僅僅是原始數據行。

清單10.6 ReadLinesFromFileStreamApp.javapackage net.jgp.books.spark.ch10.lab200_read_stream; import java.util.concurrent.TimeoutException; import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.streaming.OutputMode;import org.apache.spark.sql.streaming.StreamingQuery;import org.apache.spark.sql.streaming.StreamingQueryException;import org.slf4j.Logger;import org.slf4j.LoggerFactory; import net.jgp.books.spark.ch10.x.utils.streaming.lib.StreamingUtils; public class ReadLinesFromFileStreamApp {     private static transient Logger log = LoggerFactory.getLogger(        ReadLinesFromFileStreamApp. class );      public static void main(String[] args ) {      ReadRecordFromFileStreamApp app = new ReadRecordFromFileStreamApp();       try {         app .start();      } catch (TimeoutException e ) {         log .error( "A timeout exception has occured: {}" , e .getMessage());      }    }      private void start() {       log .debug( "-> start()" );       SparkSession spark = SparkSession.builder()            .appName( "Read lines over a file stream" )            .master( "local" )            .getOrCreate();       Dataset<Row> df = spark            .readStream()            .format( "text" )            .load(StreamingUtils.getInputDirectory());       StreamingQuery query = df            .writeStream()            .outputMode(OutputMode.Append())            .format( "console" )            .option( "truncate" , false )            .option( "numRows" , 3)            .start();        try {         query .awaitTermination(60000);      } catch (StreamingQueryException e ) {         log .error(               "Exception while waiting for query to end {}." ,               e .getMessage(),               e );      }        log .debug( "<- start()" );    } }

注意,在Spark 3.0預覽版2中,StreamingQuery中的start()現在拋出一個超時異常,這需要管理。存儲庫中的代碼在相應的分支中相應地運行。

10.2.3 獲取記錄,而不是行

在前面的示例中,您吸收了諸如Conrad、Alex、Gutwein、34998 -04-4584等行。雖然數據在Spark中,但使用起來並不方便。它是原始的,您必須重新解析它,沒有數據類型. . . .讓我們通過使用模式將原始行轉換為記錄。

(未完待續.)  歡迎關注公眾號,及時獲得最新翻譯內容:

相關焦點

  • Spark機器學習.pdf
    書中沒有讓人抓狂的數據公式, 而是從準備和正確認識數據開始講起,全面涵蓋了推 薦系統、回歸、聚類、降維等經典的機器學習算法及 其實際應用。     本書適合網際網路公司從事數據分析的人員,以及 高校數據挖掘相關專業的師生閱讀參考。
  • Spark權威指南(中文版)----第27章 回歸
    其中一些模型是從第26章繼承下來的。其他的只與回歸問題域相關。參見ISL和ESL中的第3章來回顧回歸。讓我們讀取一些例子數據,這些數據將在整個章節中使用:// in Scalaval df =  spark.read.load("/data/regression")# in Pythondf =  spark.read.load("/data/regression")27.3.
  • spark結構化數據處理:Spark SQL、DataFrame和Dataset
    DataFrame的前身是SchemaRDD,從Spark 1.3.0開始SchemaRDD更名為DataFrame。與SchemaRDD的主要區別是:DataFrame不再直接繼承自RDD,而是自己實現了RDD的絕大多數功能。你仍舊可以在DataFrame上調用.rdd方法將其轉換為一個RDD。
  • Spark 2.0系列之SparkSession詳解
    Spark2.0中引入了SparkSession的概念,它為用戶提供了一個統一的切入點來使用Spark的各項功能,用戶不但可以使用DataFrame和Dataset
  • 一、Spark概述
    2、易用性(可以通過java/scala/python/R開發spark應用程式)3、通用性(可以使用spark sql/spark streaming/mlib/Graphx)4、兼容性(spark程序可以運行在standalone/yarn/mesos)Spark 框架模塊整個Spark 框架模塊包含
  • 《Spark編程基礎(Python版)》
    《Spark編程基礎(Python版)》(人民郵電出版社,ISBN:978-7-115-52439-3)由國內高校知名大數據教師廈門大學林子雨副教授主編
  • Spark 1.6.0 新手快速入門
    /bin/spark-shell Spark最重要的一個抽象概念是彈性分布式數據集(Resilient Distributed Dataset)簡稱RDD。RDDs可以通過Hadoop InputFormats(例如HDFS文件)創建,也可以由其它RDDs轉換而來。
  • SparkSQL基礎及實戰練習
    Spark的入門測試        首先讓我們準備好該題所需的數據 test.txt        數據結構如下依次是:班級 姓名 年齡 性別 科目 成績12 宋江 25 男 chinese 5012 宋江
  • 『 Spark 』13. Spark 2.0 Release Notes 中文版
    >Kryo 升級到 3.0java 中,RDD.flatMap 和 RDD.mapPartitions 中的函數不需要返回所有數據,只需要能返回一個迭代器即可Java RDD’s countByKey and countAprroxDistinctByKey now returns a map from K to java.lang.Long, rather than
  • 大數據分析工程師入門9-Spark SQL
    本文為《大數據分析師入門課程》系列的第9篇,在本系列的第8篇-Spark基礎中,已經對Spark做了一個入門介紹,在此基礎上本篇拎出Spark SQL,主要站在使用者的角度來進行講解,需要注意的是本文中的例子的代碼均使用Scala語言。
  • Spark SQL重點知識總結
    通過SQL的形式將數據的計算任務轉換成了MapReduce。2、統一的數據訪問方式,Spark SQL提供標準化的SQL查詢。3、Hive的繼承,Spark SQL通過內嵌的hive或者連接外部已經部署好的hive案例,實現了對hive語法的繼承和操作。
  • 『 Spark 』2. spark 基本概念解析
    寫這樣一個系列僅僅是為了梳理個人學習spark的筆記記錄,並非為了做什麼教程,所以一切以個人理解梳理為主,沒有必要的細節就不會記錄了。若想深入了解,最好閱讀參考文章和官方文檔。其次,本系列是基於目前最新的 spark 1.6.0 系列開始的,spark 目前的更新速度很快,記錄一下版本好還是必要的。1.
  • 詭異 | Spark使用get_json_object函數
    2.2 分析數據量發現其實並不大,也就幾十萬。以前上千萬的數據量都不需要這麼多內存。因此懷疑是get_json_object函數引起的問題。2.3 去掉get_json_object函數,果然任務非常流暢,而且內存調回6G依然是成功的。
  • Spark-TFRecord: Spark將全面支持TFRecord
    本項目的目的是將TFRecord 作為Spark數據源社區中的第一類公民,類似於 Avro,JSON,Parquet等。Spark-TFRecord 不僅僅提供簡單的功能支持,比如 Data Frame的讀取、寫入,還支持一些高階功能,比如ParititonBy。使用 Spark-TFRecord 將會使數據處理流程與訓練工程完美結合。
  • Apache Spark大數據分析入門(一)
    Apache Spark的出現讓普通人也具備了大數據及實時數據分析能力。鑑於此,本文通過動手實戰操作演示帶領大家快速地入門學習Spark。本文是Apache Spark入門系列教程(共四部分)的第一部分。
  • 簡化TensorFlow和Spark互操作性:LinkedIn開源Spark-TFRecord
    TFRecord 是 TensorFlow 的原生數據結構,在 Apache Spark 中並不完全受支持。最近,LinkedIn 工程師開源了 Spark-TFRecord,這是一個基於 TensorFlow TFRecord 的 Spark 新的原生數據源。LinkedIn 決定著手解決這一問題,並不令人感到驚訝。
  • 實戰課堂 | 手把手教你用MongoDB Spark Connector構建分析應用
    3,統一構建:支持多種數據源,通過 Spark RDD 屏蔽底層數據差異,同一個分析應用可運行於不同的數據源;4,應用場景廣泛:能同時支持批處理以及流式處理MongoDB Spark Connector 為官方推出,用於適配 Spark 操作 MongoDB 數據;本文以Python
  • Spark入門介紹
    SparkCore中還包含了對彈性分布式數據集(Resilient Distributed DataSet,簡稱RDD)的API定義。Spark SQL:是Spark用來操作結構化數據的程序包。通過Spark SQL,我們可以使用 SQL或者Apache Hive版本的SQL方言(HQL)來查詢數據。Spark SQL支持多種數據源,比如Hive表、Parquet以及JSON等。
  • 2小時入門SparkSQL編程
    本節我們將主要介紹以下主要內容:import findspark#指定spark_home為剛才的解壓路徑,指定python路徑spark_home = "/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2
  • 大數據入門:Spark RDD、DataFrame、DataSet
    尤其是在涉及到數據結構的部分,理解清楚這三者的共性與區別,非常有必要。今天的大數據入門分享,我們就主要來講講Spark RDD、DataFrame、DataSet。首先從版本的產生上來看:RDD(Spark1.0)—>Dataframe(Spark1.3)—>Dataset(Spark1.6)如果同樣的數據都給到這三個數據結構,他們分別計算之後,都會給出相同的結果。