日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > python >内容正文

python

python locust 时间戳过期_Locust源码分析之runners.py模块(6)

發(fā)布時(shí)間:2023/12/20 python 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python locust 时间戳过期_Locust源码分析之runners.py模块(6) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

runners.py模塊是控制locust運(yùn)行的核心功能,控制著性能測試的運(yùn)行機(jī)制。包含了本地運(yùn)行,分布式運(yùn)行等。

主要由LocustRunner,LocalLocustRunner,DistributedLocustRunner,MasterLocustRunner以及SlaveLocustRunner三個(gè)類組成。其中LocustRunner類是其他類的基類,MasterLocustRunner和SlaveLocustRunner是DistributedLocustRunner的拓展類。

LocustRunner

LocustRunner是其余所有runner類的基類,也是整個(gè)locust中運(yùn)行的核心代碼

class LocustRunner(object):

# 實(shí)例化LocustRunner時(shí),需要傳入兩個(gè)參數(shù):locust_classes和options

def __init__(self, locust_classes, options):

self.options = options # 命令行中傳入的參數(shù)

self.locust_classes = locust_classes # locust類,task任務(wù)列表

self.hatch_rate = options.hatch_rate # 命令行參數(shù):每秒啟動(dòng)數(shù)

self.num_clients = options.num_clients # 命令行參數(shù):并發(fā)數(shù)

self.host = options.host # 命令行參數(shù):被壓服務(wù)器地址

self.locusts = Group() #協(xié)程組:gevent.pool.Group()

self.greenlet = self.locusts

self.state = STATE_INIT # 壓測狀態(tài),初始狀態(tài)為“READY”

self.hatching_greenlet = None

self.exceptions = {}

self.stats = global_stats # 在stats模塊中,有一個(gè)全局變量global_stats用于存儲(chǔ)當(dāng)前狀態(tài)

# 注冊(cè)監(jiān)聽器,當(dāng)收到reset_stats指令時(shí)重置當(dāng)前性能數(shù)據(jù)stats狀態(tài)

def on_hatch_complete(user_count):

self.state = STATE_RUNNING

if self.options.reset_stats:

logger.info("Resetting stats\n")

self.stats.reset_all()

events.hatch_complete += on_hatch_complete

# 屬性request_stats:返回當(dāng)前性能指標(biāo)狀態(tài)

@property

def request_stats(self):

return self.stats.entries

# 屬性errors:返回當(dāng)前錯(cuò)誤信息

@property

def errors(self):

return self.stats.errors

# 屬性u(píng)ser_count,返回用戶并發(fā)數(shù)

@property

def user_count(self):

return len(self.locusts)

def weight_locusts(self, amount, stop_timeout = None):

"""

根據(jù)權(quán)重分發(fā)各個(gè)Locust類占有的并發(fā)數(shù)量bucket,amount為總并發(fā)數(shù)

"""

bucket = []

# 計(jì)算權(quán)重之和

weight_sum = sum((locust.weight for locust in self.locust_classes if locust.task_set))

for locust in self.locust_classes:

if not locust.task_set:

warnings.warn("Notice: Found Locust class (%s) got no task_set. Skipping..." % locust.__name__)

continue

if self.host is not None:

locust.host = self.host

if stop_timeout is not None:

locust.stop_timeout = stop_timeout

# 根據(jù)每個(gè)locust請(qǐng)求的權(quán)重計(jì)算所占的比例

percent = locust.weight / float(weight_sum)

# 計(jì)算出每個(gè)locust請(qǐng)求需要多少個(gè)并發(fā)數(shù)

num_locusts = int(round(amount * percent))

bucket.extend([locust for x in xrange(0, num_locusts)])

return bucket

def spawn_locusts(self, spawn_count=None, stop_timeout=None, wait=False):

"""

執(zhí)行壓力測試并發(fā)任務(wù)

spawn_count: 并發(fā)數(shù)

stop_timeout:超時(shí)時(shí)間

wait: task任務(wù)執(zhí)行間隔

"""

# 如果沒有傳入spawn_count參數(shù)則使用命令行傳入的并發(fā)數(shù)

if spawn_count is None:

spawn_count = self.num_clients

#獲取任務(wù)并發(fā)數(shù)

bucket = self.weight_locusts(spawn_count, stop_timeout)

spawn_count = len(bucket)

#如果是首次啟動(dòng)/重啟性能測試,并發(fā)數(shù)等于傳入的spawn_count

if self.state == STATE_INIT or self.state == STATE_STOPPED:

self.state = STATE_HATCHING

