如何使用PySpark來利用機器學習模型對流數據進行預測?

2021-01-09 讀芯術

全文共6787字,預計學習時長20分鐘

來源:Pexels

概述

流數據是一個在機器學習領域蓬勃發展的概念學習如何使用PySpark來利用機器學習模型對流數據進行預測我們將介紹流數據和Spark Streaming的基礎知識,然後深入到實現部分

引言

想像一下——每一秒都有8,500多條推文發布,900多張照片被上傳到Instagram,4,200多個Skype呼叫,78,000多次Google搜索,以及200多萬封電子郵件被發送(數據來自InternetLive Stats)。

我們正在以前所未有的速度和規模生產數據。這是在數據科學領域工作的大好時候!但是有了大量的數據後,接踵而至的是複雜的挑戰。

首要,如何收集大規模的數據?如何確保一旦生成並收集數據,機器學習管道就會繼續產生結果?這些都是業界面臨的重大挑戰,以及為什麼流數據的概念在企業中越來越受到關注。

增加處理流數據的能力將極大地擴展當前的數據科學產品投資組合。這是業界急需的技能,若能熟練掌握它,將幫助你擔負起下一個數據科學角色。

因此,在本文中,我們將學習什麼是流數據,了解Spark Streaming的基礎知識,然後在一個業界相關的數據集上使用Spark實現流數據。

目錄

1. 什麼是流數據?

2. Spark Streaming的基礎知識

3. 離散流

4. 緩存

5. 檢查點

6. 流數據的共享變量

7. 累加器變量

8. 廣播變量

9. 使用PySpark對流數據進行情感分析

什麼是流數據?

社交媒體產生的數據是驚人的。你敢於想像存儲所有數據需要些什麼嗎?這是一個複雜的過程!因此,在深入探討本文的Spark方面之前,先來理解什麼是流數據。

流數據沒有離散的開始或結束。這些數據是每秒從數千個數據源中生成的,它們需要儘快進行處理和分析。大量流數據需要實時處理,例如Google搜索結果。

我們知道,在事件剛發生時一些見解會更有價值,而隨著時間的流逝它們會逐漸失去價值。以體育賽事為例——我們希望看到即時分析,即時統計見解,在那一刻真正享受比賽,對吧?

例如,假設你正在觀看一場羅傑·費德勒(Roger Federer)對戰諾瓦克·喬科維奇(Novak Djokovic)的激動人心的網球比賽。

這場比賽兩局打平,你想了解與費德勒的職業平均水平相比,其反手發球的百分比。是在幾天之後看到有意義,還是在決勝局開始前的那一刻看到有意義呢?

來源:Pexels

Spark Streaming的基礎知識

Spark Streaming是核心Spark API的擴展,可實現實時數據流的可伸縮和容錯流處理。

在轉到實現部分之前,先了解一下Spark Streaming的不同組成部分。

離散流

離散流(Dstream)是一個連續的數據流。對於離散流,其數據流可以直接從數據源接收,也可以在對原始數據進行一些處理後接收。

構建流應用程式的第一步是定義要從中收集數據的數據資源的批處理持續時間。如果批處理持續時間為2秒,則將每2秒收集一次數據並將其存儲在RDD中。這些RDD的連續序列鏈是一個DStream,它是不可變的,可以通過Spark用作一個分布式數據集。

考慮一個典型的數據科學項目。在數據預處理階段,我們需要轉換變量,包括將分類變量轉換為數字變量,創建分箱,去除異常值和很多其他的事。Spark保留了在數據上定義的所有轉換的歷史記錄。因此,無論何時發生故障,它都可以追溯轉換的路徑並重新生成計算結果。

我們希望Spark應用程式7 x 24小時持續運行。並且每當故障發生時,我們都希望它能儘快恢復。但是,在大規模處理數據的同時,Spark需要重新計算所有轉換以防出現故障。可以想像,這樣做的代價可能會非常昂貴。

緩存

這是應對該挑戰的一種方法。我們可以暫時存儲已計算(緩存)的結果,以維護在數據上定義的轉換的結果。這樣,當發生故障時,就不必一次又一次地重新計算這些轉換。

DStreams允許將流數據保留在內存中。當我們要對同一數據執行多種運算時,這很有用。

檢查點

高速緩存在正常使用時非常有用,但是它需要大量內存。並不是每個人都有數百臺具有128 GB內存的計算機來緩存所有內容。

檢查點的概念能夠有所幫助。

