註明: 此篇文章為投稿文字, 投稿人 閆樹爽, (資深程式設計師, 目前從事REDIS ,MONGODB ,以及資料庫運維自動化代碼工作)
在NOSQL 資料庫中的操作,與關係型資料庫不同的是,會一門程序對於NOSQL的資料庫操控是十分有利的,下面的內容是關於如何利用python程序來操作redis 集群的說明.
## 利用python操作redis集群
redis的cluster模式為大型應用中常用的方式,今天學習如何使用redis-py-cluster來操作redis集群
首先安裝redis-py-cluster
```
pip install redis-py-cluster
```
基本用法
```python
from rediscluster import RedisCluster
from string import ascii_letters
import random
conn = RedisCluster(host="127.0.0.1", port=6379, password='password')#創建連接
for i in range(10000):
key = ''.join(random.sample(ascii_letters, k=7))#創建一個隨機字符串作為key
# ex = random.randint(1, 60 * 60 * 24)
conn.set(key, i, ex=500)
print(i)
```
此時可以發現reidis中已經有了我們的數據
然而在現實使用中,redis的使用往往涉及高並發,每次都重新創建連接不是我們所建議的方式,我們可以使用連接池來創建連接,並通過多線程來進行訪問。
redis-py-cluster的官方文檔寫的比較簡單並沒有給出詳細的連接池使用方式,但是好在python能夠查看源碼,我們可以看到其中有一個ClusterConnectionPool類,這個從命名來看應該是連接池的。進源碼去看這個類是繼承自redis庫的ConnectionPool。
直接連接嘗試
```python
from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool
startup_nodes = [
{'host': '10.50.132.92', 'port': 6379},
{'host': '10.50.132.93', 'port': 6379},
{'host': '10.50.132.94', 'port': 6379},
]
pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxxx")
```
## 處理報錯
會發現報錯(個別現象,原因下面說),如果未遇到錯誤可以跳過這小節
```
redis.exceptions.ResponseError: unknown command `CONFIG`, with args beginning with: `GET`, `cluster-require-full-coverage`,
```
原因是因為我將redis中的config命令rename掉了,但是在默認情況下,會檢查槽的完整性,所以會使用到config命令,但是config命令已經被我rename掉了啊,所以會報這個錯誤。解決方法有兩種:
1、修改自己的代碼,這個參數就是字面意思,跳過完整性檢查,這樣也就不會調用config命令了。
```python
pool = ClusterBlockingConnectionPool(startup_nodes=startup_nodes,password="xxxx",skip_full_coverage_check=True)
```
2、修改庫的代碼,這樣的好處是還是能夠檢查完整行,但是缺點是在進行項目遷移的時候還要再改一遍源碼,修改方法如下:
首先找到redis包中的client.py,一般在site-packages/redis/client.py
如果找不到可以從報錯的地方快速找到redis包的安裝路徑

找到這個1239行
```python
def config_get(self, pattern="*"):
"Return a dictionary of configuration based on the ``pattern``"
return self.execute_command('CONFIG GET', pattern)
```
修改這個CONFIG GET為rename後的命令就可以了
通過以上兩種方法,可以正常使用連接池了
## 連接池的使用
這個連接池的使用方法如下
```python
from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool
startup_nodes = [
{'host': '10.50.132.92', 'port': 6379},
{'host': '10.50.132.93', 'port': 6379},
{'host': '10.50.132.94', 'port': 6379},
]
pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxxx")
client = RedisCluster(connection_pool=pool)
```
這裡的pool使用上面的方式就可以得到了一個client
## 實際使用
我們可以用多線程為例子,使用連接池來操作redis
```python
from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool
import threading
from string import ascii_letters
import random
startup_nodes = [
{'host': '10.50.132.92', 'port': 6379},
{'host': '10.50.132.93', 'port': 6379},
{'host': '10.50.132.94', 'port': 6379},
]
pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxxx",max_connections=10)
threads=[]
lock = threading.Lock()
def test_thread(thread_id):
client = RedisCluster(connection_pool=pool)
for i in range(100):
key = ''.join(random.sample(ascii_letters, k=7))
client.set(key,random.randint(1,100),ex=100)
lock.acquire()
print(f"Thread-{thread_id}:processed {i+1} times")
lock.release()
#創建線程
for i in range(10):
threads.append(threading.Thread(target=test_thread,args=(i,)))
#開啟線程
for th in threads:
th.start()
```
此處我們的線程池裡面放10個連接,然後開啟10個線程,為了顯示的不太難看,我們在print的地方加了個鎖,我們期待這個樣能夠正常的10個線程同時向redis裡面寫數據,但是實際卻發現

