一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

腳本之家,腳本語言編程技術及教程分享平臺!
分類導航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務器之家 - 腳本之家 - Python - Python與Redis的連接教程

Python與Redis的連接教程

2020-06-10 09:50腳本之家 Python

這篇文章主要介紹了Python與Redis的連接教程,Redis是一個高性能的基于內存的數據庫,需要的朋友可以參考下

今天在寫zabbix storm job監控腳本的時候用到了python的redis模塊,之前也有用過,但是沒有過多的了解,今天看了下相關的api和源碼,看到有ConnectionPool的實現,這里簡單說下。
在ConnectionPool之前,如果需要連接redis,我都是用StrictRedis這個類,在源碼中可以看到這個類的具體解釋:
 
redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an
implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server

使用的方法:
 

?
1
2
r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx)
r.xxxx()

有了ConnectionPool這個類之后,可以使用如下方法
 

?
1
2
pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx)
r = redis.Redis(connection_pool=pool)

這里Redis是StrictRedis的子類
簡單分析如下:
在StrictRedis類的__init__方法中,可以初始化connection_pool這個參數,其對應的是一個ConnectionPool的對象:
 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class StrictRedis(object):
........
  def __init__(self, host='localhost', port=6379,
         db=0, password=None, socket_timeout=None,
         socket_connect_timeout=None,
         socket_keepalive=None, socket_keepalive_options=None,
         connection_pool=None, unix_socket_path=None,
         encoding='utf-8', encoding_errors='strict',
         charset=None, errors=None,
         decode_responses=False, retry_on_timeout=False,
         ssl=False, ssl_keyfile=None, ssl_certfile=None,
         ssl_cert_reqs=None, ssl_ca_certs=None):
     if not connection_pool:
       ..........
       connection_pool = ConnectionPool(**kwargs)
     self.connection_pool = connection_pool

在StrictRedis的實例執行具體的命令時會調用execute_command方法,這里可以看到具體實現是從連接池中獲取一個具體的連接,然后執行命令,完成后釋放連接:

 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# COMMAND EXECUTION AND PROTOCOL PARSING
def execute_command(self, *args, **options):
  "Execute a command and return a parsed response"
  pool = self.connection_pool
  command_name = args[0]
  connection = pool.get_connection(command_name, **options) #調用ConnectionPool.get_connection方法獲取一個連接
  try:
    connection.send_command(*args) #命令執行,這里為Connection.send_command
    return self.parse_response(connection, command_name, **options)
  except (ConnectionError, TimeoutError) as e:
    connection.disconnect()
    if not connection.retry_on_timeout and isinstance(e, TimeoutError):
      raise
    connection.send_command(*args)
    return self.parse_response(connection, command_name, **options)
  finally:
    pool.release(connection) #調用ConnectionPool.release釋放連接

在來看看ConnectionPool類:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class ConnectionPool(object):
    ...........
  def __init__(self, connection_class=Connection, max_connections=None,
         **connection_kwargs):  #類初始化時調用構造函數
    max_connections = max_connections or 2 ** 31
    if not isinstance(max_connections, (int, long)) or max_connections < 0: #判斷輸入的max_connections是否合法
      raise ValueError('"max_connections" must be a positive integer')
    self.connection_class = connection_class #設置對應的參數
    self.connection_kwargs = connection_kwargs
    self.max_connections = max_connections
    self.reset() #初始化ConnectionPool 時的reset操作
  def reset(self):
    self.pid = os.getpid()
    self._created_connections = 0 #已經創建的連接的計數器
    self._available_connections = []  #聲明一個空的數組,用來存放可用的連接
    self._in_use_connections = set() #聲明一個空的集合,用來存放已經在用的連接
    self._check_lock = threading.Lock()
.......
  def get_connection(self, command_name, *keys, **options): #在連接池中獲取連接的方法
    "Get a connection from the pool"
    self._checkpid()
    try:
      connection = self._available_connections.pop() #獲取并刪除代表連接的元素,在第一次獲取connectiong時,因為_available_connections是一個空的數組,
      會直接調用make_connection方法
    except IndexError:
      connection = self.make_connection()
    self._in_use_connections.add(connection)  #向代表正在使用的連接的集合中添加元素
    return connection 
  def make_connection(self): #在_available_connections數組為空時獲取連接調用的方法
    "Create a new connection"
    if self._created_connections >= self.max_connections:  #判斷創建的連接是否已經達到最大限制,max_connections可以通過參數初始化
      raise ConnectionError("Too many connections")
    self._created_connections += 1  #把代表已經創建的連接的數值+1
    return self.connection_class(**self.connection_kwargs)   #返回有效的連接,默認為Connection(**self.connection_kwargs)
  def release(self, connection): #釋放連接,鏈接并沒有斷開,只是存在鏈接池中
    "Releases the connection back to the pool"
    self._checkpid()
    if connection.pid != self.pid:
      return
    self._in_use_connections.remove(connection)  #從集合中刪除元素
    self._available_connections.append(connection) #并添加到_available_connections 的數組中
  def disconnect(self): #斷開所有連接池中的鏈接
    "Disconnects all connections in the pool"
    all_conns = chain(self._available_connections,
             self._in_use_connections)
    for connection in all_conns:
      connection.disconnect()

