Connecting to Redis Sentinel from Python: redis.sentinel code examples

Published: 2025/3/8, category: python

This article collects typical usage examples of the redis.sentinel module in Python. If you are wondering how redis.sentinel is used in practice, or are looking for working examples, the snippets selected here may help. You can also look further into usage examples of the redis package, to which this module belongs.

The 14 code examples below are sorted by popularity by default.

Example 1: _get_service_names

# Requires: import redis
# or: from redis import sentinel

def _get_service_names(self):  # type: () -> List[six.text_type]
    """
    Get a list of service names from Sentinel. Tries Sentinel hosts until one succeeds; if none succeed,
    raises a ConnectionError.

    :return: the list of service names from Sentinel.
    """
    master_info = None
    connection_errors = []
    for sentinel in self._sentinel.sentinels:
        # Unfortunately, redis.sentinel.Sentinel does not support sentinel_masters, so we have to step
        # through all of its connections manually
        try:
            master_info = sentinel.sentinel_masters()
            break
        except (redis.ConnectionError, redis.TimeoutError) as e:
            connection_errors.append('Failed to connect to {} due to error: "{}".'.format(sentinel, e))
            continue
    if master_info is None:
        raise redis.ConnectionError(
            'Could not get master info from Sentinel\n{}:'.format('\n'.join(connection_errors))
        )
    return list(master_info.keys())

Developer: eventbrite, project: pysoa, lines of code: 25

Example 2: _get_connection

# Requires: import redis
# or: from redis import sentinel

def _get_connection(self, index):  # type: (int) -> redis.StrictRedis
    if not 0 <= index < self._ring_size:
        raise ValueError(
            'There are only {count} hosts, but you asked for connection {index}.'.format(
                count=self._ring_size,
                index=index,
            )
        )

    for i in range(self._sentinel_failover_retries + 1):
        try:
            return self._get_master_client_for(self._services[index])
        except redis.sentinel.MasterNotFoundError:
            self.reset_clients()  # make sure we reach out to get master info again on next call
            _logger.warning('Redis master not found, so resetting clients (failover?)')
            if i != self._sentinel_failover_retries:
                self._get_counter('backend.sentinel.master_not_found_retry').increment()
                time.sleep((2 ** i + random.random()) / 4.0)

    raise CannotGetConnectionError('Master not found; gave up reloading master info after failover.')

Developer: eventbrite, project: pysoa, lines of code: 22
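
The retry loop in Example 2 waits `(2 ** i + random.random()) / 4.0` seconds between attempts, that is, exponential backoff with jitter. The sketch below isolates that pattern under stated assumptions: `backoff_delay` and `call_with_retries` are hypothetical helpers, the exception types are passed in by the caller (for redis-py that would presumably be `redis.sentinel.MasterNotFoundError`), and `sleep` is injectable so the behaviour can be checked without waiting.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, jitter: float) -> float:
    """Seconds to wait before the next try: (2 ** attempt + jitter) / 4."""
    return (2 ** attempt + jitter) / 4.0


def call_with_retries(
    func: Callable[[], T],
    retries: int,
    retry_on: Tuple[Type[BaseException], ...],
    on_failure: Callable[[], None] = lambda: None,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func up to retries + 1 times, backing off between attempts."""
    for attempt in range(retries + 1):
        try:
            return func()
        except retry_on:
            on_failure()  # e.g. drop cached clients so master info is re-fetched
            if attempt != retries:
                # No sleep after the final attempt; we are about to give up.
                sleep(backoff_delay(attempt, random.random()))
    raise RuntimeError("gave up after {} attempts".format(retries + 1))
```

With `retries=3` the waits fall in the ranges 0.25 to 0.5, 0.5 to 0.75 and 1.0 to 1.25 seconds, so a short failover is ridden out without hammering the sentinels.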

Example 3: setUp

# Requires: import redis
# or: from redis import sentinel

def setUp(self):
    """ Clear all spans before a test run """
    self.recorder = tracer.recorder
    self.recorder.clear_spans()

    # self.sentinel = Sentinel([(testenv['redis_host'], 26379)], socket_timeout=0.1)
    # self.sentinel_master = self.sentinel.discover_master('mymaster')
    # self.client = redis.Redis(host=self.sentinel_master[0])

    self.client = redis.Redis(host=testenv['redis_host'])

Developer: instana, project: python-sensor, lines of code: 12

Example 4: setUp

# Requires: import redis
# or: from redis import sentinel

def setUp(self):
    pymemcache.client.Client(('localhost', 22122)).flush_all()
    redis.from_url('unix:///tmp/limits.redis.sock').flushall()
    redis.from_url("redis://localhost:7379").flushall()
    redis.from_url("redis://:sekret@localhost:7389").flushall()
    redis.sentinel.Sentinel([
        ("localhost", 26379)
    ]).master_for("localhost-redis-sentinel").flushall()
    rediscluster.RedisCluster("localhost", 7000).flushall()
    if RUN_GAE:
        from google.appengine.ext import testbed
        tb = testbed.Testbed()
        tb.activate()
        tb.init_memcache_stub()

Developer: alisaifee, project: limits, lines of code: 16

Example 5: test_storage_check

# Requires: import redis
# or: from redis import sentinel

def test_storage_check(self):
    self.assertTrue(
        storage_from_string("memory://").check()
    )
    self.assertTrue(
        storage_from_string("redis://localhost:7379").check()
    )
    self.assertTrue(
        storage_from_string("redis://:sekret@localhost:7389").check()
    )
    self.assertTrue(
        storage_from_string(
            "redis+unix:///tmp/limits.redis.sock"
        ).check()
    )
    self.assertTrue(
        storage_from_string("memcached://localhost:22122").check()
    )
    self.assertTrue(
        storage_from_string(
            "memcached://localhost:22122,localhost:22123"
        ).check()
    )
    self.assertTrue(
        storage_from_string(
            "memcached:///tmp/limits.memcached.sock"
        ).check()
    )
    self.assertTrue(
        storage_from_string(
            "redis+sentinel://localhost:26379",
            service_name="localhost-redis-sentinel"
        ).check()
    )
    self.assertTrue(
        storage_from_string("redis+cluster://localhost:7000").check()
    )
    if RUN_GAE:
        self.assertTrue(storage_from_string("gaememcached://").check())

Developer: alisaifee, project: limits, lines of code: 41

Example 6: setUp

# Requires: import redis
# or: from redis import sentinel

def setUp(self):
    self.storage_url = 'redis+sentinel://localhost:26379'
    self.service_name = 'localhost-redis-sentinel'
    self.storage = RedisSentinelStorage(
        self.storage_url,
        service_name=self.service_name
    )
    redis.sentinel.Sentinel([
        ("localhost", 26379)
    ]).master_for(self.service_name).flushall()

Developer: alisaifee, project: limits, lines of code: 12

Example 7: setUp

# Requires: import redis
# or: from redis import sentinel

def setUp(self):
    pymemcache.client.Client(('localhost', 22122)).flush_all()
    redis.from_url("redis://localhost:7379").flushall()
    redis.from_url("redis://:sekret@localhost:7389").flushall()
    redis.sentinel.Sentinel([
        ("localhost", 26379)
    ]).master_for("localhost-redis-sentinel").flushall()
    rediscluster.RedisCluster("localhost", 7000).flushall()

Developer: alisaifee, project: limits, lines of code: 10

Example 8: test_fixed_window_with_elastic_expiry_redis_sentinel

# Requires: import redis
# or: from redis import sentinel

def test_fixed_window_with_elastic_expiry_redis_sentinel(self):
    storage = RedisSentinelStorage(
        "redis+sentinel://localhost:26379",
        service_name="localhost-redis-sentinel"
    )
    limiter = FixedWindowElasticExpiryRateLimiter(storage)
    limit = RateLimitItemPerSecond(10, 2)
    self.assertTrue(all([limiter.hit(limit) for _ in range(0, 10)]))
    time.sleep(1)
    self.assertFalse(limiter.hit(limit))
    time.sleep(1)
    self.assertFalse(limiter.hit(limit))
    self.assertEqual(limiter.get_window_stats(limit)[1], 0)

Developer: alisaifee, project: limits, lines of code: 15
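
The test in Example 8 expects the second rejected hit to fail even though two seconds have passed since the window opened. That follows if every hit, allowed or not, pushes the window's expiry back. The sketch below is an in-memory approximation of that idea, not the limits library's implementation: `ElasticWindowLimiter` is a hypothetical class, and the clock is injected so no real sleeping is needed.

```python
from typing import Callable, Dict, Tuple


class ElasticWindowLimiter:
    """In-memory fixed window whose expiry is pushed back by every hit."""

    def __init__(self, amount: int, window_seconds: float,
                 clock: Callable[[], float]) -> None:
        self.amount = amount
        self.window_seconds = window_seconds
        self.clock = clock
        # key -> (hit count, time at which the window expires)
        self._state: Dict[str, Tuple[int, float]] = {}

    def _count(self, key: str) -> int:
        count, expires_at = self._state.get(key, (0, 0.0))
        # An expired window counts as empty.
        return count if self.clock() < expires_at else 0

    def hit(self, key: str) -> bool:
        count = self._count(key) + 1
        # Elastic expiry: every hit, allowed or not, restarts the window.
        self._state[key] = (count, self.clock() + self.window_seconds)
        return count <= self.amount

    def remaining(self, key: str) -> int:
        return max(0, self.amount - self._count(key))


now = [0.0]  # fake clock, advanced by hand
limiter = ElasticWindowLimiter(10, 2, lambda: now[0])
print(all(limiter.hit("k") for _ in range(10)))  # ten hits fit in the window
now[0] = 1.0
print(limiter.hit("k"))  # over the limit; also extends the window to t=3
now[0] = 2.0
print(limiter.hit("k"))  # still inside the extended window
```

A client that keeps retrying while limited therefore stays limited until it backs off for a full window.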

Example 9: get_connection

# Requires: import redis
# or: from redis import sentinel

def get_connection(self, is_read_only=False) -> redis.StrictRedis:
    """
    Gets a StrictRedis connection for normal redis or for redis sentinel
    based upon redis mode in configuration.

    :type is_read_only: bool
    :param is_read_only: In case of redis sentinel, it returns connection
                         to slave
    :return: Returns a StrictRedis connection
    """
    # Note: a cached connection is returned regardless of is_read_only.
    if self.connection is not None:
        return self.connection

    if self.is_sentinel:
        kwargs = dict()
        if self.password:
            kwargs["password"] = self.password
        sentinel = Sentinel([(self.host, self.port)], **kwargs)
        if is_read_only:
            connection = sentinel.slave_for(self.sentinel_service,
                                            decode_responses=True)
        else:
            connection = sentinel.master_for(self.sentinel_service,
                                             decode_responses=True)
    else:
        connection = redis.StrictRedis(host=self.host, port=self.port,
                                       decode_responses=True,
                                       password=self.password)
    self.connection = connection
    return connection

Developer: biplap-sarkar, project: pylimit, lines of code: 33

Example 10: get_atomic_connection

# Requires: import redis
# or: from redis import sentinel

def get_atomic_connection(self) -> Pipeline:
    """
    Gets a Pipeline for normal redis or for redis sentinel based upon
    redis mode in configuration

    :return: Returns a Pipeline object
    """
    return self.get_connection().pipeline(True)

Developer: biplap-sarkar, project: pylimit, lines of code: 10

Example 11: __init__

# Requires: import redis
# or: from redis import sentinel

def __init__(self):
    if self.REDIS_SENTINEL:
        # REDIS_SENTINEL is a ';'-separated list of host:port pairs
        sentinels = [tuple(s.split(':')) for s in self.REDIS_SENTINEL.split(';')]
        self._sentinel = redis.sentinel.Sentinel(sentinels,
                                                 db=self.REDIS_SENTINEL_DB,
                                                 socket_timeout=0.1)
    else:
        self._redis = redis.Redis.from_url(self.rewrite_redis_url())

Developer: ybrs, project: single-beat, lines of code: 10
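
Example 11 splits a `host:port;host:port` string into tuples, which leaves each port as a string. A slightly stricter variant might convert ports to integers and tolerate a missing port. The sketch below assumes the same `;`-separated format; `parse_sentinel_hosts` is a hypothetical helper, 26379 is the conventional Sentinel port, and IPv6 literals are not handled.

```python
from typing import List, Tuple


def parse_sentinel_hosts(spec: str, default_port: int = 26379) -> List[Tuple[str, int]]:
    """Turn 'host1:26379;host2:26380' into [('host1', 26379), ('host2', 26380)]."""
    hosts: List[Tuple[str, int]] = []
    for item in spec.split(";"):
        item = item.strip()
        if not item:
            continue  # ignore empty entries such as a trailing ';'
        host, sep, port = item.rpartition(":")
        if sep:
            hosts.append((host, int(port)))
        else:
            # No ':' present, so the whole item is the host name.
            hosts.append((item, default_port))
    return hosts


print(parse_sentinel_hosts("10.0.0.1:26379;10.0.0.2:26380"))
```

The resulting list has the `(host, port)` shape that the examples in this article pass as the first argument to `redis.sentinel.Sentinel`.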

Example 12: _get_master_client_for

# Requires: import redis
# or: from redis import sentinel

def _get_master_client_for(self, service_name):  # type: (six.text_type) -> redis.StrictRedis
    if service_name not in self._master_clients:
        self._get_counter('backend.sentinel.populate_master_client').increment()
        self._master_clients[service_name] = self._sentinel.master_for(service_name)
        master_address = self._master_clients[service_name].connection_pool.get_master_address()
        _logger.info('Sentinel master address: {}'.format(master_address))
    return self._master_clients[service_name]

Developer: eventbrite, project: pysoa, lines of code: 10

Example 13: _get_redis_connection

# Requires: import redis
# or: from redis import sentinel

def _get_redis_connection(self, group, shard):
    """
    Create and return a Redis Connection for the given group

    Returns:
        redis.StrictRedis: The Redis Connection

    Raises:
        Exception: Passes through any exceptions that happen in trying to get the connection pool
    """
    redis_group = self.__config.redis_urls_by_group[group][shard]

    self.__logger.info(u'Attempting to connect to Redis for group "{}", shard "{}", url "{}"'.format(group, shard,
                                                                                                     redis_group))

    if isinstance(redis_group, PanoptesRedisConnectionConfiguration):
        redis_pool = redis.BlockingConnectionPool(host=redis_group.host,
                                                  port=redis_group.port,
                                                  db=redis_group.db,
                                                  password=redis_group.password)
        redis_connection = redis.StrictRedis(connection_pool=redis_pool)
    elif isinstance(redis_group, PanoptesRedisSentinelConnectionConfiguration):
        sentinels = [(sentinel.host, sentinel.port) for sentinel in redis_group.sentinels]
        self.__logger.info(u'Querying Redis Sentinels "{}" for group "{}", shard "{}"'.format(repr(redis_group),
                                                                                              group, shard))

        sentinel = redis.sentinel.Sentinel(sentinels)
        master = sentinel.discover_master(redis_group.master_name)
        password_present = u'yes' if redis_group.master_password else u'no'
        self.__logger.info(u'Going to connect to master "{}" ({}:{}, password: {}) for group "{}", shard "{}"'
                           .format(redis_group.master_name, master[0], master[1], password_present, group, shard))
        redis_connection = sentinel.master_for(redis_group.master_name, password=redis_group.master_password)
    else:
        # Unknown configuration type: log it and return None
        self.__logger.info(u'Unknown Redis configuration object type: {}'.format(type(redis_group)))
        return

    self.__logger.info(u'Successfully connected to Redis for group "{}", shard "{}", url "{}"'.format(group,
                                                                                                      shard,
                                                                                                      redis_group))

    return redis_connection

Developer: yahoo, project: panoptes, lines of code: 46

Example 14: __init__

# Requires: import redis
# or: from redis import sentinel

def __init__(
    self,
    hosts=None,  # type: Optional[Iterable[Union[six.text_type, Tuple[six.text_type, int]]]]
    connection_kwargs=None,  # type: Dict[six.text_type, Any]
    sentinel_services=None,  # type: Iterable[six.text_type]
    sentinel_failover_retries=0,  # type: int
    sentinel_kwargs=None,  # type: Dict[six.text_type, Any]
):
    # type: (...) -> None
    # Master client caching
    self._master_clients = {}  # type: Dict[six.text_type, redis.StrictRedis]

    # Master failover behavior
    if sentinel_failover_retries < 0:
        raise ValueError('sentinel_failover_retries must be >= 0')
    self._sentinel_failover_retries = sentinel_failover_retries

    connection_kwargs = dict(connection_kwargs) if connection_kwargs else {}
    if 'socket_connect_timeout' not in connection_kwargs:
        connection_kwargs['socket_connect_timeout'] = 5.0  # so that we don't wait indefinitely during failover
    if 'socket_keepalive' not in connection_kwargs:
        connection_kwargs['socket_keepalive'] = True
    if (
        connection_kwargs.pop('ssl', False) or 'ssl_certfile' in connection_kwargs
    ) and 'connection_class' not in connection_kwargs:
        # TODO: Remove this when https://github.com/andymccurdy/redis-py/issues/1306 is released
        connection_kwargs['connection_class'] = _SSLSentinelManagedConnection

    sentinel_kwargs = dict(sentinel_kwargs) if sentinel_kwargs else {}
    if 'socket_connect_timeout' not in sentinel_kwargs:
        sentinel_kwargs['socket_connect_timeout'] = 5.0  # so that we don't wait indefinitely connecting to Sentinel
    if 'socket_timeout' not in sentinel_kwargs:
        sentinel_kwargs['socket_timeout'] = 5.0  # so that we don't wait indefinitely if a Sentinel goes down
    if 'socket_keepalive' not in sentinel_kwargs:
        sentinel_kwargs['socket_keepalive'] = True

    self._sentinel = redis.sentinel.Sentinel(
        self._convert_hosts(hosts),
        sentinel_kwargs=sentinel_kwargs,
        **connection_kwargs
    )
    if sentinel_services:
        self._validate_service_names(sentinel_services)
        self._services = list(sentinel_services)  # type: List[six.text_type]
    else:
        self._services = self._get_service_names()

    super(SentinelRedisClient, self).__init__(ring_size=len(self._services))

Developer: eventbrite, project: pysoa, lines of code: 50
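
Example 14 fills in timeouts and keepalive only where the caller did not supply them, using one `if key not in kwargs` check per option. The same effect can be sketched more compactly with `dict.setdefault`. The names `SENTINEL_DEFAULTS` and `with_defaults` are hypothetical; the default values are the ones Example 14 uses for `sentinel_kwargs`.

```python
from typing import Any, Dict, Mapping, Optional

# Defaults taken from Example 14's sentinel_kwargs handling.
SENTINEL_DEFAULTS = {
    "socket_connect_timeout": 5.0,  # don't wait indefinitely connecting to Sentinel
    "socket_timeout": 5.0,          # don't wait indefinitely if a Sentinel goes down
    "socket_keepalive": True,
}


def with_defaults(kwargs: Optional[Mapping[str, Any]],
                  defaults: Mapping[str, Any]) -> Dict[str, Any]:
    """Return a copy of kwargs with missing keys filled in from defaults."""
    merged = dict(kwargs) if kwargs else {}
    for key, value in defaults.items():
        merged.setdefault(key, value)  # caller-supplied values win
    return merged


print(with_defaults({"socket_timeout": 1.0}, SENTINEL_DEFAULTS))
```

Copying the caller's dictionary first, as Example 14 also does, means the defaults never leak back into the caller's own configuration object.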

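Note: The redis.sentinel examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright in the source code remains with the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.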