會顯示有太多的連接,猜測應該是連接池中的連接不夠,所以我們調大max_connections參數為50,發現可以正常使用了,但是為什麼呢?
熟悉redis的同學可能一下就猜測到了原因。因為我們的集群模式,key鍵是要根據hash值來分配的,具體連接到那個我們插入之間是不知道的,所以在連接創建之前,客戶端也是不知道的,所以顯示出這個。在初始化函數中果然找到了相應的參數max_connections_per_node。我們將它設置為True。
這樣測試,我們將max_connections設置為10,同時將max_connections_per_node設置為True
這樣一來每個節點都有10個連接,我們可以開啟30個線程,如下
```python
from rediscluster import ClusterBlockingConnectionPool,RedisCluster,ClusterConnectionPool
import threading
from string import ascii_letters
import random
startup_nodes = [
{'host': '10.50.132.92', 'port': 6379},
{'host': '10.50.132.93', 'port': 6379},
{'host': '10.50.132.94', 'port': 6379},
]
pool = ClusterConnectionPool(startup_nodes=startup_nodes,password="xxx",max_connections=10,skip_full_coverage_check=True,max_connections_per_node=True)
threads=[]
lock = threading.Lock()
def test_thread(thread_id):
client = RedisCluster(connection_pool=pool)
for i in range(100):
key = ''.join(random.sample(ascii_letters, k=7))
client.set(key,random.randint(1,100),ex=100)
lock.acquire()
print(f"Thread-{thread_id}:processed {i+1} times")
lock.release()
#創建線程
for i in range(30):
threads.append(threading.Thread(target=test_thread,args=(i,)))
#開啟線程
for th in threads:
th.start()
```
但是~~~,還是不行,跑了一段時間以後有的線程會報錯

簡單思考下,因為這樣雖然開啟了這麼多的pool,但是對於30個線程來說,仍然有可能引發衝突,因為我們設置的是每個節點10個連接
但是30個線程在同一時間可能有11個訪問到了同一個節點,所以會產生這個錯誤。所以這裡應該改成max_connections=30
但是如果我們連接池只想創建20個呢?我們需要自己加鎖判斷嗎?這樣是否太麻煩了?
of course not~
在上面大家應該注意到,我在引入的時候還另外引入了一個ClusterBlockingConnectionPool,這個注釋中寫的是
```
Thread-safe blocking connection pool for Redis Cluster::
```
這是一個線程安全的連接池,繼承自ClusterConnectionPool,
使用我們僅需要將ClusterConnectionPool替換為ClusterBlockingConnectionPool即可。
至此已經可以正常使用redis連接池。
## 時間對比:
多線程連接池方式30個線程每個線程插入100個共3000次數據耗時5.1秒
單線程訪問插入1000個數據用時49秒,如果3000個應該在150秒左右,大概差了不到30倍,符合我們開的30個線程
## 空閒超時
在沒有經過測試之前,我有一個擔憂就是服務端設置了空閒超時,我們這個python客戶端會被服務端斷開,那麼單機還好,因為每次是單獨的創建連接,那麼連接池呢?
我們可以在test_thread中加入一個延時,然後過一段時間讓連接空閒並超時以後再訪問。經過測試,還是能連接的。