execute_command最終調用的是Connection.send_command方法,關閉鏈接為 Connection.disconnect方法,而Connection類的實現:
 

?
1
2
3
4
5
6
7
class Connection(object):
  "Manages TCP communication to and from a Redis server"
  def __del__(self):  #對象刪除時的操作,調用disconnect釋放連接
    try:
      self.disconnect()
    except Exception:
      pass

核心的鏈接建立方法是通過socket模塊實現:

 
   

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _connect(self):
    err = None
    for res in socket.getaddrinfo(self.host, self.port, 0,
                   socket.SOCK_STREAM):
      family, socktype, proto, canonname, socket_address = res
      sock = None
      try:
        sock = socket.socket(family, socktype, proto)
        # TCP_NODELAY
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # TCP_KEEPALIVE
        if self.socket_keepalive:  #構造函數中默認 socket_keepalive=False,因此這里默認為短連接
          sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
          for k, v in iteritems(self.socket_keepalive_options):
            sock.setsockopt(socket.SOL_TCP, k, v)
        # set the socket_connect_timeout before we connect
        sock.settimeout(self.socket_connect_timeout) #構造函數中默認socket_connect_timeout=None,即連接為blocking的模式
        # connect
        sock.connect(socket_address)
        # set the socket_timeout now that we're connected
        sock.settimeout(self.socket_timeout) #構造函數中默認socket_timeout=None
        return sock
      except socket.error as _:
        err = _
        if sock is not None:
          sock.close()
.....

關閉鏈接的方法:
 

?
1
2
3
4
5
6
7
8
9
10
11
def disconnect(self):
  "Disconnects from the Redis server"
  self._parser.on_disconnect()
  if self._sock is None:
    return
  try:
    self._sock.shutdown(socket.SHUT_RDWR) #先shutdown再close
    self._sock.close()
  except socket.error:
    pass
  self._sock = None

       
可以小結如下
1)默認情況下每創建一個Redis實例都會構造出一個ConnectionPool實例,每一次訪問redis都會從這個連接池得到一個連接,操作完成后會把該連接放回連接池(連接并沒有釋放),可以構造一個統一的ConnectionPool,在創建Redis實例時,可以將該ConnectionPool傳入,那么后續的操作會從給定的ConnectionPool獲得連接,不會再重復創建ConnectionPool。
2)默認情況下沒有設置keepalive和timeout,建立的連接是blocking模式的短連接。
3)不考慮底層tcp的情況下,連接池中的連接會在ConnectionPool.disconnect中統一銷毀。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 亚洲欧美日韩中文高清一 | brazzersvideo欧美最新 | 扒开双腿羞辱调教play视频 | 国产一区二区三区在线观看视频 | 91麻豆精品国产 | 欧美日韩三区 | 色综合网天天综合色中文男男 | 色批网站www | 欧美sex另类孕妇 | 欧美se图| 亚洲图片 自拍偷拍 | 国产原创精品 | 99成人国产精品视频 | 国产大秀视频一区二区三区 | 污小说在线阅读 | 色就色欧美综合偷拍区a | 亚洲天堂成人在线观看 | 91免费高清无砖码区 | 国产精品久久久久久吹潮 | 欧美在线视频 一区二区 | 双性总裁被调教1v1 双性双根 | 我与白丝同桌的故事h文 | 91亚洲精品久久91综合 | 密臀tv | 亚洲精品视频一区 | 美女班主任下面好爽好湿好紧 | 波多野结衣中文字幕在线 | 女教师巨大乳孔中文字幕免费 | 99久久精品免费看国产情侣 | 国产成人欧美视频在线 | 美女脱得一二净无内裤全身的照片 | heyzo在线观看 | 四虎永久在线精品免费影视 | 久久综合久综合久久鬼色 | 甜宠巨肉h文1v1校园 | 男人天堂网www | 日韩精品福利视频一区二区三区 | 小辣椒精品福利视频导航 | 车上小婕子系列辣文小说 | 国产综合亚洲欧美日韩一区二区 | 亚洲男gay同性同志 亚洲免费在线看 |