
本文共 5897 字,大约阅读时间需要 19 分钟。
Python ���������������������������������
������
���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������Python������������������������������������������������������������������������������������������������������������������������������
������������
���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������CPU���������������������������������������������������������������������������������
������������
������������������
���������������������������������������������������������������������������������������������������������������������������������������������������������������������TaskThread
���������������������������������������������������������������������
threading.Thread
������������������������������������������������������
���������������������������������������������������������������������������������������������������������������������������������������������������������
������������������
���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
������������
������������
������������������������������������������������������������������������
threading
���time
������������������������������������TaskThread
���������������������������������������LimitDecor
������������������������������������Start
���������������������������������������������������������������������
import threadingimport timeclass TaskThread(threading.Thread): def __init__(self, target, args=()): super().__init__() self.func = target self.args = args def run(self): self.result = self.func(self.args) def GetResult(self): try: return self.result except Exception: return Noneclass LimitDecorator: def Functions(func): def Run(params): limit_time = params[0] thre_func = TaskThread(target=func, args=params) thre_func.setDaemon(True) thre_func.start() sleep_num = int(limit_time // 1) sleep_left = round(limit_time % 1, 1) while True: time.sleep(1) if thre_func.isAlive(): pass else: info = thre_func.GetResult() if info: return info break if sleep_num <= 0: break thre_func.join(timeout=sleep_left) return thre_func.GetResult() return Rundef Start(limit_time, process): thSetter = TaskThread(target=Task, args=[limit_time, process]) thSetter.start() thSetter.join() return thSetter.GetResult()class Task: def __init__(self, args=()): self.args = args def Run(self): process = self.args[1] info = process.Run() return infoclass Process: def __init__(self, args=()): self.args = args def Run(self): print("start Process: args --", self.args) print("start Process: sleep 4s") time.sleep(4) print("over Process") return None
������������
���������������������������������run
���GetResult
���������������������������������������������������������������������������������������������
������������������������LimitDecorator
���������������������������������������������������������������������������������������������������������������������������������������������������
���������������������������������Start
������������������������������������������������������������������������������������������������������������������������������
������������������������������������������������������������������������������������������������thre_func.join(timeout)
������������������������������������������������������������������
���������������������
���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
from Process import Processfrom Start import Startprocess = Process(["log Analyzer", "Cohesive Analyze"])result = Start(10, process)print("���������������������", result)
������
������������������������������������������������������������Python���������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������
发表评论
最新留言
关于作者