檢查點是另一種保留轉換後的數據框結果的技術。它將不時地將正在運行的應用程式的狀態保存在任何可靠的存儲介質(如HDFS)上。但是,它比緩存慢,靈活性也更差。

在擁有流數據時可以使用檢查點。轉換結果取決於先前的轉換結果,並且需要保存以供使用。此外,我們還存儲檢查點元數據信息,例如用於創建流數據的配置以及一系列DStream操作的結果等。

流數據的共享變量

有時候需要為必須在多個集群上執行的Spark應用程式定義諸如map,reduce或filter之類的函數。在函數中使用的變量會被複製到每臺機器(集群)中。

在這種情況下,每個集群都有一個不同的執行器,我們想要一些可以賦予這些變量之間關係的東西。

例如:假設Spark應用程式在100個不同的集群上運行,它們捕獲了來自不同國家的人發布的Instagram圖片。

現在,每個集群的執行者將計算該特定集群上的數據的結果。但是我們需要一些幫助這些集群進行交流的東西,以便獲得匯總結果。在Spark中,我們擁有共享變量,這些變量使此問題得以克服。

累加器變量

用例包括發生錯誤的次數,空白日誌的數量,我們從特定國家收到請求的次數——所有這些都可以使用累加器解決。

每個集群上的執行程序將數據發送回驅動程序進程,以更新累加器變量的值。 累加器僅適用於關聯和可交換的運算。例如,對求和和求最大值有用,而求平均值不起作用。

廣播變量

當我們使用位置數據(例如城市名稱和郵政編碼的映射)時,這些是固定變量,是吧?現在,如果每次在任意集群上的特定轉換都需要這種類型的數據,我們不需要向驅動程序發送請求,因為它會太昂貴。

相反,可以在每個集群上存儲此數據的副本。這些類型的變量稱為廣播變量。

廣播變量允許程式設計師在每臺計算機上保留一個只讀變量。通常,Spark使用高效的廣播算法自動分配廣播變量,但是如果有任務需要多個階段的相同數據,也可以定義它們。

使用PySpark對流數據進行情感分析

是時候啟動你最喜歡的IDE了!讓我們在本節中進行編碼,並以實踐的方式理解流數據。

理解問題陳述

在本節我們將使用真實數據集。我們的目標是檢測推文中的仇恨言論。為了簡單起見,如果一條推文包含帶有種族主義或性別歧視情緒的言論,我們就認為該推文包含仇恨言論。

因此,任務是將種族主義或性別歧視的推文從其他推文中區分出來。我們將使用包含推文和標籤的訓練樣本,其中標籤「1」表示推文是種族主義/性別歧視的,標籤「0」則表示其他種類。

來源:TechCrunch

為什麼這是一個與主題相關的項目?因為社交媒體平臺以評論和狀態更新的形式接收龐大的流數據。該項目將幫助我們審核公開發布的內容。

設置項目工作流程

1. 模型構建:構建邏輯回歸模型管道,對推文中是否包含仇恨言論進行分類。在這裡,我們的重點不是建立一個完全準確的分類模型,而是了解如何在流數據上使用任意模型並返回結果

2. 初始化Spark Streaming的環境:一旦模型構建完成,需要定義獲取流數據的主機名和埠號

3. 流數據:接下來,從定義的埠添加來自netcat伺服器的推文,SparkStreaming API將在指定的持續時間後接收數據

4. 預測並返回結果:一旦接收到推文,就將數據傳遞到創建的機器學習管道中,並從模型中返回預測的情緒

這是對工作流程的簡潔說明:

訓練數據以建立邏輯回歸模型

我們在一個CSV文件中存儲推文數據及其相應的標籤。使用邏輯回歸模型來預測推文是否包含仇恨言論。如果是,則模型預測標籤為1(否則為0)。你可以參考「面向初學者的PySpark」來設置Spark環境。

可以在這裡下載數據集和代碼。

首先,需要定義CSV文件的模式。否則,Spark會將每列數據的類型都視為字符串。讀取數據並檢查模式是否符合定義:

# importing required librariesfrom pyspark import SparkContextfrom pyspark.sql.session import SparkSessionfrom pyspark.streaming import StreamingContextimport pyspark.sql.types as tpfrom pyspark.ml import Pipelinefrom pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssemblerfrom pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizerfrom pyspark.ml.classification import LogisticRegressionfrom pyspark.sql import Row# initializing spark sessionsc = SparkContext(appName="PySparkShell")spark = SparkSession(sc)# define the schemamy_schema = tp.StructType([ tp.StructField(name='id', dataType= tp.IntegerType(), nullable=True), tp.StructField(name='label', dataType= tp.IntegerType(), nullable=True), tp.StructField(name='tweet', dataType= tp.StringType(), nullable=True)])# read the datasetmy_data = spark.read.csv('twitter_sentiments.csv', schema=my_schema, header=True)# view the datamy_data.show(5)# print the schema of the filemy_data.printSchema()

