ACMUG徵集原創技術文章。詳情請添加 A_CMUG或者掃描文末二維碼關注我們的微信公眾號。有獎徵稿,請發送稿件至:acmug@acmug.com。
3306現金有獎徵稿說明:知識無價,勞動有償,ACMUG特約撰稿人有獎回報計劃(修訂版)
作者簡介:任坤
現居廣東,畢業後一直從事資料庫相關工作,先後擔任專職Oracle和MySQL DBA,個人微信號justindata,投稿ACMUG是希望拋磚引玉,讓眾多評審幫忙校驗文章的失當之處,藉此提升個人水平,也供眾多DB愛好者參考。
前言:
編寫一個資料庫伺服器是個大工程,以MySQL為例,其代碼總量已經達到百萬行,就算照著敲一遍也需要大半年。
寫一個資料庫客戶端就沒有那麼麻煩,你只需遵循MySQL的協議,自己構造各種命令包發送給伺服器,並對伺服器的響應包進行解析,就可以完成一個簡單的MySQL客戶端。
我最近一直在閱讀pymysql和python-mysql-replication源碼,結合mysql internal文檔,按照自己的理解用python實現一些簡單的MySQL客戶端功能。
該系列文章一共4篇,本文是第一篇,簡單介紹如何建立1個MySQL資料庫連接。
參考資料
https://github.com/PyMySQL/PyMySQL
https://dev.mysql.com/doc/internals
正文
當MySQL客戶端發起一個資料庫連接時,會經歷以下幾步:
1、創建套接字連接到mysqld,採用tcp或者unix socket協議
2、mysqld發起initial handshake,向客戶端發送一個握手包(handshake packet),由後者解析並返回一個響應包(repsonse packet),mysqld解析響應包,如果各項判斷均成功,則響應一個OK_packet,此時成功建立一個資料庫連接。
若自己編寫MySQL客戶端,則需要解析握手包,並根據解析結果構造響應包,而這些交互均以MySQL通信包(communication packet)的形式完成。
依據MySQL官方文檔定義,A communication packet is a single SQL statement sent to the MySQL server, a single row that is sent to the client, or a binary log event sent from a master replication server to a slave. The largest possible packet that can be transmitted to or from a MySQL 5.7 server or client is 1GB.
MySQL客戶端和伺服器每次交互時,都會生成1個通信包,通信包的字節上限為min(max_allowed_packet, 1GB),在發送之前會將其切割成chunk,每個chunk最大值為16M(其實是2^16-1位元組),不足16M的通信包單獨佔據1個chunk。
每個chunk都有1個4位元組的header,前3個字節描述其長度,第4個字節則為序列號seq_id,從0開始遞增允許迴繞,客戶端每發送一條新的sql命令時會重置為0。當1個通信包分割成多個chunk發送時,接收方據此可保證按順序讀取chunk,避免數據紊亂。
這些chunk被發送給作業系統,依次被打散組裝成tcp和ip包,通過網絡發送到對端,遵循tcp/ip協議被反解析成chunk,然後由MySQL從1個或若干個chunk中還原成1個完整的通信包。
tcp/ip協議已經被封裝成庫函數,高級語言只需要調用API便可實現tcp/ip包的發送和接收,無需關注其底層實現。
流程圖如下:
1、創建tcp連接
import socket
import struct
創建1個到本機3306埠的tcp連接
sock = socket.create_connection(('127.0.0.1', 3306))
禁用nagle算法,避免不必要的網絡等待
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
此時資料庫已經創建了1個連接,user顯示為unauthenticated user,該狀態會一直持續直至完成用戶驗證,如果中間的API調用出現阻塞,MySQL新建連接會一直處於該狀態。
2、接收Initial handshake packet
MySQL通過accept()接受到該請求,會創建一個thread專門處理該連接(默認採用one-thread-per-connection模型),並主動向客戶端發起握手請求。
next_seq_id = 0
上文提到,MySQL所有的網絡交互都是以chunk為單位傳送,而chunk都有1個4位元組的header,故客戶端每次都需要先解析chunk header,為了便於重用寫成函數。
def read_chunk():
global next_seq_id
packet_header = sock.recv(4)
btrl, btrh, packet_number = struct.unpack('<HBB', packet_header)
bytes_to_read = btrl + (btrh << 16)
if packet_number != next_seq_id:
raise Exception("got wrong seq_id, expect %d but actually is %d " % (next_seq_id, packet_number))
next_seq_id = (next_seq_id + 1) % 256
return bytes_to_read
從socket讀取指定長度的數據
def read_recv(data_len):
data = sock.recv(data_len)
if len(data) == data_len:
return data
while True:
data += sock.recv(data_len - len(data))
if len(data) == data_len:
return data
注1:recv(N)執行過程中可能會被OS interrupt中斷導致提前返回,故每次都要判斷返回數據的長度是否為N,如果小於N,則需要再次讀取。
讀取一個完整的通信包,如果chunk存儲的數據長度等於0xffffff,說明通信包被拆分成多個chunk,需要讀取後續chunk進行拼接。
def read_packet():
packet_len = read_chunk()
packet_binary = b''
packet_binary = read_recv(packet_len)
while packet_len == 0xffffff:
packet_len = read_chunk()
packet_binary += read_recv(packet_len)
return packet_binary
讀取mysqld發來的initial handshake packet。
packet_binary = read_packet()
initial handshake packet組成格式如下:
長度
類型
存儲內容
1
byte
protocol_version
以'\0'結尾
string
server version
4
byte
server thread ID
8
string
auth-plugin-data-part-1
1
byte
填充值,無意義
2
byte
capability flags (lower 2 bytes)
1
byte
default server character set
2
byte
status flag
2
byte
capability flags (upper 2 bytes)
1
byte
length of auth-plugin-data
10
string
全為0,暫未使用
13
string
auth-plugin-data-part-2
以'\0'結尾
string
auth-plugin-name
目前MySQL支持的握手協議有兩種, HandshakeV9和HandshakeV10,5.X版本都是採用後者,保留protocol_version欄位為了向前兼容,也為以後方便擴展新協議。
auth-plugin-data用於密碼驗證,mysqld填充一個隨機字符串,客戶端接收後,以資料庫密碼作為密鑰將其加密並返回,mysqld以資料庫存儲的用戶密碼將其解密,如果解密後的值和初始的隨機字符串相同,說明客戶端輸入的密碼正確;
capability flags定義了一系列通信標準,比如是否採用4.1協議,是否採用auth-plugin,客戶端需要將其解析,以便正確的獲取握手包欄位並據此構造響應包,auth-plugin-data-part-2和auth-plugin-name的解析都依賴相應標誌位。
i = 0
protocol_version = struct.unpack('<B',packet_binary[i:i+1])[0]
i += 1
server_version長度不固定,以'\0'標識符結束,從第i個位置,搜索第一個出現的'\0',以latin1解析成字符串
server_end = packet_binary.find(b'\0', i)
server_version = packet_binary[i:server_end].decode('latin1')
i = server_end + 1
server_thread_id = struct.unpack('<I', packet_binary[i:i+4])[0]
i += 4
MySQL支持3種驗證方式:old_password_authentication、secure_password_authentication以及自定義auth-plugin,第一種僅用於pre-4.1(可被破解),只需要8位元組,第二種是當前MySQL默認方式,總共需要20位元組,第3種需要用戶自定義,本文只討論第二種。
讀取auth_data的前8位元組,如果採用了old_password_authentication,就不必存儲剩餘12位元組,故拆分兩部分存儲。
auth_data = packet_binary[i:i+8]
i += 8
跳過1個字節的填充值
i += 1
解析capability標誌位,該標誌為總共4位元組,這個是其低2位
server_cap = struct.unpack('<H', packet_binary[i:i+2])[0]
i += 2
為了減少網絡傳輸,MySQL為字符集定義了一個映射表,mysqld和客戶端都需要實現該數據結構,傳輸時只發送1個數值,由接收端從映射表中獲取對應字符集,其他諸如column_type和capability flags,同理。
為減少篇幅,本文約定客戶端和伺服器的字符集都為utf8,不再額外進行charset_id到字符集的轉換。
charset_id = struct.unpack('<B', packet_binary[i:i+1])[0]
charset_name = 'utf8'
i += 1
server_status = struct.unpack('<H', packet_binary[i:i+2])[0]
i += 2
capability flag的解析非常瑣碎,有興趣的讀者可參閱https://dev.mysql.com/doc/internals/en/capability-flags.html讀取capability的高2位字節,,本文不作過多解讀。
server_cap_upper = struct.unpack('<H', packet_binary[i:i+2])[0]
server_cap |= server_cap_upper << 16
i += 2
儘管聲明了len_auth_data,但是secure_password_authentication硬性規定了auth_data長度為20位元組,因此該欄位在本文沒有意義
len_auth_data = struct.unpack('<B', packet_binary[i:i+1])[0]
i += 1
跳過10位元組填充
i += 10
讀取auth_data剩餘的12位元組
auth_data += packet_binary[i:i+12]
i += 13
注2:官方文檔將第二部分的長度定義為13,但是改成packet_binary[i:i+13]會出錯,https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::AuthSwitchRequest。
獲取plugin name
CLIENT_PLUGIN_AUTH = 0x00080000
if server_cap & CLIENT_PLUGIN_AUTH:
server_end = packet_binary.find(b'\0', i)
auth_plugin_name = packet_binary[i:server_end].decode('latin1')
3、構造response packet
有兩種響應包,HandshakeResponse41和HandshakeResponse320,自從4.1版本後默認採用前者,包組成格式如下:
長度
類型
內容
4
byte
capability flags
4
byte
max size of a command packet that the client wants to send to the server(沒理解)
1
byte
connection character id
23
string
全部為0,暫未使用
以NUL結尾
string
username
以NUL結尾
string
auth response
以NUL結尾
string
database,以conn character id指定的字符集編碼
以NUL結尾
string
auth plugin name
conn_char='utf8'
通常客戶端會實現以下capbility
LONG_PASSWORD = 1
LONG_FLAG = 1 << 2
CLIENT_CONNECT_WITH_DB = 1 << 3
注3:本例在發起資料庫連接時會指定資料庫,因此必須設置CLIENT_CONNECT_WITH_DB標誌位,否則handshake會失敗,一開始遺漏了這一點,斷斷續續的多調試了好幾周
PROTOCOL_41 = 1 << 9
TRANSACTIONS = 1 << 13
SECURE_CONNECTION = 1 << 15
MULTI_STATEMENTS = 1 << 16
MULTI_RESULTS = 1 << 17
PLUGIN_AUTH = 1 << 19
PLUGIN_AUTH_LENENC_CLIENT_DATA = 1 << 21
直接賦值
client_flag = (
LONG_PASSWORD | LONG_FLAG | CLIENT_CONNECT_WITH_DB | PROTOCOL_41 | TRANSACTIONS
| SECURE_CONNECTION | MULTI_STATEMENTS | MULTI_RESULTS
| PLUGIN_AUTH | PLUGIN_AUTH_LENENC_CLIENT_DATA)
response_data = struct.pack('<i', client_flag)
沒理解這個欄位的作用,暫定為1
max_size = 1
response_data += struct.pack('<I', max_size)
conn_char_id = 33
response_data += struct.pack('<B', conn_char_id)
23位元組字符串,全部為0
response_data += struct.pack('<23s', b'')
username = 'renkun'.encode(conn_char)
response_data += username + b'\0'
password = '123456'
根據客戶端給出的資料庫密碼,和第2步解析得到的加密字符串,構造auth response。
secure_password_authentication採用如下加密算法:
SHA1( password ) XOR SHA1( "20-bytes random data from server" <concat> SHA1( SHA1( password ) ) )。
採用SHA1算法,對password進行加密,獲取其二進位結果stage1,對stage1再次加密,獲取其二進位結果stage2,然後對(auth_data + stage2)進行第三次加密,獲取stage3,stage1、stage2、stage3長度一樣,都是20位元組。
此時取出stage1和stage3的每個二進位字符,挨個進行^(按位異或)運算,最後得出一個二進位字符串authresp。
from functools import partial
import hashlib
def scramble(password, auth_data):
sha_new = partial(hashlib.new, 'sha1')
stage1 = sha_new(password.encode('latin1')).digest()
stage2 = sha_new(stage1).digest()
s = sha_new()
s.update(auth_data)
s.update(stage2)
stage3 = s.digest()
auth_response = b''
for i in range(len(stage3)):
x =(struct.unpack('B', stage3[i:i+1])[0] ^ struct.unpack('B', stage1[i:i+1])[0])
auth_response += struct.pack('B', x)
return auth_response
auth_response = scramble(password, auth_data)
mysqld在握手包的capability flag中可能會定義一些標誌性,比如PLUGIN_AUTH_LENENC_CLIENT_DATA和SECURE_CONNECTION,這些標誌位會影響響應包的存儲格式,客戶端需要進行逐一判斷。
PLUGIN_AUTH_LENENC_CLIENT_DATA = 0x00200000
SECURE_CONNECTION = 0x00008000
if server_cap & PLUGIN_AUTH_LENENC_CLIENT_DATA:
response_data += struct.pack('<B', len(auth_response)) + auth_response
elif server_cap & SECURE_CONNECTION:
response_data += struct.pack('<B', len(auth_response)) + auth_response
else:
response_data += auth_response + b'\0'
剩下的欄位都需要先判斷capbility的標誌位
CLIENT_CONNECT_WITH_DB = 0x00000008
database = 'test'
if server_cap & CLIENT_CONNECT_WITH_DB:
database = database.encode(conn_char)
response_data += database + b'\0'
CLIENT_PLUGIN_AUTH = 0x00080000
if server_cap & CLIENT_PLUGIN_AUTH:
response_data += auth_plugin_name.encode('ascii') + b'\0'
至此,響應包已經構造完畢,要將其封裝成chunk發送給mysqld,上面提到過,chunk header = len(3位元組) + seq_id(1位元組)
chunk = struct.pack('<I', len(response_data))[:3] + struct.pack('<B', next_seq_id)
chunk += response_data
next_seq_id = (next_seq_id + 1) % 256
sock.sendall(chunk)
mysqld收到客戶端發送的handshake response packet,有3種響應結果:OK_packet、ERR_packet,或者是AuthSwitchRequest_packet,本例只關注前兩種。
packet_data = read_packet()
def is_ok_packet(packet):
if packet[0:1] == '\x00' and len(packet) >= 7:
return True
else:
return False
def is_err_packet(packet):
if packet[0:1] == '\xff':
print "error code : %d" % struct.unpack('<H', packet[1:3])[0]
server_end = packet.find('\0', 9)
print "error message : %s" % packet[3:server_end]
return True
else:
return False
if is_ok_packet(packet_data):
print "succeed connect to mysql"
elif is_err_packet(packet_data):
print "failed connect to mysql"
我的測試版本是5.6.29,上述代碼能順利創建一個資料庫連接,以用戶renkun登陸,連接到test庫。
註:ACMUG收錄技術文章版權屬於原作者本人所有。如有疑問,請聯繫作者。
看完轉發,手留餘香。關注我們,一起進步。
關注ACMUG公眾號,參與社區活動,交流開源技術,分享學習心得,一起共同進步。