上一節我們已經實現了資料庫連接池,視頻版和文字版的內容都已更新,
這節我們將要連接資料庫,實現數據的增刪改查。但還有一點需要注意,在數據自動化當中很少有刪除操作,一般只涉及更新(新增)和查詢,所以,外面我們主要實現這兩個方法。
老規矩,先看代碼結構:
我們在這裡定義一個類,叫做mysqlelper當中包含如下幾個函數:
__init__:初始化這個類,並且接收資料庫的資料庫地址、埠、帳號、密碼、資料庫名稱等信息;insterdata_bydf:插入數據函數。我們使用python操作數據,一般是把數據轉化成pandas當中的DataFrame,這裡簡稱我df。也就是說,把已有的dataframe數據插入的mysql資料庫當中;insterdata_bydf_thread:insterdata_bydf的進階用法,thread表示「多線程」,當我們的數據量過大的時候,插入資料庫就很慢,使用多線程可以縮短插入時間,當我們每天需要跑很多的自動化腳本的時候,多線程就會很有用;__get_df:查詢數據函數。只需要扔給這個函數一個寫好的sql,它就會返回給你數據的df,在後續的章節中,你可以把這個數據通過郵件的方式,發送出去;get_df:對上一個函數的封裝,方便直接調用。好了,現在你已經了解了這個類的主要架構,下面我們就來寫代碼:
第一步:導包
from utils.mydbpools import getDBonnectionPools # 上一節中構建的連接池import pandas as pd # pandas 用於處理數據from config import config # 資料庫相關配置import pymysqlimport osimport time
第二步:編寫__init__函數
init函數內容
__init__函數主要作用是在實例化的時候,接收資料庫相關配置參數,這裡我們設置了是否使用pools這個參數,如果使用的話,就用到了我們上一節課構造的連接池。
第三步:編寫插入數據函數
既然是像資料庫中插入數據,那麼這個函數就要包含以下參數
df:要插入的數據tb:要往哪個數據表裡面插if_exists:如果這個數據表裡已經有數據了,我們該怎麼辦,是直接新增,還是把原數據刪除之後再新增n:有時候由於網絡或者其它原因,插入可能失敗,所以失敗時候需要重新嘗試,n就是嘗試的最大次數def insertdata_bydf(self, df, tb, if_exists='append', n=5):
start_time = time.time() # 返回當前時間戳
為了計算插入數據所需的時間,這裡我們定義一個插入的開始時間。
df = df.where(pd.notnull(df), "None").replace("nan", "None").replace("NaN", "None")df = df.astype("str")
之後,把傳進來的df做一下格式化,把pandas當中的nan這種數據,替換為mysql當中的None,否則插入的時候會報錯,再把所有字符轉化為字符串。
sql = """insert into {0} ({1}) values ({2});"""sql = sql.format(tb, ",".join(df.columns), ("%s," * len(df.columns))[:-1])
這一步是構建sql語句的模板,有sql基礎的同學可能都知道,插入數據使用的是:
INSERT INTO table_name (列1, 列2,...) VALUES (值1, 值2,....)
所以,我們要構造出來這個意思,中間用到了format函數,不太熟悉的同學也沒關係,先把框架搭起來,python的基礎知識我們後面陸續補充。
{!-- PGC_COLUMN --}conn = self.getconn()cursor = conn.cursor()
這裡我們使用getconn()函數獲取資料庫連接
getconn函數
if if_exists == 'replace':delete_rows = cursor.execute('delete from {0}'.format(tb))elif if_exists == 'replace-truncate':delete_rows = cursor.execute('truncate from {0}'.format(tb))else:delete_rows = 0
接著,判斷是要「新增」還是「替換」,替換有兩種不同的語句,一個是「delete 」,另一個是『truncate 』,區別是:如果使用delete刪除數據,那麼自增列id不會重置,比如說你之前資料庫有100條數據,即使你刪除了,新增的數據也是從101開始自增,而truncate則可以重置自增列,繼續從1開始插入。
para = [tuple([None if y == "None" else y for y in x]) for x in df.values]insert_rows = cursor.executemany(sql, para)conn.commit()print(" insert數據行數:{0}".format(insert_rows))print("資料庫insert成功")endTime = time.time()time_eclipse = round((endTime - startTime), 2)count_times = n
最終,我們使用 cursor.executemany 來插入我們的數據,sql是之前做的sql語句模板,para是我們把df中的每行數據構建成一個元祖,滿足mysql插入的格式需求。
那如果插入不成功,需要使用try...excecption來捕獲異常,下面是異常處理:
關於異常處理我們也會單獨拿一節來講,數據插入的部分先講到這裡,下一節我們講一下數據查詢語句的構建,以及實戰過程中可能碰到的問題,已經購買課程的同學可以私信我一下哦,發你們一下課程代碼。
視頻版的內容也會同步更新,歡迎大家關注