self.num_clients = spawn_count

else: #如果當(dāng)前性能測試已經(jīng)啟動(dòng),則疊加spawn_count并發(fā)數(shù)

self.num_clients += spawn_count

logger.info("Hatching and swarming %i clients at the rate %g clients/s..." % (spawn_count, self.hatch_rate))

#獲取每一個(gè)task任務(wù)

occurence_count = dict([(l.__name__, 0) for l in self.locust_classes])

def hatch():

sleep_time = 1.0 / self.hatch_rate #用戶并發(fā)間隔,為每秒新增請(qǐng)求數(shù)的倒數(shù)

while True:

if not bucket:

# 當(dāng)bucket為空時(shí),表示增壓已經(jīng)完成

logger.info("All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in six.iteritems(occurence_count)]))

events.hatch_complete.fire(user_count=self.num_clients) #任務(wù)條目全部執(zhí)行完,hatch_complete觸發(fā)相應(yīng)的事件處理Hook

return

# 當(dāng)bucket不為空時(shí),表示仍然需要繼續(xù)增加壓力。

locust = bucket.pop(random.randint(0, len(bucket)-1)) #從并發(fā)任務(wù)中隨機(jī)抽取一個(gè)執(zhí)行

occurence_count[locust.__name__] += 1 #將被執(zhí)行的任務(wù)+1

def start_locust(_):

try:

locust().run(runner=self) #執(zhí)行任務(wù)

except GreenletExit:

pass

new_locust = self.locusts.spawn(start_locust, locust)

if len(self.locusts) % 10 == 0:

logger.debug("%i locusts hatched" % len(self.locusts))

gevent.sleep(sleep_time)

hatch() #執(zhí)行壓力測試

if wait: #如果添加了wait參數(shù),則暫停所有的locust

self.locusts.join()

logger.info("All locusts dead\n")

def kill_locusts(self, kill_count):

"""

終止kill_count指定的并發(fā)任務(wù),并從當(dāng)前的Group中減少并發(fā)數(shù)目

"""

bucket = self.weight_locusts(kill_count)

kill_count = len(bucket)

self.num_clients -= kill_count

logger.info("Killing %i locusts" % kill_count)

dying = []

for g in self.locusts:

for l in bucket:

if l == g.args[0]:

dying.append(g)

bucket.remove(l)

break

for g in dying:

self.locusts.killone(g)

events.hatch_complete.fire(user_count=self.num_clients)

def start_hatching(self, locust_count=None, hatch_rate=None, wait=False):

"""

啟動(dòng)性能測試

locust_count:并發(fā)數(shù)量

hatch_rate:每秒啟動(dòng)并發(fā)數(shù)

"""

if self.state != STATE_RUNNING and self.state != STATE_HATCHING:

self.stats.clear_all() # 清空之前存儲(chǔ)的性能測試狀態(tài)數(shù)據(jù)

self.stats.start_time = time() # 記錄當(dāng)前時(shí)間為開始時(shí)間

self.exceptions = {} # exception信息清空

events.locust_start_hatching.fire() # 觸發(fā)start_hatching事件

# 動(dòng)態(tài)修改當(dāng)前并發(fā)的用戶

if self.state != STATE_INIT and self.state != STATE_STOPPED: # 當(dāng)前的狀態(tài)不是hatching和running時(shí)

self.state = STATE_HATCHING # 當(dāng)前狀態(tài)不是ready或stop時(shí),將當(dāng)前的狀態(tài)設(shè)置為hatching

if self.num_clients > locust_count: # 如果目前的并發(fā)數(shù)超過需要的并發(fā)數(shù),則kill多余的并發(fā)數(shù)

kill_count = self.num_clients - locust_count

self.kill_locusts(kill_count)

elif self.num_clients < locust_count:

# 如果當(dāng)前并發(fā)數(shù)小于實(shí)際需要的并發(fā)數(shù),根據(jù)設(shè)置的每秒啟動(dòng)數(shù)量來繼續(xù)創(chuàng)建缺少的并發(fā)數(shù)

if hatch_rate:

self.hatch_rate = hatch_rate

spawn_count = locust_count - self.num_clients

self.spawn_locusts(spawn_count=spawn_count)

else: #如果并發(fā)數(shù)和實(shí)際需求的并發(fā)數(shù)相同,則表示壓力測試完成

events.hatch_complete.fire(user_count=self.num_clients)

# 如果當(dāng)前的狀態(tài)還是ready或stop時(shí),則調(diào)用spawn_locusts需要傳遞wait參數(shù)

else:

if hatch_rate:

self.hatch_rate = hatch_rate

if locust_count is not None:

self.spawn_locusts(locust_count, wait=wait)

else:

self.spawn_locusts(wait=wait)

def stop(self):

# 停止壓測時(shí),如果當(dāng)前還在增壓狀態(tài)下,首先需要停止增壓

if self.hatching_greenlet and not self.hatching_greenlet.ready():

self.hatching_greenlet.kill(block=True)

# 停止當(dāng)前的所有施壓單元并修改狀態(tài)為stop

self.locusts.kill(block=True)

self.state = STATE_STOPPED

events.locust_stop_hatching.fire()

def quit(self):

# 退出時(shí)首先調(diào)用stop,再停止所有g(shù)reenlet

self.stop()

self.greenlet.kill(block=True)

def log_exception(self, node_id, msg, formatted_tb):

# 記錄異常值

key = hash(formatted_tb)

row = self.exceptions.setdefault(key, {"count": 0, "msg": msg, "traceback": formatted_tb, "nodes": set()})

row["count"] += 1

row["nodes"].add(node_id)

self.exceptions[key] = row

LocalLocustRunner

LocalLocustRunner是LocustRunner的擴(kuò)展類

class LocalLocustRunner(LocustRunner):

"""

當(dāng)options參數(shù)中沒有傳入master和slave時(shí),默認(rèn)使用本地執(zhí)行模式

"""

def __init__(self, locust_classes, options):

# 繼承LocalLocustRunner類的全部功能

super(LocalLocustRunner, self).__init__(locust_classes, options)

# 增加一個(gè)監(jiān)聽器用于監(jiān)控exception

def on_locust_error(locust_instance, exception, tb):

formatted_tb = "".join(traceback.format_tb(tb))

self.log_exception("local", str(exception), formatted_tb)

events.locust_error += on_locust_error

def start_hatching(self, locust_count=None, hatch_rate=None, wait=False):

# 調(diào)用LocustRunner中的start_hatching方法后得到的Group賦給greenlet

self.hatching_greenlet = gevent.spawn(lambda: super(LocalLocustRunner, self).start_hatching(locust_count, hatch_rate, wait=wait))

self.greenlet = self.hatching_greenlet

DistributedLocustRunner

設(shè)置master節(jié)點(diǎn)的host地址/端口號(hào)/綁定地址/綁定端口,是MasterLocustRunner和SlaveLocustRunner類的基類

class DistributedLocustRunner(LocustRunner):

def __init__(self, locust_classes, options):

# 繼承LocustRunner類的全部功能,并設(shè)置host/port/bind_host/bind_port

super(DistributedLocustRunner, self).__init__(locust_classes, options)

self.master_host = options.master_host #master節(jié)點(diǎn)host

self.master_port = options.master_port

self.master_bind_host = options.master_bind_host

self.master_bind_port = options.master_bind_port

MasterLocustRunner

分布式壓測master節(jié)點(diǎn)任務(wù)分發(fā),slave節(jié)點(diǎn)狀態(tài)控制,并發(fā)數(shù)計(jì)算,以及性能測試數(shù)據(jù)收集

class MasterLocustRunner(DistributedLocustRunner):

def __init__(self, *args, **kwargs):

super(MasterLocustRunner, self).__init__(*args, **kwargs)

class SlaveNodesDict(dict):

# 設(shè)置salve節(jié)點(diǎn)的壓測狀態(tài)

def get_by_state(self, state):

return [c for c in six.itervalues(self) if c.state == state]

# 設(shè)置全部slave節(jié)點(diǎn)狀態(tài)為ready

@property

def ready(self):

return self.get_by_state(STATE_INIT)

# 設(shè)置全部slave節(jié)點(diǎn)狀態(tài)為加壓中

@property

def hatching(self):

return self.get_by_state(STATE_HATCHING)

# 設(shè)置全部slave節(jié)點(diǎn)狀態(tài)為壓測中

@property

def running(self):

return self.get_by_state(STATE_RUNNING)

self.clients = SlaveNodesDict()

# 綁定master節(jié)點(diǎn)遠(yuǎn)程rpc調(diào)用服務(wù)器

self.server = rpc.Server(self.master_bind_host, self.master_bind_port)

self.greenlet = Group()

self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop)

