Presto是一款Facebook開源的MPP架構的OLAP查詢引擎,可針對不同數據源執行大容量數據集的一款分布式SQL執行引擎。因為工作中接觸到Presto,研究它對理解SQL Parser、常見算子的實現(如SQL中table scan,join,aggregation)、資源管理與調度、查詢優化(如向量化執行、動態代碼生成)、大數據下各個組件為何適用不同場景等等都有幫助。我希望通過這個系列可以了解一條SQL在大數據場景下該如何高效執行。233醬準備不定時持續更新這個系列,本文主要從Presto的使用舉例,Presto的應用場景、Presto的基本概念三個部分來初步介紹Presto。
比如說,你想對存儲在不同數據源中的數據,如HDFS、Mysql、HBase等通過一個SQL做查詢分析,那麼只需要把每一個數據源當成是Presto的Connector,對應實現Presto SPI暴露出的Connector API就可以了。
hbase 和 es 的Join查詢舉例Presto官方版和Presto社區版已經支持了很多Connector,社區版略勝一籌。至於兩者有何區別,吃瓜群眾可以前往文末參考資料[4]。簡而言之,都主要由Facebook那幫大佬核心維護。社區版更新更為頻繁,但高版本需要JDK11才能支持;官方版JDK8就行,官方版的Star數是社區版的10倍左右,選哪個就一目了然了吧。
Presto的應用場景Presto是為了處理TB/PB級別的數據查詢和分析,它是OLAP(Online Analytical Processing)領域的一個計算引擎。參考資料[1]提到了Presto在Facebook中的使用場景有:
做過報表和大盤的小夥伴應該對這個場景下複雜的SQL有所了解。這個場景下的使用用戶是Facebook內部或外部人員,通常要求:高QPS,低時延(<1s)。
Adhoc分析Ad hoc是拉丁文「for this purpose」的意思,Adhoc query的查詢特點是海量、實時、靈活。數據量如PB級別以上,時延秒-分鐘級別,靈活性舉例子如下:
var adhoclQuery = "SELECT * FROM table WHERE id = " + myId;
var sqlQuery = "SELECT * FROM table WHERE id = 1";
adhoclQuery的結果取決於參數「myId」的值,它的結果不能被預計算。sqlQuery的結果每次執行可認為都一樣,它的結果可以被預計算。
典型應用場景如:用戶趨勢分析,產品市場洞察等。主要用戶是內部數據分析人員。
批處理批處理通常是指更大數據量的一個分析,可容忍高時延(小時-天級別)。Presto是為了低時延而設計的,它屬於內存型的MPP架構。並不適合類似Spark那樣的長時間離線跑批。參考資料[1]的視頻中分析了兩者架構的區別,Presto跑批的限制。這裡我截幾張PPT幫助大家理解:
兩者的架構區別:
所以他們提供了Presto on Spark方案,這樣做的好處是可以統一用戶使用的SQL方言差異,UDF差異。
當然,業界除了Facebook還有公司把Presto跑在Spark上來跑批嗎?我沒有搜到相關信息。
Presto的基本概念前面主要談了Presto的使用場景,下面簡要從 Presto的架構和基本術語上介紹Presto。
Presto架構Presto的架構圖如下:
Coordinator節點 負責接受客戶端請求、解析SQL語句、生成並優化分布式邏輯執行計劃、將計劃中的任務調度到Worker節點上,並跟蹤Worker節點和任務的執行狀態。
Worker節點 負責任務的執行,接受Coordinator節點的調度。
從中我們可以粗略看出一條SQL在Presto中的執行過程為:
1).Client發送一個SQL語句到Coordinator節點
2).Coordinator節點把請求放到隊列中,解析和分析其中的SQL語句;生成並且優化分布式邏輯執行計劃。
3).Coordinator節點會把這個Plan分解為任務,由多個Worker分布式執行。
要想了解具體的SQL執行過程,我們得先介紹下Presto的基本概念,也為下篇介紹「Presto為什麼是OLAP領域的實時計算引擎」的文章作準備>_<
基本術語我們很容易知道 statements 和 queries 的意思。作為一個使用者我們也應該熟悉 stages、 splits 這些概念使Presto儘可能高效執行queries;作為一個Presto管理員,應該理解 stages 是如何映射為tasks的,包含 drivers 集合 的 task 是如何處理數據的。以下將從一般到具體介紹Presto的基本術語。
Server TypesPresto包含兩種類型的服務端節點:coordinators 和 workers。
CoordinatorPresto中的Coordinator節點負責解析SQL語句,生成並優化物理執行計劃,管理Presto worker節點。它是Presto運行的「大腦」。它也是客戶端提交SQL語句的節點。每個運行的Presto集群包含1個Coordinator節點和1-多個Worker節點。一個服務示例可同時擔任這兩種節點角色。
Coordinator節點一直跟蹤每個Worker節點的狀態和協調查詢計劃的執行。Coordinator生成一個物理執行計劃模型,它包含一系列的stages,而stages會轉化為一系列的任務跑在workers節點上。
Coordinator通過REST API和workers 、 clients通信。
WorkerWorker節點負責執行tasks,處理數據。它從connectors中撈取數據,並且Worker節點之間可交換中間數據。coordinator節點負責合併workers的結果,並且返回結果給Client。
當一個Worker節點開始工作後,它會把自己註冊到coordinator的註冊服務上,從而使Coordinator節點可將task調度到自己執行。
Workers和其他Workers、coordinators之間都是通過REST API通信。
Data Sources諸如connector, catalog, schema, and table這些術語,都是和Presto的模型中:一種特定的數據源有關。
Connectorconnector是Presto中的一個數據源,可以是Hive、Mysql、Elasticsearch、HBase等。你可以把connector認為是一種資料庫驅動,只要實現Presto SPI 中暴露的相關接口,就可以接入一種Connector。
Presto自帶一些connectors:如JMX,System connector用來獲取system tables的,Hive connector,TPCH connector 用來性能測試用的,等等。
每一個catalog和一個特定的connector關聯。每一個catalog配置文件中有一個connector.name屬性,它是被catalog manager用來為一個給定的catalog創建一個connector。一個catalog可以使用相同的connector獲取類似資料庫的兩個實例。
CatalogPresto catalog包含schemas和通過Connector持有的數據源引用。比如:你可以配置一個ES catalog,就可以通過ES Connector提供從ES中獲取數據。
#elasticsearch.properties
connector.name=elasticsearch
elasticsearch.host=es host
elasticsearch.port=9200
elasticsearch.default-schema-name=es
當你在Presto上執行SQL時,你就在運行1-多個catalogs.在Presto上定位一張表,是通過一個catalog的全限定名確定的,如hive.test_data.test代表在hive catalog,test_data schema 下的一張test table.
Catalogs屬性文件是存儲在Presto配置目錄的,默認是Presto主安裝文件下的etc目錄下。
SchemaSchemas是一種組織tables的方式。一個catalog和一個catalog定義了一個可被查詢的table集合。對於MySQL這種關係型資料庫,Presto的schema是和MySQL中的schema相同的概念。對於其他類型的connector,如ES, Presto的schema是用來組織一些表到特定的schema中,從而使底層的數據源能夠在Presto層面說得通。
Tabletable是一組無序的Row集合,Row是一組有類型的column集合。和關係型資料庫中的概念一樣,table的映射是由connector中定義的。
Query Execution ModelPresto執行SQL語句,並且轉化為執行計劃,在由coordinator 和 workers組成的分布式集群上運行。
StatementPresto執行兼容ANSI標準的SQL。這些SQL statements包含子句,表達式,條件。
Presto把Statement 和 Query區分開是因為:在Presto中,statements是指Client提交上來的SQL語句,如:
SELECT * FROM table WHERE id = 1
query是指Presto執行statement時,生成的一個物理執行計劃,並且之後分布式的在一系列workers上執行它。
Query當Presto解析一個statement時,它會把statement轉化為一個query,並且創建一個分布式的執行計劃,然後轉化為一系列的有關聯性的stages運行在Presto workers上。當你在Presto獲取一個query的信息時,得到的是每個參與執行的組件的一個當前結果快照。
statement可認為是Client提交上來的SQL語句,query指的是執行一個statement有關的配置和組件實例信息。query圍繞著stages, tasks, splits, connectors,其他組件和數據源一起工作,以產生最終結果。
Stage當Presto執行一個query時,它會把執行分為一個有層次結構關係的stages.比如SQL語句:
這個層次結構的stages可以理解為一個一個樹。每個query都有一個root stage負責其他stages的輸出結果聚合。stages是coordinator節點用來生成一個分布式查詢計劃的模型,但是stages它們自己並不跑在Presto workers節點上。
Task一個stage是由一系列的tasks分布式運行在Presto workers上。
在Presto架構中,task是「work horse」。因為分布式查詢計劃被分解為一系列stage,然後被轉換為task,這些task隨後執行或被進一步split。一個task有輸入和輸出,就像一個stage可以有一系列的tasks並行執行一樣,一個task可以由一系列的drivers並行執行。
SplitSplit是較大數據集的一個分片。分布式查詢計劃的最低級別的stage(如上圖中的Stage3/Stage4)通過來自connectors得到的splits集合獲取輸入數據,更高級別的中間Stage(如上圖中的Stage2/Stage1)從下一層stage中獲取輸入數據。
當Presto調度一個query時,coordinator節點會查詢連接器的SPI接口獲得一個表可用的所有split集合。coordinator跟蹤哪些機器正在運行哪些task,以及哪些任務正在處理哪些split。
DriverTask包含一個或多個並行的driver。driver對數據進行操作,並結合operators產生輸出,然後結果由一個task聚合,然後傳遞到另一個stage的另一個task。driver是一系列operators實例,或者您可以將driver看作內存中的operator的物理集合。它是Presto體系結構中並行的最低級別。一個driver有一個輸入和一個輸出。
Operator一個operator消費、轉換和生產數據。例如,一個table scan operator從一個connector中獲取數據並生產出可由其他operator消費的數據,一個filter operator通過對輸入數據應用謂詞(過濾條件)並生成一個子集。
ExchangeExchanges為一個query的不同stage在Presto節點之間傳遞數據。task使用一個exchange client,生產數據到一個output buffer中,並且消費其他task產生的數據。
參考資料
[1].https://databricks.com/session_na20/presto-on-apache-spark-a-tale-of-two-computation-engines
[2].https://research.fb.com/wp-content/uploads/2019/03/Presto-SQL-on-Everything.pdf
[3].https://prestodb.io/docs/current/overview/concepts.html