CoProcessFunction實戰三部曲之二:狀態處理

2020-12-16 紙鶴視界

歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

本篇概覽

本文是《CoProcessFunction實戰三部曲》的第二篇,咱們要實戰的是雙流連接場景下,處理一號流中的數據時,還要結合該key在二號流中的情況;最簡單的例子:aaa在一號流中的value和二號流的value相加,再輸出到下遊,如下圖所示,一號流中的value存入state,在二號流中取出並相加,將結果輸出給下遊:

本篇的內容就是編碼實現上圖的功能;參考文章

理解狀態:《深入了解ProcessFunction的狀態操作(Flink-1.10)》

源碼下載

如果您不想寫代碼,整個系列的源碼可在GitHub下載到,地址和連結信息如下表所示(https://github.com/zq2599/blog_demos):

這個git項目中有多個文件夾,本章的應用在flinkstudy文件夾下,如下圖紅框所示:

編碼

字符串轉Tuple2的Map函數,以及抽象類AbstractCoProcessFunctionExecutor都和上一篇《CoProcessFunction實戰三部曲之一:基本功能》一模一樣;新增AbstractCoProcessFunctionExecutor的子類AddTwoSourceValue.java,源碼如下,稍後會說明幾個關鍵點:package com.bolingcavalry.coprocessfunction;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.co.CoProcessFunction;import org.apache.flink.util.Collector;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * @author will * @email zq2599@gmail.com * @date 2020-11-11 09:48 * @description 功能介紹 */public class AddTwoSourceValue extends AbstractCoProcessFunctionExecutor { private static final Logger logger = LoggerFactory.getLogger(AddTwoSourceValue.class); @Override protected CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> getCoProcessFunctionInstance() { return new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() { // 某個key在processElement1中存入的狀態 private ValueState<Integer> state1; // 某個key在processElement2中存入的狀態 private ValueState<Integer> state2; @Override public void open(Configuration parameters) throws Exception { // 初始化狀態 state1 = getRuntimeContext().getState(new ValueStateDescriptor<>("myState1", Integer.class)); state2 = getRuntimeContext().getState(new ValueStateDescriptor<>("myState2", Integer.class)); } @Override public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception { logger.info("處理元素1:{}", value); String key = value.f0; Integer value2 = state2.value(); // value2為空,就表示processElement2還沒有處理或這個key, // 這時候就把value1保存起來 if(null==value2) { logger.info("2號流還未收到過[{}],把1號流收到的值[{}]保存起來", key, value.f1); state1.update(value.f1); } else { logger.info("2號流收到過[{}],值是[{}],現在把兩個值相加後輸出", key, value2); // 輸出一個新的元素到下遊節點 out.collect(new Tuple2<>(key, value.f1 + value2)); // 把2號流的狀態清理掉 state2.clear(); } } @Override public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception { logger.info("處理元素2:{}", value); String key = value.f0; Integer value1 = state1.value(); // value1為空,就表示processElement1還沒有處理或這個key, // 這時候就把value2保存起來 if(null==value1) { logger.info("1號流還未收到過[{}],把2號流收到的值[{}]保存起來", key, value.f1); state2.update(value.f1); } else { logger.info("1號流收到過[{}],值是[{}],現在把兩個值相加後輸出", key, value1); // 輸出一個新的元素到下遊節點 out.collect(new Tuple2<>(key, value.f1 + value1)); // 把1號流的狀態清理掉 state1.clear(); } } }; } public static void main(String[] args) throws Exception { new AddTwoSourceValue().execute(); }}關鍵點之一:對於aaa這個key,無法確定會先出現在一號源還是二號源,如果先出現在一號源,就應該在processElement1中將value保存在state1中,這樣等到aaa再次出現在二號源時,processElement2就可以從state1中取出一號源的value,相加後輸出到下遊;關鍵點之二:如果輸出到下遊,就表示數據已經處理完畢,此時要把保存的狀態清理掉;如果您想了解低階函數中的狀態存取的更多細節,請參考《深入了解ProcessFunction的狀態操作(Flink-1.10)》驗證

分別開啟本機的9998和9999埠,我這裡是MacBook,執行nc -l 9998和nc -l 9999啟動Flink應用,如果您和我一樣是Mac電腦,直接運行AddTwoSourceValue.main方法即可(如果是windows電腦,我這沒試過,不過做成jar在線部署也是可以的);在監聽9998埠的控制臺輸入aaa,111,此時flink控制臺輸出如下,可見processElement1方法中,讀取state2為空,表示aaa在二號流還未出現過,此時的aaa是首次出現,應該放入state中保存:22:35:12,135 INFO AddTwoSourceValue - 處理元素1:(aaa,111)22:35:12,136 INFO AddTwoSourceValue - 2號流還未收到過[aaa],把1號流收到的值[111]保存起來在監聽9999埠的控制臺輸入bbb,123,flink日誌如下所示,表示bbb也是首次出現,把值保存在state中:22:35:34,473 INFO AddTwoSourceValue - 處理元素2:(bbb,123)22:35:34,473 INFO AddTwoSourceValue - 1號流還未收到過[bbb],把2號流收到的值[123]保存起來在監聽9999埠的控制臺輸入aaa,222,flink日誌如下,很明顯,之前保存在state中的值被取出來了,因此processElement2方法中,aaa在兩個數據源的值111和222會被相加後輸出到下遊,下遊是print,直接列印出來了:22:35:38,072 INFO AddTwoSourceValue - 處理元素2:(aaa,222)22:35:38,072 INFO AddTwoSourceValue - 1號流收到過[aaa],值是[111],現在把兩個值相加後輸出(aaa,333)至此,雙流場景下的狀態互通實踐咱們已經完成了,接下來的文章,會加上定時器和旁路輸出,將雙流場景的數據處理考慮得更加全面;你不孤單,欣宸原創一路相伴

Java系列Spring系列Docker系列kubernetes系列資料庫+中間件系列DevOps系列歡迎關注公眾號:程式設計師欣宸

微信搜索「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界...https://github.com/zq2599/blog_demos

相關焦點

  • CoProcessFunction實戰三部曲之三:定時器和側輸出
    定時器和側輸出本篇概覽本文是《CoProcessFunction實戰三部曲》的終篇,主要內容是在CoProcessFunction中使用定時器和側輸出,對上一篇的功能進行增強;回顧上一篇的功能:一號流收到aaa後保存在狀態中,直到二號流收到aaa,把兩個aaa的值相加後輸出到下遊;上述功能有個問題:二號流如果一直收不到aaa,下遊就一直沒有aaa的輸出,相當於進入一號流的aaa已經石沉大海了;今天的實戰就是修復上述問題:aaa在一個流中出現後,10秒之內如果出現在另一個流中,就像以前那樣值相加,輸出到下遊,如果10秒內沒有出現在另一個流
  • 實戰|秀兒,如何用一個欄位表示八個狀態啊
    二、解決之道通過一個int或者long欄位,來添加多個 標誌或者狀態.像這種,獨立狀態(標記)之間相互組合可以產生新的狀態(標記),且每個獨立狀態(標記)只有true或者false值的,我們可以使用位狀態的概念來管理這些狀態.
  • 學會這些,迅速脫離新手狀態—MACD與布林帶搭配實戰買賣技巧
    其次需要了解布林帶縮開口狀態的指導意義:布林線開口的意義:1,當價格由低位向高位經過數浪上升後,布林線最上壓力線和最下支撐線開口達到了極大程度,並開口不能繼續放大轉為收縮時,此時是賣出信號,通常價格緊跟著是一輪大幅下跌或調整行情.2,如布林線在高位開口極度縮小,一旦價格向下破位,布林線開口放大,一輪跌勢將不可避免.
  • Pick麥吉麗素顏三部曲,素顏也可以美
    原來她的上鏡寶藏好物就是麥吉麗素顏三部曲,簡單三步,一拍透亮細膩,二抹緊緻嫩膚,三拍亮膚美顏!無需過多「妝」飾,素顏也能美美出門!作為麥吉麗的護膚代言人,秦嵐卸下妝容用最真實的狀態告訴大家,用麥吉麗素顏三部曲,即使素顏也能盡享自信之美。麥吉麗素顏三部曲,內含1.5億酵母精粹濃縮,幫助肌膚告別暗沉,緊緻收縮毛孔,平衡肌膚水油,堅持四周讓肌膚呈現細膩透亮狀態。
  • 英文域名goods.co小六位數易主!
    西部數碼(west.cn)9月16日消息,據外媒消息,上周五英文域名Goods.co在sedo平臺以20,000美元成交,約合人民幣14萬元。域名Goods.co註冊於2010年2月,主體「goods」是通用英文單詞,有「商品、貨品、動產、私人財產、運載的貨物」等意思;後綴.co是哥倫比亞共和國國家域名,現多被看作是「corporation
  • 繃緊防控之弦 抓實戰時舉措
    繃緊防控之弦 抓實戰時舉措 2020-12-12 20:09 來源:澎湃新聞·澎湃號·政務
  • 《射鵰三部曲》十五大絕學排名,九陰真經和九陽真經並列第二
    射鵰英雄傳、神鵰俠侶、倚天屠龍記是一個系列的小說故事,被讀者稱為是射鵰三部曲。三部曲的整體武學水平很高,可以說僅次于天龍八部,因為三部曲的高深武學很多。這期咱們就來聊聊三部曲最厲害的十五大絕學,給它們做一個排名。
  • 《教父2》與《教父3》,教父三部曲的巔峰之決
    無論從類型片,黑幫片,歷史片,傳記片,或者是系列片的角度看,由弗朗西斯.福特.科波拉執導的《教父》三部曲都是影壇當之無愧的經典。若問,在合計長達8個多小時的三部影片中,哪一部堪稱三部曲的頂峰之作?相信絕大多數觀眾會推崇1974年的《教父2》。然而,西山以為,暗藏鋒磯的《教父3》,絲毫不遜於《教父2》,甚至在影片的內涵意義方面,有更多足堪玩味的地方。
  • 《封神三部曲》你知道是哪三部曲嗎
    電影《封神三部曲》由導演烏爾善執導,故事取材於小說名著《封神演義》,講述了一場三千多年前人、仙、妖之間曠日持久的神話戰爭的故事。導演烏爾善和他的團隊用了五年左右的時間打磨劇本,目前《封神三部曲》已經拍攝了一年的時間,據悉電影的外景片場佔地達到了五百畝,場地全部依靠劇組自行搭建。
  • 報名| CDA沙龍:Spark機器學習最新趨勢及實戰分享
    2016-11-15 機器之心Spark被標榜為「快如閃電的集群計算」,也是目前最活躍的Apache項目。Spark提供了一個更快、更通用的數據處理平臺。CDA數據分析師聯手機器之心,於11月18日舉辦Spark&機器學習技術沙龍,屆時將邀請兩位機器學習領域的專家現場分享「
  • 封神三部曲是哪幾部系列電影名 封神三部曲上映時間
    封神三部曲是哪幾部系列電影名 封神三部曲上映時間  烏爾善的電影一向都是很有特點的,這一次「封神」系列的電影還是很讓人期待的,有老搭檔陳坤加盟,電影品質還是不低的。  封神三部曲為哪幾部  《封神三部曲之妖亂國殤》、《封神三部曲之魔道爭鋒》、《封神三部曲之封神天下》。
  • OpenGLES2.0(二)實戰之繪製三角形
    在這上一篇介紹了OpenGLES2.0基本概述《OpenGLES2.0基本概述》,有沒看的讀者建議看過之後再看本篇實戰。正式進入今天實戰。選擇繪製三角形作為OpenGL ES 2.0的第一個實例,是因為前文中提到的,點、線、三角形是OpenGL ES世界的圖形基礎。
  • 《教父》三部曲之讀書筆記
    我堅持自己的這個道理數十年之久,但是我真的得到了我想要的嗎?也不盡然。我想,我的這個道理的弊病就在於,缺乏實際的行動力:我的喜歡與不喜歡,除了我自己、我的本心以外,其他人誰也看不到。然而,讓其他人知道我的喜歡與不喜歡,這一點重要嗎?當這份喜歡是與另外一個(些)人發生聯繫的時候,就很重要。在這其中,「決定一個人的是行為,言語只不過是個屁。」是對我的行為影響得最大的一句話。
  • 海外大佬超34萬元出售「足球」域名football.co!曾經手多個.co域名
    域名football.co是「足球」的意思,相信這個詞在全球範圍內都廣為人知。從品相上看,football.co通俗易懂,屬於足球行業的行業大米,可用來搭建足球電商、足球賽事直播網站等足球領域終端。加上近來年,網際網路體育的興盛,足球域名的市場價值也是有目共睹。
  • 什麼是帶薪實習CO-OP
    就業和移民是去加拿大留學的同學的首要考慮,相比中國來說,國外對大學生的招聘要求更嚴苛一些,好企業都願意招有工作經驗的人,所以說在加拿大找到工作的關鍵是一看經驗、二看人脈、三看學歷。所以co-op課程不負眾望地成為加拿大留學的熱門選擇,感覺這是就此走上人生巔峰的節奏啊,但是小夥伴們不要熄滅你們的好奇心,擠破頭的加拿大co-op到底是怎麼回事?
  • 加拿大co-op到底如何?
    就業和移民是去加拿大留學的同學的首要考慮,相比中國來說,國外對大學生的招聘要求更嚴苛一些,好企業都願意招有工作經驗的人,所以說在加拿大找到工作的關鍵是一看經驗、二看人脈、三看學歷。所以co-op課程不負眾望地成為加拿大留學的熱門選擇,感覺這是就此走上人生巔峰的節奏啊,但是小夥伴們不要熄滅你們的好奇心,擠破頭的加拿大co-op到底是怎麼回事?
  • 最新開放的.Co域名註冊嚮導
    註冊一個好域名,真的很有學問今天有一個比較不錯的消息,那就是.co域名即將於美國時間7月20日(北京時間大概7月21號凌晨2點)開放註冊,目前已經處於預註冊狀態。我也是經過三小時的鏖戰才選出一個自認為不錯的.co域名。
  • 陳藝搏愛情三部曲之《從那以後》 正式發布
    ­  搜狐娛樂訊 華語內地男歌手陳藝搏既三月份全網發布2017愛情三部曲之《分手在最愛你的時候》獲得一致好評後,將於6月6日推出愛情三部曲的第二首主打單曲《從那以後》。歌曲旋律宛轉悠揚,歌詞深情感人配合陳藝搏溫暖的聲線更是將歌曲想要表達的主題詮釋的淋漓盡致,想必將會是一首傳唱度極高的佳作。
  • 老四聊武俠第六期.射鵰三部曲十大高手排名,五絕墊底
    「一陽指」的武學修為已經達到【登峰造極】【玄功若神】之境,內力修為亦是雙鵰第一的最有力的爭奪者。第三次華山論劍,成為新五絕之一的「南僧」。一燈大師雖然沒有什麼拿得出手的實戰,強大隻限於理論。但老四卻不認同,畢竟一燈大師乃是得道高僧,怎麼可能去打打殺殺的?更何況是去痛下殺手?
  • 水泥地實戰球鞋推薦!米切爾二代降價接近一半!
    點擊播放 GIF 1.3M米切爾二代外場水泥地實戰的最終答案,小夥伴們提及最多的就是米切爾一代。而近段時間米切爾二代多個配色相繼發售,首發的2K限定配色的價格也已經來到了400+的水平,或許我們可以考慮米切爾二代了。作為「永迪」戰線中最重要的布局產品之一,米切爾二代的實戰性能和性價比肯定不用擔心。