日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > python >内容正文

python

python装饰器函数后执行_Python装饰器限制函数运行时间超时则退出执行

發(fā)布時(shí)間:2023/12/10 python 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python装饰器函数后执行_Python装饰器限制函数运行时间超时则退出执行 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

實(shí)際項(xiàng)目中會(huì)涉及到需要對(duì)有些函數(shù)的響應(yīng)時(shí)間做一些限制,如果超時(shí)就退出函數(shù)的執(zhí)行,停止等待。

可以利用python中的裝飾器實(shí)現(xiàn)對(duì)函數(shù)執(zhí)行時(shí)間的控制。

python裝飾器簡(jiǎn)單來(lái)說(shuō)可以在不改變某個(gè)函數(shù)內(nèi)部實(shí)現(xiàn)和原來(lái)調(diào)用方式的前提下對(duì)該函數(shù)增加一些附件的功能,提供了對(duì)該函數(shù)功能的擴(kuò)展。

方法一. 使用 signal

# coding=utf-8

import signal

import time

def set_timeout(num, callback):

def wrap(func):

def handle(signum, frame): # 收到信號(hào) SIGALRM 后的回調(diào)函數(shù),第一個(gè)參數(shù)是信號(hào)的數(shù)字,第二個(gè)參數(shù)是the interrupted stack frame.

raise RuntimeError

def to_do(*args, **kwargs):

try:

signal.signal(signal.SIGALRM, handle) # 設(shè)置信號(hào)和回調(diào)函數(shù)

signal.alarm(num) # 設(shè)置 num 秒的鬧鐘

print('start alarm signal.')

r = func(*args, **kwargs)

print('close alarm signal.')

signal.alarm(0) # 關(guān)閉鬧鐘

return r

except RuntimeError as e:

callback()

return to_do

return wrap

def after_timeout(): # 超時(shí)后的處理函數(shù)

print("Time out!")

@set_timeout(2, after_timeout) # 限時(shí) 2 秒超時(shí)

def connect(): # 要執(zhí)行的函數(shù)

time.sleep(3) # 函數(shù)執(zhí)行時(shí)間,寫(xiě)大于2的值,可測(cè)試超時(shí)

print('Finished without timeout.')

if __name__ == '__main__':

connect()

方法一中使用的signal有所限制,需要在linux系統(tǒng)上,并且需要在主線(xiàn)程中使用。方法二使用線(xiàn)程計(jì)時(shí),不受此限制。

方法二. 使用Thread

# -*- coding: utf-8 -*-

from threading import Thread

import time

class TimeoutException(Exception):

pass

ThreadStop = Thread._Thread__stop

def timelimited(timeout):

def decorator(function):

def decorator2(*args,**kwargs):

class TimeLimited(Thread):

def __init__(self,_error= None,):

Thread.__init__(self)

self._error = _error

def run(self):

try:

self.result = function(*args,**kwargs)

except Exception,e:

self._error = str(e)

def _stop(self):

if self.isAlive():

ThreadStop(self)

t = TimeLimited()

t.start()

t.join(timeout)

if isinstance(t._error,TimeoutException):

t._stop()

raise TimeoutException('timeout for %s' % (repr(function)))

if t.isAlive():

t._stop()

raise TimeoutException('timeout for %s' % (repr(function)))

if t._error is None:

return t.result

return decorator2

return decorator

@timelimited(2) # 設(shè)置運(yùn)行超時(shí)時(shí)間2S

def fn_1(secs):

time.sleep(secs)

return 'Finished without timeout'

def do_something_after_timeout():

print('Time out!')

if __name__ == "__main__":

try:

print(fn_1(3)) # 設(shè)置函數(shù)執(zhí)行3S

except TimeoutException as e:

print(str(e))

do_something_after_timeout()

總結(jié)

以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)找一找教程網(wǎng)的支持。如果你想了解更多相關(guān)內(nèi)容請(qǐng)查看下面相關(guān)鏈接

總結(jié)

以上是生活随笔為你收集整理的python装饰器函数后执行_Python装饰器限制函数运行时间超时则退出执行的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。