Spark On MaxCompute如何訪問Phonix

2020-10-14 阿里云云棲號

一、購買Hbase1.1並設置對應資源

1.1購買hbase

hbase主要版本為2.0與1.1,這邊選擇對應hbase對應的版本為1.1
Hbase與Hbase2.0版本的區別
HBase1.1版本
1.1版本基於HBase社區1.1.2版本開發。
HBase2.0版本
2.0版本是基於社區2018年發布的HBase2.0.0版本開發的全新版本。同樣,在此基礎上,做了大量的改進和優化,吸收了眾多阿里內部成功經驗,比社區HBase版本具有更好的穩定性和性能。

1.2確認VPC,vsWitchID

確保測試聯通性的可以方便可行,該hbase的VPCId,vsWitchID儘量與購買的獨享集成資源組的為一致的。

1.3設置hbase白名單,其中DataWorks白名單如下,個人ECS也可添加

根據文檔連結選擇對應的DataWorks的region下的白名單進行添加。

1.4查看hbase對應的版本和訪問地址

打開資料庫連結的按鈕,可以查看到Hbase的主版本以及Hbase的專有網絡訪問地址,以及是否開通公網訪問的方式進行連接。

二、安裝Phonix客戶端,並創建表和插入數據

2.1安裝客戶端

根據hbase的版本為1.1選擇Phonix的版本為4.12.0下載對應的客戶端文件ali-phoenix-4.12.0-AliHBase-1.1-0.9.tar.gz
登陸客戶端執行命令

./bin/sqlline.py 172.16.0.13,172.16.0.15,172.16.0.12:2181

創建表:

CREATE TABLE IF NOT EXISTS users_phonix( id INT , username STRING, password STRING) ;

插入數據:

UPSERT INTO users (id, username, password) VALUES (1, 'admin', 'Letmein');

2.2查看是否創建和插入成功

在客戶端執行命令,查看當前表與數據是否上傳成功

select * from users;

三、編寫對應代碼邏輯

3.1編寫代碼邏輯

在IDEA按照對應得Pom文件進行配置本地得開發環境,將代碼涉及到得配置信息填寫完整,進行編寫測試,這裡可以先使用Hbase得公網訪問連結進行測試,代碼邏輯驗證成功後可調整配置參數,具體代碼如下