# 添加監(jiān)聽器收集slave節(jié)點(diǎn)用戶并發(fā)數(shù)量

def on_slave_report(client_id, data):

if client_id not in self.clients:

logger.info("Discarded report from unrecognized slave %s", client_id)

return

self.clients[client_id].user_count = data["user_count"]

events.slave_report += on_slave_report

# 添加監(jiān)聽器發(fā)送quit信息到各個(gè)slave節(jié)點(diǎn)

def on_quitting():

self.quit()

events.quitting += on_quitting

#user_count屬性,用以返回各slave節(jié)點(diǎn)并發(fā)數(shù)之和

@property

def user_count(self):

return sum([c.user_count for c in six.itervalues(self.clients)])

#開始施壓

def start_hatching(self, locust_count, hatch_rate):

"""

locust_count: 并發(fā)任務(wù)數(shù)量

hatch_rate:每秒并發(fā)數(shù)

"""

#空閑slave節(jié)點(diǎn)數(shù)=ready狀態(tài)+running狀態(tài)之和

num_slaves = len(self.clients.ready) + len(self.clients.running)

if not num_slaves:

logger.warning("You are running in distributed mode but have no slave servers connected. "

"Please connect slaves prior to swarming.")

return

self.num_clients = locust_count # 并發(fā)總量

