從總覽的角度看你的數據,專注於數據生成部分。您看到的是生成批量數據的系統,還是連續生成數據的系統?提供數據流(也稱為streams)的系統幾年前不太流行。streams肯定會得到更多的關注,理解streams是本章的重點。
例如,您的手機會定期ping手機發射塔。如果是智慧型手機(本書的讀者群,很有可能),它將同時檢查電子郵件和其他內容。
通過(智能)城市的巴士會發送其GPS坐標。
超市收銀臺的收銀機在收銀員(或你)將商品從掃描儀前掃描時生成數據。交易是在你付款時進行的。
當你把車開進車庫時,大量的信息被收集、存儲並發送給其他各種接收者,比如製造商、保險公司或報告公司。
在美國,當病人進入醫療設施時,會生成包含原子信息的信息,稱為入院、出院和轉院(ADTs)。
像流數據這樣的業務;這使他們對發生的事情的時效性更敏銳。在我居住的北卡羅來納州,可能還有美國其他地方,一有災難宣布,人們就會奔向商店購買牛奶、水和鬆軟的麵包(不要問我他們會怎麼處理這些東西)。在商業方面,獲得正在出售的商品的實時feed實際上可以觸發配送中心更快的響應,向銷售最多的雜貨店供應更多的商品。在這種情況下,我不是在談論災難發生時,甚至是災難恢復時,而是能夠對市場需求作出更快的反應。2019年,數據更應該被視為一種流,而不是被填滿的煙囪。
在本章中,你將會發現什麼是流以及它與批處理模式的區別(如此之小)。然後您將構建第一個流處理應用程式。
為了讓流更容易模擬,我在本章的原始碼庫中構建並添加了一個流數據生成器,它對於模擬流非常有用。您可以根據需要調整這個生成器。附錄O涵蓋了數據生成器的詳細信息,這不是本章的先決條件。然而,我們將在本章的實驗中使用生成器。
從本章開始,示例和實驗將使用更多的日誌記錄,而不是簡單的屏幕列印。日誌簡化了讀取,並且是一種行業標準,它更符合您在日常工作中的期望(而println是一種糟糕的開發實踐)。日誌設置仍然會在控制臺上轉儲信息;不需要在一個偏僻的地方尋找日誌文件!
您還將使用一個更複雜的示例進行實驗,在該示例中,您將使用兩個流。最後,作為本章的總結,您將學習結構化流(從Apache Spark v2開始)和離散流(從Apache Spark v1開始)之間的區別。附錄P提供了關於流處理的額外資源。
圖10.1顯示了您在Apache Spark中接入數據的位置。好消息!這是關於數據接入的最後一章。
圖10.1 您獲取數據的旅程即將結束;這一章涵蓋結構化流數據接入。
實驗
本章的例子可以在GitHub上找到:https://github .com/jgperrin/net.jgp.books.spark.ch10。附錄L是接入參考。附錄P還提供了關於流處理的額外資源。
10.1 流處理是什麼?
在本節中,讓我們在Apache Spark的上下文中看看流計算。這將為您提供必要的基礎知識,以理解示例並將流集成到您的項目中。
通過流處理數據並不是一個新奇的想法。然而,近年來它越來越受歡迎。沒人願意再等什麼了。作為一個社會,我們一直都在期待直接的結果。你去看醫生,你希望回家時在醫療保健提供者的門戶網站上看到你的索賠。你把意外購買的電視退回到好市多(Costco),你希望馬上看到信用卡帳單上的信用記錄。你完成了Lyft之旅,希望馬上就能看到你的SkyMiles獎金。在一個數據加速發展的世界裡,流處理絕對有它的一席之地:沒有人願意等待通宵批處理。
使用流的另一個原因是,隨著數據量的增加,將其分割成小塊以減少尖峰時間的負載也是一個不錯的主意。
總的來說,流計算比傳統的批處理計算更自然,因為它發生在一個流中。然而,因為這是一個不同於你可能習慣的範式,你可能不得不改變你的思維方式。圖10.2說明了該系統。
圖10.2 在這個典型的流場景中,數據被生成、作為原子元素髮送、動態轉換,最後以處理後的形式提供實時報告、存儲等。
流通常有兩種形式:文件流和網絡流。在文件流場景中,文件被放置在一個目錄中(有時稱為著陸區或暫存區;(見第1章),Spark從這個目錄中獲取文件。在第二個場景中,數據通過網絡發送。
Apache Spark處理流的方式是在一小段時間內重新分組操作。這被稱為微批量。
10.2 創建第一個流處理程序
對於您的第一個流處理,您將使用文件。文件在文件夾中生成。Spark在文件生成時,處理這些文件。這種更簡單的場景避免了潛在的網絡問題,並將說明流的核心原則,即在數據可用時立即使用數據。
文件流在醫療保健行業是一個常見的用例。醫院(提供者)可以將文件轉儲到FTP伺服器上,然後由保險公司(支付方)獲取這些文件。
在這個場景中,您將運行兩個應用程式。對於文件流,啟動應用程式的順序無關緊要。一個應用程式將生成流,其中包含描述人員的記錄。另一個應用程式將使用Spark來使用生成的流。您將從數據生成器開始,然後構建使用者。圖10.3說明了這個過程。
圖10.3 在這個流場景中,一個應用程式(生成器)在一個文件中生成記錄,並將這些文件放在一個文件夾中。Spark應用程式(消費者)讀取該文件夾中的文件並顯示數據。
10.2.1 生成文件流
要模擬穩定的數據流,首先要啟動記錄生成器。在本節中,您將查看生成器的輸出,學習如何編譯和運行它,然後簡要查看它的代碼。
為了使事情更容易一些,我在本章的存儲庫中添加了一個記錄生成器。記錄生成器的設計目的是創建帶有描述人名的隨機記錄的文件:他們有名字、中間名、姓氏、年齡和美國社會安全號碼。嘿,這是假數據,所以不要以為你可以用它來冒充任何人!
在本節中,您將運行生成器,但不會對其進行太多修改。附錄O描述了如何修改生成器或基於生成器所基於的easy API構建自己的生成器。
運行時,RecordsInFilesGeneratorApp的輸出如下所示。
#清單10.1流生成器的輸出[INFO] Scanning for projects.[INFO] --- exec-maven-plugin:1.6.0:java (write-file-in-stream) @sparkInAction-chapter10 ---2020-11-13 12:14:12.496 -DEBUG --- [treamApp.main()] ure.getRecords(RecordStructure.java:131): Generated data:Aubriella,Silas,Gillet,62,373-69-4505Reese,Clayton,Kochan,2,130-00-2393Trinity,Sloan,Vieth,107,202-34-4161Daphne,Forrest,Huffman,77,250-50-6797Emmett,Heath,Golston,41,133-17-2450Alex,Orlando,Courtier,32,290-51-1937Titan,Deborah,Mckissack,89,073-83-0162
2020-11-13 12:14:12.498 - INFO --- [treamApp.main()] erUtils.write(RecordWriterUtils.java:21): Writing in: /var/folders/v7/ 3jv0[...]/T/streaming/in/contact_1542129252485.txt ...在Eclipse這樣的IDE中同時運行兩個應用程式並不容易。因此,我確保您可以使用Maven從命令行運行所有的實驗。如果您不熟悉Maven,附錄B介紹了Maven的安裝,附錄H提供了一些使用Maven的技巧。
在本地複製了存儲庫之後,轉到存放項目的pom.xml文件的文件夾。在本例中,示例如下:
$ cd /Users/jgp/Workspaces/Books/net.jgp.books.spark.ch10然後清理和編譯您的數據生成應用程式。首先,您不會修改它,而只是編譯並運行它。清理將確保不留下已編譯的工件。第一次肯定不需要,因為沒有什麼要清洗的。編譯將簡單地構建應用程式:
如果Maven開始下載很多包(或者沒有),不要驚慌,除非您是根據您所使用的數據量為internet服務付費。然後運行這個:
$ mvn exec:java@generate-records-in-files這個命令將在由generate -records-in-files ID定義的pom.xml文件中執行應用程式,如清單10.2所示。本章中的示例依賴於您的pom.xml中的id來區分不同的應用程式。當然,您也可以在IDE中運行所有的應用程式。
#清單10.2 pom.xml的摘錄...<build> <plugins> <plugin> <groupId> org.codehaus.mojo </groupId> <artifactId> exec-maven-plugin </artifactId> <version> 1.6.0 </version> <executions> <execution> <id> generate-records-in-files </id>//定義被調用的「塊」的唯一標識符 <goals> <goal> java </goal> </goals> <configuration> <mainClass> net.jgp.books.spark.ch10.x.utils.streaming. app.RecordsInFilesGeneratorApp </mainClass> #2 </configuration> </execution> ...下面的清單顯示了生成器代碼。附錄O詳細介紹了生成器、生成器的API以及它的可擴展性。
#清單10.3 RecordsInFilesGeneratorApp.javapackage net.jgp.books.spark.ch10.x.utils.streaming.app;
import net.jgp.books.spark.ch10.x.utils.streaming.lib.*; public class RecordsInFilesGeneratorApp { public int streamDuration = 60; public int batchSize = 10; public int waitTime = 5;
public static void main(String[] args ) { RecordStructure rs = new RecordStructure( "contact" ) .add( "fname" , FieldType. FIRST_NAME ) .add( "mname" , FieldType. FIRST_NAME ) .add( "lname" , FieldType. LAST_NAME ) .add( "age" , FieldType. AGE ) .add( "ssn ", FieldType. SSN );
RecordsInFilesGeneratorApp app = new RecordsInFilesGeneratorApp(); app .start( rs ); }
private void start(RecordStructure rs ) { long start = System.currentTimeMillis(); while ( start + streamDuration * 1000 > System.currentTimeMillis()) { int maxRecord = RecordGeneratorUtils.getRandomInt( batchSize ) + 1; RecordWriterUtils.write( rs .getRecordName() + "_" + System.currentTimeMillis() + ".txt" , rs .getRecords( maxRecord , false )); try { Thread.sleep(RecordGeneratorUtils.getRandomInt( waitTime * 1000) + waitTime * 1000 / 2);... } catch (InterruptedException e ) { } } } }你可以修改參數(streamDuration, batchSize和waitTime)和記錄的結構來研究各種行為:
streamDuration以秒為單位定義流的持續時間。預設值為60秒(1分鐘)。
batchSize定義單個事件中記錄的最大數量。默認值為10意味著生成器最多生成10條記錄。
waitTime是生成器在兩個事件之間等待的時間。這個值有一些隨機性:默認值5毫秒意味著應用程式將在2.5毫秒(= 5 / 2)到7.5毫秒(= 5 × 1.5)之間等待。
10.2.2 消費記錄
現在文件夾已經被文件中的記錄填滿了,您可以使用Spark來消費它們。您將首先查看記錄顯示的方式,然後深入研究代碼。
實驗
這裡是200號實驗。它可以在GitHub上訪問https://github.com/jgperrin/ net.jgp.books.spark.ch10。這個應用程式是net.jgp.books.spark.ch10.lab200_read_stream包中的ReadLinesFromFileStreamApp.java類。
實驗只需要接入這些記錄,將它們存儲在一個dataframe中(是的,與您之前使用的dataframe相同),然後在控制臺上顯示結果。圖10.4說明了這個過程。
圖10.4從流中獲取數據的過程:初始化Spark和流之後,您的自定義代碼將在獲取的數據上執行。
下面的清單顯示了應用程式的輸出。
#清單10.4 ReadLinesFromFileStreamApp的輸出2020-11-16 14:13:54.924 -DEBUG --- [ main]➥ tart(ReadLinesFromFileStreamApp.java:29): -> start()---Batch: 0---+----+|value |+----+|Mara,Jamison,Acy,52,492-23-4955 ||Ariel,Raegan,Abend,104,007-31-2841||Kynlee,Ari,Bevier,106,439-70-9576 |+----+only showing top 3 rows
---Batch: 1---+----+|value |+----+|Conrad,Alex,Gutwein,34,998-04-4584||Aldo,Adam,Ballard,6,996-95-8983 |+----+...2020-11-16 14:14:59.648 -DEBUG --- [ main] tart(ReadLinesFromFileStreamApp.java:58): <- start()要啟動接入應用程式,可以直接在IDE中(本例中為Eclipse)或通過Maven運行它。在複製項目的同一目錄下,在另一個終端中,運行以下命令:
$ cd /Users/jgp/Workspaces/Book/net.jgp.books.spark.ch10$ mvn clean install $ mvn exec:exec@lab200注意,這裡使用的是exec:exec,而不是exec:java。通過使用exec:exec, Maven將啟動一個新的JVM來運行您的應用程式。通過這種方式,您可以向JVM傳遞參數。下面的清單顯示了負責執行應用程式的pom.xml部分。
#清單10.5執行實驗#200的pom.xml部分,ReadLinesFromFileStreamApp... <execution> <id> lab200 </id> <configuration> <executable> java </executable> <arguments> <argument> -classpath </argument> <classpath /> <argument> net.jgp.books.spark.ch10.lab200_read_stream. ReadLinesFromFileStreamApp </argument> </arguments> </configuration> </execution> ...讓我們分析一下net.jgp.books.spark.ch10中的ReadLinesFromFileStreamApp應用程式中的代碼。清單10.6中的lab200_read_stream包。我知道在原始碼的開頭有這麼一大塊導入並不總是吸引人的,但是隨著底層框架(這裡是Apache Spark)的各種發展,我希望確保您使用正確的包。
從本書的這一點開始,我將使用日誌記錄(SLF4J包)而不是println。日誌記錄是行業標準,而println可能會嚇到我們中的一些人(比如將您不希望用戶看到的信息轉儲到控制臺)。我不會在書中描述每個實驗中日誌的初始化,以保持代碼清晰。然而,在存儲庫中,您將找到每個示例的初始化(否則,它將不起作用,對嗎?)
無論您計劃使用流處理還是批處理的數據處理方式,創建Spark會話(Spark session)都沒有區別。
有了會話之後,可以使用readStream()方法請求會話從流中讀取數據。根據流的類型,它將需要額外的參數。這裡從一個目錄(由load()方法定義)讀取一個文本文件(由format()方法指定)。注意,format()的參數是一個String,而不是Enum,但是沒有什麼可以阻止您在某處使用一個小的實用程序類(例如,使用常量)。
到目前為止,還算簡單,不是嗎?你啟動一個會話,構建一個dataframe,從一個流讀取它。然而,在流中,數據可能存在,也可能不存在,也可能尚未到來。因此,應用程式需要等待數據的到來,有點像伺服器等待請求。數據寫入是通過dataframe的writeStream()方法和StreamingQuery對象完成的。
首先從作為流使用的dataframe定義流查詢對象。查詢將開始填充結果表,如圖10.5所示。結果表隨著輸入的數據而增長。
要構建你的查詢,你需要指定以下內容:
輸出模式outputMode(參見附錄P的輸出模式列表)。在這個實驗裡,你只顯示兩個接收信號之間的更新。格式format,它基本上說明了如何處理接收到的數據。在這個實驗中,將在控制臺上顯示它(而不是通過日誌記錄)。在文獻中,當提到輸出格式時,經常會讀到關於sink的內容。還可以參考附錄P了解不同的接收及其描述。選項option。在顯示到控制臺的情況下,truncate選項設置為false意味著記錄不會被截斷到某個長度,numRows指定最多顯示三條記錄。這相當於在非流(批處理)模式下在dataframe上調用show(3, false)。指定輸出模式、格式和選項之後,就可以開始查詢了。
當然,現在應用程式需要等待數據進入。它是通過查詢的awaitterminate()方法實現的。awaitterminate()是一個阻塞方法。它可以有參數,也可以沒有參數。如果沒有參數,該方法將永遠等待。通過一個參數,可以指定方法將等待的時間。在那些實驗裡,我一直用一分鐘。
圖10.5 當數據流接收到數據時,數據被添加到結果表中。結果表是一個無界的表,它可以像dataframe一樣增長(當然是根據集群的物理存儲容量)。
這是你第一次從流中獲取數據。在下一節中,您將從流中提取完整的記錄,而不僅僅是原始數據行。
清單10.6 ReadLinesFromFileStreamApp.javapackage net.jgp.books.spark.ch10.lab200_read_stream; import java.util.concurrent.TimeoutException; import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;import org.apache.spark.sql.streaming.OutputMode;import org.apache.spark.sql.streaming.StreamingQuery;import org.apache.spark.sql.streaming.StreamingQueryException;import org.slf4j.Logger;import org.slf4j.LoggerFactory; import net.jgp.books.spark.ch10.x.utils.streaming.lib.StreamingUtils; public class ReadLinesFromFileStreamApp { private static transient Logger log = LoggerFactory.getLogger( ReadLinesFromFileStreamApp. class ); public static void main(String[] args ) { ReadRecordFromFileStreamApp app = new ReadRecordFromFileStreamApp(); try { app .start(); } catch (TimeoutException e ) { log .error( "A timeout exception has occured: {}" , e .getMessage()); } } private void start() { log .debug( "-> start()" ); SparkSession spark = SparkSession.builder() .appName( "Read lines over a file stream" ) .master( "local" ) .getOrCreate(); Dataset<Row> df = spark .readStream() .format( "text" ) .load(StreamingUtils.getInputDirectory()); StreamingQuery query = df .writeStream() .outputMode(OutputMode.Append()) .format( "console" ) .option( "truncate" , false ) .option( "numRows" , 3) .start(); try { query .awaitTermination(60000); } catch (StreamingQueryException e ) { log .error( "Exception while waiting for query to end {}." , e .getMessage(), e ); } log .debug( "<- start()" ); } }注意,在Spark 3.0預覽版2中,StreamingQuery中的start()現在拋出一個超時異常,這需要管理。存儲庫中的代碼在相應的分支中相應地運行。
10.2.3 獲取記錄,而不是行
在前面的示例中,您吸收了諸如Conrad、Alex、Gutwein、34998 -04-4584等行。雖然數據在Spark中,但使用起來並不方便。它是原始的,您必須重新解析它,沒有數據類型. . . .讓我們通過使用模式將原始行轉換為記錄。
(未完待續.) 歡迎關注公眾號,及時獲得最新翻譯內容: