背景
結構化數據與非結構化數據的轉換不同類型存儲支持不同類型索引支持前置
版本信息正確各組件對應的python語言工具庫安裝就緒,本文基於如下版本,調試正常
核心
sc.read.format("XXXX").options(XXXX).load(XXXX)dataframe.write.format("XXXX").mode().save()
Spark從mysql讀取數據
初始化spark的session: SparkSession.builder.config(conf=self.conf).config("spark.debug.maxToStringFields", "500").getOrCreate()創建mysql到spark的連結器:connector_url = 'jdbc:mysql://{0}:{1}/{2}'.format(ip,port,db)connector = self.spark.read.format("jdbc").option("url", connector_url).option("user", username).option("password", password)
Spark寫數據到mysql
table_dataframe.write.format("jdbc").mode('ignore').option("url", self.connector_url).option("dbtable", table_name).option("user", self.username).option("password", self.password).save()
注意事項
(1) 存儲模式的設置
`append`: 追加記錄到mysql,前提條件: 列名及數據類型保持一致`overwrite`: 覆蓋寫,清除原來表中已經存在的記錄.`error`或 `errorifexists`: 如果記錄已經存在直接拋出異常`ignore`: 如果表中數據已經存在直接跳過;如果表不存在,會自動創建表並寫入數據具體通過df.write.mode("XXX")進行設置(2)在非ignore模式下手動創建表,需要注意列名稱和數據類型;(額外注意spark中的String類型對應mysql的text類型)
Spark從hdfs/本地讀取數據
讀文本類文件:本地: sc.textFile('file:///tmp/src.txt') # 3條線
hdfs:sc.textFile('hdfs://{hadoop-hostname}:{port}/tmp/src.txt')
其他類型文件:sqlcontext.read.format("com.databricks.spark.csv").options(header="true", inferschema="true").load("hdfs://{hadoop-hostname}:{port}/tmp/movies.csv")
Spark寫數據到hdfs/本地
指定文件類型後寫文件:data_frame.write.format("json").save("hdfs://{hadoop-hostname}:{port}/tmp/paper.json")Spark從mongo讀取數據
sc.read.format("com.mongodb.spark.sql").options(uri="mongodb://{IP}:{Port}", database="spark",collection="waiting").load()Spark寫數據到mongo
指定寫入模式及格式com.mongodb.spark.sql
data_frame.write.format("com.mongodb.spark.sql").mode("append").save()
Spark從ES讀取數據
es_rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable","org.elasticsearch.hadoop.mr.LinkedMapWritable",conf={"es.resource": "tspark/pm_value", "es.nodes": "XXXX"})# conf以json格式傳入es.nodes(ES的IP:port)、es.resource(ES的index/type)信息
Spark寫數據到ES
bulk(es, doc_list_w) # 注意doc_list_w中doc需要帶_index、_type、_source欄位信息