定義機器學習管道的各個階段

現在已經將數據保存在Spark數據框中,需要定義轉換數據的不同階段,然後使用它從模型中獲取預測的標籤。

在第一階段,使用RegexTokenizer將推特文本轉換為單詞列表。然後,從單詞列表中刪除停用詞並創建詞向量。在最後階段,使用這些詞向量來構建邏輯回歸模型並獲得預測的情緒。

記住——重點不是建立一個完全準確的分類模型,而是要看看如何在流數據上使用預測模型來獲取結果。

# define stage 1: tokenize the tweet text stage_1 = RegexTokenizer(inputCol='tweet' , outputCol='tokens', pattern='\\W')# define stage 2: remove the stop wordsstage_2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')# define stage 3: create a word vector of the size 100stage_3 = Word2Vec(inputCol='filtered_words', outputCol='vector', vectorSize=100)# define stage 4: Logistic Regression Modelmodel = LogisticRegression(featuresCol='vector', labelCol='label')

設置機器學習管道

讓我們在Pipeline對象中添加階段,然後按順序執行這些轉換。用訓練數據集擬合管道,現在,每當有了新的推文,只需要將其傳遞給管道對象並轉換數據即可獲取預測:

# setup the pipelinepipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])# fit the pipeline model with the training datapipelineFit = pipeline.fit(my_data)

流數據和返回結果

假設每秒收到數百條評論,我們希望通過阻止用戶發布仇恨言論來保持平臺整潔。因此,每當我們收到新文本,都會將其傳遞到管道中並獲得預測的情緒。

我們將定義一個函數get_prediction,該函數將刪除空白句子並創建一個數據框,其中每一行都包含一條推文。

初始化Spark Streaming的環境並定義3秒的批處理持續時間。這意味著我們將對每3秒收到的數據進行預測:

# define a function to compute sentiments of the received tweetsdefget_prediction(tweet_text): try: # filter the tweets whose length is greater than 0 tweet_text = tweet_text.filter(lambda x: len(x) >0) # create a dataframe with column name 'tweet' and each row will contain the tweet rowRdd = tweet_text.map(lambda w: Row(tweet=w)) # create a spark dataframe wordsDataFrame = spark.createDataFrame(rowRdd) # transform the data using the pipeline and get the predicted sentiment pipelineFit.transform(wordsDataFrame).select('tweet','prediction').show() except : print('No data')# initialize the streaming contextssc = StreamingContext(sc, batchDuration=3)# Create a DStream that will connect to hostname:port, like localhost:9991lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))# split the tweet text by a keyword 'TWEET_APP' so that we can identify which set of words is from a single tweetwords = lines.flatMap(lambda line : line.split('TWEET_APP'))# get the predicted sentiments for the tweets receivedwords.foreachRDD(get_prediction)# Start the computationssc.start() # Wait for the computation to terminatessc.awaitTermination()

在一個終端上運行該程序,然後使用Netcat(用於將數據發送到定義的主機名和埠號的實用工具)。你可以使用以下命令啟動TCP連接:

nc -lk port_number

最後,在第二個終端中鍵入文本,你將在另一個終端中實時獲得預測。

完美!

結語

流數據在未來幾年只會越來越熱門,因此應該真正開始熟悉這一主題。請記住,數據科學不只是建立模型——整個流程都需要關注。

本文介紹了SparkStreaming的基礎知識以及如何在真實的數據集上實現它。我鼓勵大家使用另一個數據集或抓取實時數據來實現剛剛介紹的內容(你也可以嘗試其他模型)。

期待在下面的評論區聽取你對本文的反饋以及想法。

留言點讚關注

我們一起分享AI學習與發展的乾貨

如轉載,請後臺留言,遵守轉載規範

