『善者,吾善之;不善者,吾亦善之;德善。信者,吾信之;不信者,吾亦信之,德信。』
——《道德經》第四十九章
網際網路是一個巨大的資源庫,只要方法適當,就可以找到你所需要的數據。少量的數據可以人工去找,但是對於大量的數據,而且數據獲取之後還要進行解析,那麼靠人工就無法完成這些任務,因此就需要通過一個電腦程式來完成這些工作,這就是網絡爬蟲。
『提示:Python社區中有一個網絡爬蟲框架——Scrapy(https://scrapy.org/),Scrapy封裝了網絡爬蟲的技術細節,使得開發人員編寫網絡爬蟲更加方便。這裡並不介紹Scrapy框架,而是介紹實現網絡爬蟲的基本技術。通過一個爬取股票數據項目,介紹網絡爬蟲技術。』下面將介紹一個完整的網絡爬蟲項目,該項目是從搜狐網爬取貴州茅臺股票數據,然後進行解析,解析之後的結果保存到資料庫中,以備以後使用。
掃碼優惠購書
搜狐網數據是動態數據,需要使用Selenium庫爬取數據。相關代碼如下:
"""項目實戰:搜狐網爬取股票數據"""import datetime
from selenium import webdriver
url = 'http://q.stock.sohu.com/cn/600519/lshq.shtml'
def fetch_data(): """爬取並解析數據"""
driver = webdriver.Firefox() driver.get(url)
table_element = driver.find_element_by_id('BIZ_hq_historySearch') tr_list = table_element.find_elements_by_xpath('./tbody/tr') ①
data = [] ②
for idx, tr in enumerate(tr_list): ③ if idx == 0: ④ continue
td_list = tr.find_elements_by_tag_name('td') ⑤ fields = {} ⑥ fields['Date'] = td_list[0].text fields['Open'] = float(td_list[1].text) fields['Close'] = float(td_list[2].text) fields['Low'] = float(td_list[5].text) fields['High'] = float(td_list[6].text) fields['Volume'] = float(td_list[7].text) data.append(fields) ⑦
driver.quit() return data
if __name__ == '__main__': data = fetch_data() print(data)上述代碼將爬取數據和解析數據封裝在fetch_data()函數中,該函數返回列表數據。代碼第①行是在table_element對象(id為BIZ_hq_historySearch 的table標籤)中通過XPath表達式「./tbody/tr」查找符合條件的所有元素,XPath表達式「./tbody/tr」是查找table中tbody中所有tr標籤,每個tr是table中的一行。查找結果返回tr_list對象,如圖1-1所示tr_list。
代碼第②行定義一個列表對象data,用了保存從table中解析出來的數據。代碼第③行遍歷tr_list列表對象。通過enumerate()函數可以拆分idx和tr變量,idx是循環變量。因為table的第一行是其他數據的匯總,不應該提取這一行數據,代碼第④行當idx == 0是時跳第一行數據。代碼第⑤行tr對象中通過td標籤名查詢所有元素,如圖1-1所示td_list。代碼第⑥行創建字典對象fields ,每一個fields 對象可以保存table中一條tr數據。代碼第⑦行將每一個fields對象放到data列表中。
檢測數據是否更新
由於網絡爬蟲需要定期從網頁上爬取數據,但是如果網頁中的數據沒有更新,本次爬取的數據與上次一樣,就沒有必要進行解析和存儲了。驗證兩次數據是否完全相同可以使用MD5數字加密技術,MD5可以對任意長度的數據進行計算,得到固定長度的MD5碼。MD5的典型應用是對一段數據產生信息摘要,以防止被篡改。通過MD5函數對兩次請求返回的HTML數據進行計算,生成的MD5相同則說明數據沒有更新,否則數據已經更新。
Python中計算MD5碼可以使用hashlib模塊中的md5()函數。實現檢測數據更新的代碼如下:
"""項目實戰:搜狐網爬取股票數據"""import hashlibimport os
from selenium import webdriver
url = 'http://q.stock.sohu.com/cn/600519/lshq.shtml'
def check_update(html): """驗證數據是否更新,更新返回True,未更新返回False"""
md5obj = hashlib.md5() ① md5obj.update(html.encode(encoding='utf-8')) ② md5code = md5obj.hexdigest() ③ print(md5code)
old_md5code = '' f_name = 'md5.txt' ④
if os.path.exists(f_name): with open(f_name, 'r', encoding='utf-8') as f: ⑥ old_md5code = f.read()
if md5code == old_md5code: ⑦ print('數據沒有更新') return False else: with open(f_name, 'w', encoding='utf-8') as f: f.write(md5code) ⑧ print('數據更新') return True
def fetch_data(): """爬取並解析數據"""
driver = webdriver.Firefox() driver.get(url)
table_element = driver.find_element_by_id('BIZ_hq_historySearch') ⑨ if not check_update(table_element.text): driver.quit() return None
… … driver.quit() return data
if __name__ == '__main__': data = fetch_data()上述代碼第①行~第③行是生成MD5碼的主要語句。首先通過代碼第①行md5()函數創建md5對象。代碼第②行使用update()方法對傳入的數據進行MD5運算,注意update()方法的參數是字節序列對象,而html.encode(encoding='utf-8')是將字符串轉換為字節序列。代碼第③行是請求MD5摘要,hexdigest()方法返回一個十六進位數字所構成的MD5碼。
代碼第④行定義變量f_name,用來保存上次MD5碼的文件名。代碼第⑤行判斷文件是否存在,如果存在則會讀取MD5碼,見代碼第⑥行。代碼第⑦行是比較兩次的MD5碼,如果一致說明沒有更新,返回False;否則返回True,並將新的MD5碼寫入到文件中,見代碼第⑧行。代碼第⑨行是通過查找id為BIZ_hq_historySearch的table元素對象table_element。代碼第⑩行調用check_update()函數驗證股票數據是否更新,其中table_element.text提取table元素中的所有文本。注意如果沒有更新數據時需要則通過driver.quit()語句退出瀏覽器釋放資源。
保存數據到資料庫
數據解析完成後需要保存到資料庫中。該項目的資料庫設計模型如圖1-2所示,項目中包含兩個數據表,股票信息表(Stocks)和股票歷史價格表(HistoricalQuote)。
圖1-2 資料庫設計模型
資料庫設計完成後需要編寫資料庫DDL腳本。當然,也可以通過一些工具生成DDL腳本,然後把這個腳本放在資料庫中執行就可以了。下面是編寫的DDL腳本文件crebas.sql。
create database if not exists QuoteDB;
use QuoteDB;
drop table if exists HistoricalQuote;
drop table if exists Stocks;
create table HistoricalQuote( HDate varchar(10) not null, Open decimal(8,4), High decimal(8,4), Low decimal(8,4), Close decimal(8,4), Volume bigint, Symbol varchar(10), primary key (HDate));
create table Stocks( Symbol varchar(10) not null, Company varchar(50) not null, primary key (Symbol));
alter table HistoricalQuote add constraint FK_Reference_1 foreign key (Symbol) references Stocks (Symbol) on delete restrict on update restrict;
insert into Stocks (Symbol, Company) values ('600519', '貴州茅臺'); ①在創建完成資料庫後,還在股票信息表中預先插入了一條數據,見代碼第①行。編寫DDL腳本之後需要在MySQL數據中執行,創建資料庫。
資料庫創建完成後,編寫訪問資料庫的Python代碼如下:
import pymysql
def insert_hisq_data(row): """在股票歷史價格表中傳入數據"""
connection = pymysql.connect(host='localhost', user='root', password='12345', database='QuoteDB', charset='utf8') try: with connection.cursor() as cursor:
sql = 'insert into historicalquote ' \ '(HDate,Open,High,Low,Close,Volume,Symbol)' \ ' values (%(Date)s,%(Open)s,%(High)s,%(Low)s,%(Close)s,%(Volume)s,%(Symbol)s)' ①
cursor.execute(sql, row) ②
connection.commit()
except pymysql.DatabaseError as error: connection.rollback() print('插入數據失敗{0}'.format(error)) finally: connection.close()訪問資料庫代碼是在db_access模塊中編寫的。代碼第①行是插入數據SQL語句,其中(%(Date)s等是命名佔位符,綁定時需要字典類型。代碼第②行是綁定參數並執行SQL語句,其中row是要綁定的參數,row是字典類型。
調用db_access模塊代碼如下:
...
if __name__ == '__main__': data = fetch_data() print(data)
if data is not None: for row in data: ① row['Symbol'] = '600519' ② db_access.insert_hisq_data(row) ③上述代碼第①行是循環遍歷data變量,data保存了所有爬蟲爬取的數據。代碼第②行添加Symbol到row中,爬取的數據沒有Symbol。代碼第③行是調用db_access模塊的insert_hisq_data()函數插入數據到資料庫。
爬蟲工作計劃任務
網絡中的數據一般都是定期更新的,網絡爬蟲不需要一直工作,可以根據數據更新頻率或更新數據時間點,制訂網絡爬蟲工作計劃任務。例如股票信息在交易日內是定時更新的,股票交易日是周一至周五,當然還有特殊的日期不進行交易,股票交易時間是上午時段9:30~11:30,下午時段13:00~15:00。本項目有些特殊爬取的數據不是實時數據,而是歷史數據,這種歷史數據應該在交易日結束之後爬取。
本項目需要兩個子線程,一個是工作子線程,另一個是控制子線程。具體代碼如下:
"""項目實戰:搜狐網爬取股票數據"""import datetimeimport hashlibimport osimport threadingimport time
import db_accessfrom selenium import webdriver
url = 'http://q.stock.sohu.com/cn/600519/lshq.shtml'
is_running = Trueinterval = 60 * 60
def check_update(html): """驗證數據是否更新,更新返回True,未更新返回False"""
<省略驗證數據是否更新代碼>
def fetch_data(): """爬取並解析數據""" <省略爬取並解析數據代碼>
def is_trad_time(): ① """判斷交易時間"""
now = datetime.datetime.now() df = '%H%M%S' strnow = now.strftime(df) starttime1 = datetime.time(9, 30).strftime(df) endtime1 = datetime.time(11, 30).strftime(df) starttime2 = datetime.time(13, 0).strftime(df) endtime2 = datetime.time(15, 0).strftime(df)
if now.weekday() == 5 \ or now.weekday() == 6 \ or not ((strnow >= starttime1 and strnow <= endtime1) \ or (strnow >= starttime2 and strnow <= endtime2)): return False return True
def work_thread_body(): """工作線程體函數"""
while is_running: print('爬蟲休眠...') time.sleep(interval) if is_trad_time(): print('交易時間,爬蟲不工作...') continue
print('非交易時間,爬蟲開始工作...')
data = fetch_data()
if data is not None: for row in data: row['Symbol'] = '600519' db_access.insert_hisq_data(row)
def main(): """主函數"""
work_thread = threading.Thread(target=work_thread_body, name='WorkThread') work_thread.start()
if __name__ == '__main__': main()工作線程根據指定計劃任務完成解析數據和數據保存。工作線程啟動後會調用線程體work_thread_body()函數,在線程體函數中代碼第③行讓工作線程休眠。代碼第行調用is_trad_time()函數判斷是否是交易時間,如果是交易時間繼續休眠;如果是非交易時間,爬蟲開始工作。
另外,代碼第①行是判斷交易的時間函數,該函數可以判斷當前時間是否是股票交易時間。代碼第行中now.weekday() == 5是判斷當前日期是星期六,now.weekday() == 6是判斷當前日期是星期日,(strnow >= starttime1 and strnow <= endtime1)是判斷當前時間是在上午時段9:30~11:30,(strnow >= starttime2 and strnow <= endtime2)是判斷當前時間是在下午時段13:00~15:00。
項目編寫完成後就可以進行測試了。圖1-3是項目的控制臺,如果在不在交易時間內,爬蟲開始工作,然後休眠;如果在交易時間內,爬蟲休眠。