日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

Python操作redis(普通操作,连接池,封装)

發(fā)布時(shí)間:2025/3/15 python 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python操作redis(普通操作,连接池,封装) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

安裝python包

pip install redis==2.10.6

普通操作

import redisabc = {"aa":{"bb":"cc"}} r = redis.Redis(host='192.168.2.139', port=6379, db=1, password="111111", decode_responses=True) try:r.ping() except TimeoutError:print('redis connection timeout') r.lpush("abccba", abc)

連接池

#-*-coding:utf-8-*- import redis# 連接池連接使用,節(jié)省了每次連接用的時(shí)間 conn_pool = redis.ConnectionPool(host='localhost', port=6379, db=1, password="111111", decode_responses=True) # 第一個(gè)客戶端訪問 re_pool = redis.Redis(connection_pool=conn_pool, decode_responses=True) # 第二個(gè)客戶端訪問 re_pool2 = redis.Redis(connection_pool=conn_pool, decode_responses=True) # key value存儲(chǔ)到redis數(shù)據(jù)庫 try:re_pool.set('chinese1', 'hello_world')re_pool2.set('chinese2', 'hello_python') except Exception as e:print(e) # 根據(jù)key獲取存的數(shù)據(jù)的內(nèi)容 data_info = re_pool.get('chinese1') data_info2 = re_pool.get('chinese2') # 輸出從redis庫中取出來的數(shù)據(jù)的內(nèi)容 print(data_info) print(data_info2) # 獲取兩個(gè)連接的信息 id1 = re_pool.client_list() id2 = re_pool2.client_list() # 輸出兩個(gè)連接的id,判斷是否一致 print('re_pool_id{}======re_pool2_id{}'.format(id1[0]['id'], id2[0]['id']))

?

實(shí)例封裝

#! /usr/bin/env python # -*- coding: utf-8 -*-import sys import time import loggingimport redislogger = logging.getLogger()class RedisDB(object):def __init__(self, host, port, password, db, list_name, ):self.host = hostself.port = portself.password = passwordself.db = dbself.list_name = list_nameself.lock = Falseself.redis_conn = None#self.redis_conn = self.get_redis_connect()#if self.redis_conn is None:# logger.error("Redis connect fail.")# sys.exit()def get_redis_connect(self):#connect_num = 5#while connect_num:while True:try:self.redis_conn = redis.Redis(self.host,self.port,self.db,self.password,decode_responses=True)self.redis_conn.ping()self.lock = Truebreakexcept Exception as e:logger.error("Redis connection timeout, waiting for reconnection.")#if connect_num > 0:# connect_num -= 1# continueself.lock = Falsetime.sleep(5)continuedef pinging(self):try:self.redis_conn.ping()return Trueexcept Exception as e:logger.error("Redis connection timeout")return Falsedef get_redis_data(self):try:data = self.redis_conn.rpop(self.list_name)return dataexcept Exception as e:logger.error("Get redis data fail.")return Nonedef send_result(self, result_list_name, result):try:self.redis_conn.lpush(result_list_name, result)return Trueexcept Exception as e:logger.error("Send ai result data to redis fail: {}.".format(e))return False

調(diào)用

def thread_get_data():global redis_objtry:while True:if not redis_obj.lock:time.sleep(1)continueif thread_lock:continue#data = json.loads(redis_obj.get_redis_data())data = ast.literal_eval(line.strip()) if data is None:time.sleep(1)continueevent_type = data["event_type"]# dnsif event_type.lower() == "dns":。。。。。。# icmpelif event_type.lower() == "icmp":。。。。。。except Exception as e:#logger.error("Get redis data thread error:{0}, {1}".format(e, e.__traceback__.tb_lineno))print("Get redis data thread error:{0}, {1}".format(e, e.__traceback__.tb_lineno))

?

總結(jié)

以上是生活随笔為你收集整理的Python操作redis(普通操作,连接池,封装)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。