Python操作redis(普通操作,连接池,封装)
生活随笔
收集整理的這篇文章主要介紹了
Python操作redis(普通操作,连接池,封装)
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
安裝python包
pip install redis==2.10.6普通操作
import redisabc = {"aa":{"bb":"cc"}} r = redis.Redis(host='192.168.2.139', port=6379, db=1, password="111111", decode_responses=True) try:r.ping() except TimeoutError:print('redis connection timeout') r.lpush("abccba", abc)連接池
#-*-coding:utf-8-*- import redis# 連接池連接使用,節(jié)省了每次連接用的時(shí)間 conn_pool = redis.ConnectionPool(host='localhost', port=6379, db=1, password="111111", decode_responses=True) # 第一個(gè)客戶端訪問 re_pool = redis.Redis(connection_pool=conn_pool, decode_responses=True) # 第二個(gè)客戶端訪問 re_pool2 = redis.Redis(connection_pool=conn_pool, decode_responses=True) # key value存儲(chǔ)到redis數(shù)據(jù)庫 try:re_pool.set('chinese1', 'hello_world')re_pool2.set('chinese2', 'hello_python') except Exception as e:print(e) # 根據(jù)key獲取存的數(shù)據(jù)的內(nèi)容 data_info = re_pool.get('chinese1') data_info2 = re_pool.get('chinese2') # 輸出從redis庫中取出來的數(shù)據(jù)的內(nèi)容 print(data_info) print(data_info2) # 獲取兩個(gè)連接的信息 id1 = re_pool.client_list() id2 = re_pool2.client_list() # 輸出兩個(gè)連接的id,判斷是否一致 print('re_pool_id{}======re_pool2_id{}'.format(id1[0]['id'], id2[0]['id']))?
實(shí)例封裝
#! /usr/bin/env python # -*- coding: utf-8 -*-import sys import time import loggingimport redislogger = logging.getLogger()class RedisDB(object):def __init__(self, host, port, password, db, list_name, ):self.host = hostself.port = portself.password = passwordself.db = dbself.list_name = list_nameself.lock = Falseself.redis_conn = None#self.redis_conn = self.get_redis_connect()#if self.redis_conn is None:# logger.error("Redis connect fail.")# sys.exit()def get_redis_connect(self):#connect_num = 5#while connect_num:while True:try:self.redis_conn = redis.Redis(self.host,self.port,self.db,self.password,decode_responses=True)self.redis_conn.ping()self.lock = Truebreakexcept Exception as e:logger.error("Redis connection timeout, waiting for reconnection.")#if connect_num > 0:# connect_num -= 1# continueself.lock = Falsetime.sleep(5)continuedef pinging(self):try:self.redis_conn.ping()return Trueexcept Exception as e:logger.error("Redis connection timeout")return Falsedef get_redis_data(self):try:data = self.redis_conn.rpop(self.list_name)return dataexcept Exception as e:logger.error("Get redis data fail.")return Nonedef send_result(self, result_list_name, result):try:self.redis_conn.lpush(result_list_name, result)return Trueexcept Exception as e:logger.error("Send ai result data to redis fail: {}.".format(e))return False調(diào)用
def thread_get_data():global redis_objtry:while True:if not redis_obj.lock:time.sleep(1)continueif thread_lock:continue#data = json.loads(redis_obj.get_redis_data())data = ast.literal_eval(line.strip()) if data is None:time.sleep(1)continueevent_type = data["event_type"]# dnsif event_type.lower() == "dns":。。。。。。# icmpelif event_type.lower() == "icmp":。。。。。。except Exception as e:#logger.error("Get redis data thread error:{0}, {1}".format(e, e.__traceback__.tb_lineno))print("Get redis data thread error:{0}, {1}".format(e, e.__traceback__.tb_lineno))?
總結(jié)
以上是生活随笔為你收集整理的Python操作redis(普通操作,连接池,封装)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: docker 启动mongodb
- 下一篇: Python三目运算符