相關焦點

  • 如何在Apache Pyspark中運行Scikit-learn模型
    在本文中,我們將了解如何在Apache Pyspark中運行Scikit-learn模型,並討論有關每個步驟的細節。如果您已經準備好了機器學習模型,則可以直接跳到「 pyspark wrapper」部分,也可以通過以下步驟創建一個簡單的scikit learn機器學習模型。
  • 如何使用XGBoost模型進行時間序列預測
    字幕組雙語原文:如何使用XGBoost模型進行時間序列預測英語原文:How to Use XGBoost for Time Series Forecasting翻譯:雷鋒字幕組(Shangru)XGBoost是在有效解決分類和回歸問題的一種梯度提升方法。在廣泛的預測模型任務中,它快且有效,性能好。
  • 使用Flask部署機器學習模型
    我所學的一切都集中在模型構建組件上。沒有多少人會談論如何部署你的機器學習模型。把你的模型投入生產意味著什麼?它需要什麼?這些都是每個數據科學家需要回答的關鍵的職業定義問題。這就是為什麼我決定寫下這個教程來演示如何使用Flask來部署機器學習模型。
  • Python機器學習7:如何保存、加載訓練好的機器學習模型
    本文將介紹如何使用scikit-learn機器學習庫保存Python機器學習模型、加載已經訓練好的模型。學會了這個,你才能夠用已有的模型做預測,而不需要每次都重新訓練模型。本文將使用兩種方法來實現模型的保存和加載:Pickle和joblib。
  • 機器學習模型成為NASA最新預測颶風強度的背後技術
    因此,為了預測未來的颶風強度,美國宇航局位於南加州的噴氣推進實驗室的科學家們提出了一種機器學習模型,聲稱可以準確預測颶風未來的快速強度事件。首先,科學家們介紹了這些測量結果與未來颶風強度變化之間的經驗關係,並確定了他們可以用於業務預報模型的觀測結果。之後,科學家們展示了機器學習模型對大西洋和北太平洋東部地區快速加強的預測技巧,以及從衛星觀測中得出的聯合預測因子。在研究雲衛星數據中的冰和液態水含量時,研究人員意識到,加強型風暴和颶風強度的冰和液態水含量較高,但不一定高於削弱型颶風。
  • 使用Kafka本機模型伺服器進行流式機器學習
    機器學習(ML)包括有關歷史數據的模型訓練以及用於評分和預測的模型部署。雖然訓練大多是分批進行的,但評分通常需要大規模且可靠的實時功能。Apache Kafka在現代機器學習基礎架構中扮演著關鍵角色。下一代體系結構利用Kafka本機流模型伺服器而不是RPC(HTTP / gRPC)。
  • 使用算法模型,進行信息的匹配、流變和變化
    人工智慧不是人工智慧的代名詞,人工智慧代表的是數據科學的形式,根據數據科學的原理,使用算法模型,進行信息的匹配、流變和變化。在數據科學的訓練下,我們可以分析出目標客戶的預測程度,這是一種有效的轉化或對象的識別,並對潛在關係進行簡單的預測。預測未來需要的主要信息內容包括那些呢?
  • 「可解釋學習」利用SHAP對分子活性預測機器學習模型進行解釋
    文章利用可解釋學習方法SHAP(SHapley Additive exPlanation)對基於複雜機器學習模型的分子活性預測模型進行解釋,嘗試打開「黑箱」。目前,模型解釋的方法可以分為模型依賴(Model-specific)方法和模型不可知(Model-agnostic)方法。二者主要的區別在於是否限定使用模型的種類。前者主要指的是利用簡單的可解釋模型(例如線性回歸等)來進行學習,從而可以直接對模型進行解釋,但是這種方法常常會限制了模型的預測效果。後者不依賴於所使用的模型的限制,通過敏感性分析等方法對模型進行解釋,更具通用性。
  • 使用PyTorch進行主動遷移學習:讓模型預測自身的錯誤
    每當為一個特定目的而構建的機器學習模型適應於一個全新的用例時,你都可以感受到同樣的喜悅。如果這個用例碰巧是主動學習,那麼我們將把機器學習中最有趣的部分應用到解決機器學習中最重要的問題中:人類和人工智慧如何一起解決問題?
  • 「蝴蝶效應」也能預測了?看機器學習如何解釋混沌系統
    機器學習的方法能預測到的未來大大延長,比此前的預測方法能預測到的長了八倍,預測效果幾乎和真實情況完全匹配。  而且,這個算法對Kuramoto-Sivashinsky方程式本身一無所知;它只能看到方程式演進的數據。  這使機器學習方法變得更強大。因為,在許多情況下,由於不能確定描述混沌系統的方程式,動力學家無法對它們進行建模和預測。
  • 數據建模中分類與預測模型
    因此,本文基於上期數據預處理部分之後,介紹如何在清洗過後的數據基礎之上建立分類與預測模型,為此種模型的構建方法進行簡單介紹,輔助投資者對自身分析邏輯中的分析框架進行量化分析,方便其多元化的交易分析。  一、分類與預測的介紹  數據建模中分類與預測模型主要是尋求合適的分類模型並在此基礎之上進行未來預測。
  • 谷歌AI模型在即時預報降水的使用
    據外媒報導,幾周前,谷歌人工智慧(AI)使用了一個機器學習模型來改進對乳腺癌的篩查工作。現在,這家公司已經在即時預報降水中使用了卷積神經網絡(CNN)。谷歌 AI 研究人員在一篇名為《Machine Learning for Precipitation Nowcasting from Radar Images》的文章中提到了其在降水短期預測中對 CNN 的利用。
  • 鳶尾花預測:如何創建機器學習Web應用程式?
    圖源:unsplash數據科學的生命周期主要包括數據收集、數據清理、探索性數據分析、模型構建和模型部署。作為數據科學家或機器學習工程師,能夠部署數據科學項目非常重要,這有助於完成數據科學生命周期。通過既有框架(如Django或Flask)對傳統機器學習模型進行部署,可能是一項艱巨耗時的任務。本文就將展示如何在Python庫中使用streamlit,用不到50行的代碼構建一個簡單的基於機器學習的數據科學web應用程式。
  • 大講堂 | 預測時間敏感的機器學習模型建模與優化
    原標題:大講堂 | 預測時間敏感的機器學習模型建模與優化 雷鋒網AI研習社訊:機器學習模型現在已經廣泛應用在越來越多的領域比如地震監測,闖入識別,高頻交易;同時也開始廣泛的應用在行動裝置中比如通過邊緣計算。這些真實世界的應用在原有的模型精度基礎之上帶來很多實際約束比如預測要在很短或規定時間內完成。
  • 如何使用Pandas-Profiling進行探索性數據分析
    當開始一個新的機器學習項目時,獲得機器學習數據集之後的第一步就是要了解它。我們可以通過執行探索性數據分析(EDA)來實現這一點。這包括找出每個變量的數據類型、目標變量的分布、每個預測變量的不同值的數量、數據集中是否有重複值或缺失值等。進行EDA探索機器學習數據集的過程往往是非常耗時的。什麼是Pandas-Profiling?
  • 【原創】Johannes小組JCIM論文:用於預測Frequent Hitter的機器學習模型
    Kirchmair小組研發的Hit Dexter1就是這樣一款基於機器學習模型的Frequent Hitters預測軟體。(如圖1)作者通過主成分分析(PCA)和成對相似性分析,對PSA和CDRA兩種數據集進行比較,從圖中可以看出兩種模型的化學空間覆蓋情況基本吻合。雖然兩種模型的預測結果基本一致,但由於具有不同的敏感性,所以平行使用兩種分類器。
  • 科學家使用機器學習模型加速理論物理研究
    物理助理教授Phiala Shanahan說:「最終,我們在使用晶格場理論研究質子和核結構方面受到了計算限制。」「我們知道原則上如何解決很多有趣的問題,但我們就是沒有足夠的計算能力,儘管我們使用的是世界上最大的超級計算機。」為了突破這些限制,Shanahan領導了一個將理論物理學與機器學習模型結合起來的小組。
  • 物理所等利用機器學習方法預測材料性能獲進展
    研究過程中使用了支持向量機(Support Vector Machine)這種方法(圖1),通過構建多維空間,並在這個多維空間內對數據進行分割,從而建立輸入參量與輸出參量之間的關聯。該研究方法可通過不斷選擇新的參數對模型進行重複訓練,探討了合金的不同性質對其玻璃形成能力的影響(圖2)。
  • 數據科學家應該知道的頂級機器學習算法
    因為它迫使您考慮輸入數據的角色和模型準備過程。另外,選擇最適合您的問題的方法以獲得最佳結果。讓我們看一下機器學習算法中的三種不同的學習風格:監督學習基本上,在此監督式機器學習中,輸入數據稱為訓練數據,並且一次具有已知標籤或結果,例如垃圾郵件/非垃圾郵件或股票價格。在此,通過訓練過程準備了模型。另外,在此需要做出預測。並在這些預測錯誤時進行糾正。
  • 多種機器學習和統計模型預測個體患者臨床風險並不一致
    多種機器學習和統計模型預測個體患者臨床風險並不一致 作者:小柯機器人 發布時間:2020/11/8 22:27:12 英國曼徹斯特大學Tjeerd Pieter van Staa團隊研究了多種機器學習和統計模型預測個體患者臨床風險的一致性