package com.git.phoniximport org.apache.hadoop.conf.Configurationimport org.apache.spark.sql.SparkSessionimport org.apache.phoenix.spark._/** * 本實例適用於Phoenix 4.x版本 */object SparkOnPhoenix4xSparkSession { def main(args: Array[String]): Unit = { //HBase集群的ZK連結地址。 //格式為:xxx-002.hbase.rds.aliyuncs.com,xxx-001.hbase.rds.aliyuncs.com,xxx-003.hbase.rds.aliyuncs.com:2181 val zkAddress = args(0) //Phoenix側的表名,需要在Phoenix側提前創建。Phoenix表創建可以參考:https://help.aliyun.com/document_detail/53716.html?spm=a2c4g.11186623.4.2.4e961ff0lRqHUW val phoenixTableName = args(1) //Spark側的表名。 val ODPSTableName = args(2) val sparkSession = SparkSession .builder() .appName("SparkSQL-on-MaxCompute") .config("spark.sql.broadcastTimeout", 20 * 60) .config("spark.sql.crossJoin.enabled", true) .config("odps.exec.dynamic.partition.mode", "nonstrict") //.config("spark.master", "local[4]") // 需設置spark.master為local[N]才能直接運行,N為並發數 .config("spark.hadoop.odps.project.name", "***") .config("spark.hadoop.odps.access.id", "***") .config("spark.hadoop.odps.access.key", "***") //.config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api") .config("spark.hadoop.odps.end.point", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api") .config("spark.sql.catalogImplementation", "odps") .getOrCreate() //第一種插入方式 var df = sparkSession.read.format("org.apache.phoenix.spark").option("table", phoenixTableName).option("zkUrl",zkAddress).load() df.show() df.write.mode("overwrite").insertInto(ODPSTableName) }}

3.2對應Pom文件

pom文件中分為Spark依賴,與ali-phoenix-spark相關的依賴,由於涉及到ODPS的jar包,會在集群中引起jar衝突,所以要將ODPS的包排除掉

<?xml version="1.0" encoding="UTF-8"?><!-- Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. See accompanying LICENSE file.--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <properties> <spark.version>2.3.0</spark.version> <cupid.sdk.version>3.3.8-public</cupid.sdk.version> <scala.version>2.11.8</scala.version> <scala.binary.version>2.11</scala.binary.version> <phoenix.version>4.12.0-HBase-1.1</phoenix.version> </properties> <groupId>com.aliyun.odps</groupId> <artifactId>Spark-Phonix</artifactId> <version>1.0.0-SNAPSHOT</version> <packaging>jar</packaging> <dependencies> <dependency> <groupId>org.jpmml</groupId> <artifactId>pmml-model</artifactId> <version>1.3.8</version> </dependency> <dependency> <groupId>org.jpmml</groupId> <artifactId>pmml-evaluator</artifactId> <version>1.3.10</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> </exclusion> <exclusion> <groupId>org.scala-lang</groupId> <artifactId>scalap</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>cupid-sdk</artifactId> <version>${cupid.sdk.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.aliyun.phoenix</groupId> <artifactId>ali-phoenix-core</artifactId> <version>4.12.0-AliHBase-1.1-0.8</version> <exclusions> <exclusion> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-mapred</artifactId> </exclusion> <exclusion> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-commons</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.aliyun.phoenix</groupId> <artifactId>ali-phoenix-spark</artifactId> <version>4.12.0-AliHBase-1.1-0.8</version> <exclusions> <exclusion> <groupId>com.aliyun.phoenix</groupId> <artifactId>ali-phoenix-core</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizeJar>false</minimizeJar> <shadedArtifactAttached>true</shadedArtifactAttached> <artifactSet> <includes> <!-- Include here the dependencies you want to be packed in your fat jar --> <include>*:*</include> </includes> </artifactSet> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> <exclude>**/log4j.properties</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource> </transformer> </transformers> </configuration> </execution> </executions> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.3.2</version> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>compile</goal> </goals> </execution> <execution> <id>scala-test-compile-first</id> <phase>process-test-resources</phase> <goals> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build></project>

四、打包上傳到DataWorks進行冒煙測試

4.1創建要傳入的MaxCompute表

CREATE TABLE IF NOT EXISTS users_phonix( id INT , username STRING, password STRING) ;

4.2打包上傳到MaxCompute

在IDEA打包要打成shaded包,將所有的依賴包,打入jar包中,由於DatadWork界面方式上傳jar包有50M的限制,因此採用MaxCompute客戶端進行jar包

4.3選擇對應的project環境,查看上傳資源,並點擊添加到數據開發

進入DataWorks界面選擇左側資源圖標,選擇對應的環境位開發環境,輸入刪除文件時的文件名稱進行搜索,列表中展示該資源已經上傳成,點擊提交到數據開發

點擊提交按鈕

4.4配置對應的vpcList參數並提交任務測試

其中的配置vpcList文件的配置信息如下,可具體根據個人hbase的連結,進行配置

{ "regionId":"cn-beijing", "vpcs":[ { "vpcId":"vpc-2ze7cqx2bqodp9ri1vvvk", "zones":[ { "urls":[ { "domain":"172.16.0.12", "port":2181 }, { "domain":"172.16.0.13", "port":2181 }, { "domain":"172.16.0.15", "port":2181 }, { "domain":"172.16.0.14", "port":2181 }, { "domain":"172.16.0.12", "port":16000 }, { "domain":"172.16.0.13", "port":16000 }, { "domain":"172.16.0.15", "port":16000 }, { "domain":"172.16.0.14", "port":16000 }, { "domain":"172.16.0.12", "port":16020 }, { "domain":"172.16.0.13", "port":16020 }, { "domain":"172.16.0.15", "port":16020 }, { "domain":"172.16.0.14", "port":16020 } ] } ] } ]}

Spark任務提交任務的配置參數,主類,以及對應的參數
該參數主要為3個參數第一個為Phonix的連結,第二個為Phonix的表名稱,第三個為傳入的MaxCompute表

點擊冒煙測試按鈕,可以看到任務執行成功

在臨時查詢節點中執行查詢語句,可以得到數據已經寫入MaxCompute的表中

總結:

使用Spark on MaxCompute訪問Phonix的數據,並將數據寫入到MaxCompute的表中經過實踐,該方案是可行的。但在實踐的時有幾點注意事項:

1.結合實際使用情況選擇對應的Hbase以及Phonix版本,對應的版本一致,並且所使用的客戶端,以及代碼依賴都會有所改變。
2.使用公網在IEAD進行本地測試,要注意Hbase白名單,不僅要設置DataWorks的白名單,還需將自己本地的地址加入到白名單中。
3.代碼打包時需要將pom中的依賴關係進行梳理,避免ODPS所存在的包在對應的依賴中,進而引起jar包衝突,並且打包時打成shaded包,避免缺失遺漏對應的依賴。


本文為阿里雲原創內容,未經允許不得轉載。

相關焦點

  • MaxCompute Spark 使用和常見問題
    配置介紹3.1 配置的位置3.1.1 Spark配置的位置用戶使用Maxcompute Spark通常會有幾個位置可以添加Spark配置,主要包括:位置1:spark-defaults.conf,用戶通過客戶端提交時在spark-defaults.conf文件中添加的Spark配置位置2:dataworks的配置項,用戶通過dataworks
  • 如何理解maxcompute常見報錯信息?
    打開APP 如何理解maxcompute常見報錯信息?「子查詢結果數大於1000行」這是maxcompute的限制。子查詢一般是習慣於使用傳統資料庫的開發,經常用的查詢方式。而實際上,無論是MYSQL還是maxcompute,都不建議使用子查詢。在maxcompute中一般會用多表關聯來解決問題。
  • 再談Spark Streaming Kafka反壓
    0x02 速率預估 啟用反壓也比較簡單:sparkConf.set("spark.streaming.backpressure.enabled", "true")。spark會在作業執行結束後,調用RateController.onBatchCompleted更新batch的元數據信息:batch處理結束時間、batch處理時間、調度延遲時間、batch接收到的消息量等. 然後基於上述參數,使用P的compute方法。
  • Spark 1.6.0 新手快速入門
    首先介紹Spark的交互界面的API使用,然後介紹如何使用Java、Scala以及Python編寫Spark應用。詳細的介紹請閱讀Spark Programming Guide。 在按照本文進行操作之前,請確保已安裝Spark。本文中的所有操作沒有使用HDFS,所以您可以安裝任何版本的Hadoop。
  • SPARK RDD 介紹
    一、什麼事RDD RDD(Resilient Distributed DataSet)叫做彈性分布式數據集,是spark中最基本的數據抽象,它代表一個不可變、可分區、裡面元素可並行計算的集合,RDD具有數據流模型得特點,自動容錯、位置感知性調度和可伸縮性,RDD允許用戶在執行多個查詢時顯示的將工作集緩存在內存中,後續的查詢能夠重用工作集
  • sparksql合併小文件
    查看sparksql支持的參數:spark-sql set -v需要注意這種方式對Spark的版本有要求,建議在Spark2.4.X及以上版本使用,示例: INSERT ... SELECT /*+ COALESCE(numPartitions) */ ... INSERT ...
  • 用Spark-NLP建立文本分類模型
    設置環境讓我們繼續看看如何在AWS EMR上設置Spark NLP。1.在啟動EMR集群之前,我們需要創建一個引導操作。引導操作用於設置其他軟體或自定義群集節點的配置。以下是可用於在EMR集群上設置Spark NLP的引導操作,#!
  • (純乾貨建議收藏)一次GC引發的Spark調優大全
    一般在我們開發spark程序的時候,從代碼開發到上線以及後期的維護中,在整個過程中都需要涉及到調優的問題,即一開始需要考慮如何把代碼寫的更簡潔高效調優(即代碼優化),待開發測試完成後,提交任務時綜合考量該任務所需的資源(這裡涉及到資源調優),上線後是否會出現數據傾斜問題(即傾斜調優),以及是否出現頻繁GC問題(這裡涉及到GC調優)。
  • spark-sql無法連接hive元資料庫
    :389) at org.apache.spark.sql.hive.HiveExternalCatalog.lzycompute(SharedState.scala:137) at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:127) at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala
  • Spark Streaming 和 Flink 誰是數據開發者的最愛?
    val Array(brokers, topics) = args// 創建一個批處理時間是2s的context val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) /
  • spark job 裝載率統計
    可問題是 spark怎麼做迭代計算,難道是for循環嗎?我本來一直在像 能否通過一次shuffle解決問題,苦苦思索,並沒有這樣的實現。select max(order) from route_table ,那麼我們得到一個結果,就是3.
  • Spark—15分鐘教程
    _import org.apache.spark.sql.functions._// 初始化Spark會話val spark = SparkSession.builder.讓我們看看如何使用Sales表進行基本操作。簡單的Select語句和顯示數據# 以Parquet格式讀取源表sales_table = spark.read.parquet(".
  • sparksql序列化異常
    在sparksql中顯示的指定了mapjoin,導致廣播的數據量太大,導致序列化超過指定大小。19/02/21 21:24:29 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, CHDD183, executor 1): org.apache.spark.SparkException
  • 如何快速學習spark
    spark它是發源於美國的加州大學伯克利分校amplab集群計算平臺的,基於內存計算的,時速非常快,性能比Hadoop快100陪以上,它可以流處理、圖計算、還有很多的計算範式,我個人認為spark是個全能的選手,但是隨著技術的快速發展,現在又出現Flink,它和spark是一個競爭對手了,今天我們主要講spark,後續我會再分享Flink的知識。
  • 手把手教你在 Windows 平臺搭建本地 Hive 訪問環境
    spark-sql訪問已經被hive創建的表出現的問題簡介學習大數據最痛苦和費時間的就是入門時的環境搭建,對於大數據工程師而言,這個過程必不可少,但對於一些簡單的測試,每次都要打開自己搭建好的虛擬機,未免有些麻煩。對於數據分析師而言,一般只需要使用hive和spark就好,搭建集群實在是一件費力沒有效果的事。
  • Spark Streaming如何讀Kafka數據 Hbase如何設計表
    spark rdd 怎麼分區寬依賴和窄依賴寬依賴:父RDD的分區被子RDD的多個分區使用。例如 groupByKey、reduceByKey、sortByKey等操作會產生寬依賴,會產生shuffle。窄依賴:父RDD的每個分區都只被子RDD的一個分區使用。
  • Spark項目案例實戰和分布式部署
    {ArrayUtils, Bytes}import org.apache.spark.*.tgz,每臺hadoop伺服器上放在同一個目錄下不用任何配置值即可,用spark-submit提交就行。項目本身的jar包放在這裡目錄下/home/hadoop/chongdianleme/,然後通過spark-submit提交如下腳本即可:hadoop fs -rmr /ods/kc/dim/ods_kc_dim_hbase/;/home/hadoop/software/spark21/bin/spark-submit
  • sparksql 窗口函數原理
    例如當你需要基於每天的用戶訪問數,來計算七天的移動平均訪問數,就需要按照時間排序,每一條數據的計算都需要前面6條數據一起參與計算。二、窗口函數的使用範式一般窗口函數都是這樣用的SELECT window_func(args)OVER ( [PARTITIONBY col_name, col_name, ...]
  • 超"機"訪問:4G網絡六成首選HTC One max
    作為HTC首款LTE手機HTC One max,在第一時間就支持4G網絡,在當前僅有的一些4G手機中傲立群雄。金屬材質的外觀,5.9英寸的高清大屏無論是手感還是功能體驗都非常出色,那麼網友們是否認可呢?HTC One max會成為網友們首選的4G手機嗎?
  • 0644-5.16.1-如何在CDH5中使用Spark2.4 Thrift
    在CDH5中通過自己單獨安裝的方式運行Thrift服務現在已經調通並在使用的是如下版本組合:1.在CDH5中安裝Spark1.6的Thrift服務,參考《0079-如何在CDH中啟用Spark Thrift》2.在CDH5中安裝Spark2.1的Thrift服務,參考《0280-如何在Kerberos環境下的CDH集群部署Spark2.1的Thrift及spark-sql