slave_num_clients = locust_count // (num_slaves or 1) #每個(gè)slave節(jié)點(diǎn)需要執(zhí)行的并發(fā)數(shù)

slave_hatch_rate = float(hatch_rate) / (num_slaves or 1) #每個(gè)slave節(jié)點(diǎn)的每秒并發(fā)數(shù)

remaining = locust_count % num_slaves #未分配到salve節(jié)點(diǎn)的并發(fā)數(shù)

logger.info("Sending hatch jobs to %d ready clients", num_slaves)

if self.state != STATE_RUNNING and self.state != STATE_HATCHING:

self.stats.clear_all() #重置性能測試數(shù)據(jù)

self.exceptions = {}

events.master_start_hatching.fire() #開始施加壓力

for client in six.itervalues(self.clients):

data = {

"hatch_rate":slave_hatch_rate,

"num_clients":slave_num_clients,

"host":self.host,

"stop_timeout":None

}

# 未分配的并發(fā)數(shù)再依次添加到各個(gè)slave上

if remaining > 0:

data["num_clients"] += 1

remaining -= 1

self.server.send(Message("hatch", data, None)) #master節(jié)點(diǎn)發(fā)送數(shù)據(jù)data到slave

#壓測開始時(shí)間

self.stats.start_time = time()

self.state = STATE_HATCHING

#將加壓中和運(yùn)行中的壓力測試停止

def stop(self):

for client in self.clients.hatching + self.clients.running:

self.server.send(Message("stop", None, None)) #發(fā)送停止指令到slave節(jié)點(diǎn)

events.master_stop_hatching.fire() #觸發(fā)停止任務(wù)hook

#中止壓測

def quit(self):

for client in six.itervalues(self.clients):

self.server.send(Message("quit", None, None))

self.greenlet.kill(block=True)

#獲取master節(jié)點(diǎn)發(fā)送的數(shù)據(jù)

def client_listener(self):

while True:

msg = self.server.recv()

if msg.type == "client_ready": #如果msg消息類型為client_ready,從cliens列表中刪除slave節(jié)點(diǎn)

id = msg.node_id

self.clients[id] = SlaveNode(id)

logger.info("Client %r reported as ready. Currently %i clients ready to swarm." % (id, len(self.clients.ready)))

elif msg.type == "client_stopped": #如果msg消息類型為client_stopped,從cliens列表中刪除slave節(jié)點(diǎn)

del self.clients[msg.node_id]

if len(self.clients.hatching + self.clients.running) == 0: #如果沒有slave節(jié)點(diǎn)處于hatching/running狀態(tài),則標(biāo)記壓測狀態(tài)為STATE_STOPPED

self.state = STATE_STOPPED

logger.info("Removing %s client from running clients" % (msg.node_id))

elif msg.type == "stats": #如果msg消息類型為stats,則計(jì)算各slave節(jié)點(diǎn)并發(fā)數(shù)之和

events.slave_report.fire(client_id=msg.node_id, data=msg.data)

elif msg.type == "hatching": #如果msg消息類型為hatching,則標(biāo)記slave節(jié)點(diǎn)狀態(tài)為STATE_HATCHING

self.clients[msg.node_id].state = STATE_HATCHING

elif msg.type == "hatch_complete": #如果msg消息類型為hatch_complete,標(biāo)記slave狀態(tài)為STATE_RUNNING

self.clients[msg.node_id].state = STATE_RUNNING

self.clients[msg.node_id].user_count = msg.data["count"] #獲取并發(fā)數(shù)

if len(self.clients.hatching) == 0:

count = sum(c.user_count for c in six.itervalues(self.clients)) #計(jì)算所有slave節(jié)點(diǎn)的并發(fā)數(shù)之和

events.hatch_complete.fire(user_count=count) #記錄并發(fā)數(shù)

elif msg.type == "quit": #如果msg消息類型為quit,則結(jié)束壓測任務(wù)

if msg.node_id in self.clients:

del self.clients[msg.node_id]

logger.info("Client %r quit. Currently %i clients connected." % (msg.node_id, len(self.clients.ready)))

elif msg.type == "exception":

self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])

# slave_count屬性,返回ready/hatching/running狀態(tài)slave節(jié)點(diǎn)之和

@property

def slave_count(self):

return len(self.clients.ready) + len(self.clients.hatching) + len(self.clients.running)

SlaveLocustRunner

slave節(jié)點(diǎn)任務(wù)執(zhí)行

class SlaveLocustRunner(DistributedLocustRunner):

def __init__(self, *args, **kwargs):

super(SlaveLocustRunner, self).__init__(*args, **kwargs)

# 根據(jù)hostname,當(dāng)前時(shí)間戳和隨機(jī)數(shù)生成唯一client_id

self.client_id = socket.gethostname() + "_" + md5(str(time() + random.randint(0,10000)).encode('utf-8')).hexdigest()

# 連接rpc服務(wù)端,即master節(jié)點(diǎn)

self.client = rpc.Client(self.master_host, self.master_port)

self.greenlet = Group() #協(xié)程任務(wù)池

# 執(zhí)行任務(wù)

self.greenlet.spawn(self.worker).link_exception(callback=self.noop)

# slave節(jié)點(diǎn)發(fā)送當(dāng)前狀態(tài)client_ready和client_id到master

self.client.send(Message("client_ready", None, self.client_id))

# salve發(fā)送性能測試狀態(tài)數(shù)據(jù)到master

self.greenlet.spawn(self.stats_reporter).link_exception(callback=self.noop)

# 增加監(jiān)聽器,當(dāng)任務(wù)完成時(shí)發(fā)送hatch_complete狀態(tài)/并發(fā)數(shù)/client_id到master

def on_hatch_complete(user_count):

self.client.send(Message("hatch_complete", {"count":user_count}, self.client_id))

events.hatch_complete += on_hatch_complete

# 增加監(jiān)聽器,發(fā)送salve節(jié)點(diǎn)并發(fā)數(shù)到master

def on_report_to_master(client_id, data):

data["user_count"] = self.user_count

events.report_to_master += on_report_to_master

# 增加監(jiān)聽器,發(fā)送quit消息到master

def on_quitting():

self.client.send(Message("quit", None, self.client_id))

events.quitting += on_quitting

# 增加監(jiān)聽器,發(fā)送異常消息到master

def on_locust_error(locust_instance, exception, tb):

formatted_tb = "".join(traceback.format_tb(tb))

self.client.send(Message("exception", {"msg" : str(exception), "traceback" : formatted_tb}, self.client_id))

events.locust_error += on_locust_error

# slave節(jié)點(diǎn)執(zhí)行壓測任務(wù)

def worker(self):

while True:

msg = self.client.recv()

if msg.type == "hatch": # 收到hatch指令,根據(jù)master傳遞的data,施壓

self.client.send(Message("hatching", None, self.client_id))

job = msg.data

self.hatch_rate = job["hatch_rate"]

#self.num_clients = job["num_clients"]

self.host = job["host"]

self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))

elif msg.type == "stop": # 收到stop指令,停止壓測,并發(fā)送slave狀態(tài)到master

self.stop()

self.client.send(Message("client_stopped", None, self.client_id))

self.client.send(Message("client_ready", None, self.client_id))

elif msg.type == "quit": # 收到quit指令,停止壓測,并強(qiáng)制kill壓測協(xié)程

logger.info("Got quit message from master, shutting down...")

self.stop()

self.greenlet.kill(block=True)

# 發(fā)送性能數(shù)據(jù)到master

def stats_reporter(self):

while True:

data = {}

events.report_to_master.fire(client_id=self.client_id, data=data)

try:

self.client.send(Message("stats", data, self.client_id))

except:

logger.error("Connection lost to master server. Aborting...")

break

gevent.sleep(SLAVE_REPORT_INTERVAL)

參考文獻(xiàn):

https://www.missshi.cn/api/view/blog/5a0aef86483c561314000002

總結(jié)

以上是生活随笔為你收集整理的python locust 时间戳过期_Locust源码分析之runners.py模块(6)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。