Locust load testing tool: test statistics output and distributed mode

Locust load testing environment

This article uses Python 3.5.2 and Locust 0.9.0.

Locust test statistics output and distributed mode

This article analyzes two topics:

1. While locust is running test cases, it prints the related test statistics.

2. locust supports distributed testing, so that high-concurrency tests can be driven from multiple machines.

Both topics are analyzed below.

Test statistics during a locust run

In the analysis of locust's startup flow, the main function contains the following code:

if not options.only_summary and (options.print_stats or (options.no_web and not options.slave)):
    # spawn stats printing greenlet
    gevent.spawn(stats_printer)                             # print the runtime stats to the console

The runtime statistics are printed by stats_printer, so this is where we start analyzing the flow;

def stats_printer():
    from . import runners
    while True:
        print_stats(runners.locust_runner.request_stats)    # fetch the current locust runner's request_stats and print it
        gevent.sleep(CONSOLE_STATS_INTERVAL_SEC)            # then sleep for 2 seconds

Let us first look at the print_stats function;

def print_stats(stats):
    console_logger.info((" %-" + str(STATS_NAME_WIDTH) + "s %7s %12s %7s %7s %7s  | %7s %7s") % ('Name', '# reqs', '# fails', 'Avg', 'Min', 'Max', 'Median', 'req/s'))   # print the header row
    console_logger.info("-" * (80 + STATS_NAME_WIDTH))                          # print a separator line
    total_rps = 0
    total_reqs = 0
    total_failures = 0
    for key in sorted(six.iterkeys(stats)):                                     # iterate over all stats entries
        r = stats[key]
        total_rps += r.current_rps                                              # accumulate the current values
        total_reqs += r.num_requests
        total_failures += r.num_failures
        console_logger.info(r)
    console_logger.info("-" * (80 + STATS_NAME_WIDTH))                          # print a separator line
    try:
        fail_percent = (total_failures/float(total_reqs))*100                   # compute the failure ratio
    except ZeroDivisionError:
        fail_percent = 0
    console_logger.info((" %-" + str(STATS_NAME_WIDTH) + "s %7d %12s %42.2f") % ('Total', total_reqs, "%d(%.2f%%)" % (total_failures, fail_percent), total_rps))        # print the computed totals
    console_logger.info("")
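With those format strings, the console output looks roughly like the table below (the numbers are invented for illustration, and the real name column is padded to STATS_NAME_WIDTH characters):

 Name                           # reqs      # fails     Avg     Min     Max  |  Median   req/s
--------------------------------------------------------------------------------
 GET /login                        120     2(1.67%)      45      12     310  |      40    6.00
--------------------------------------------------------------------------------
 Total                             120     2(1.67%)                                      6.00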

The main job of this function is to compute the failure ratio, the total number of errors, and related figures, and then print them to the console as a table. The data comes from its stats parameter, which is runners.locust_runner.request_stats; the initialization of the LocustRunner class shows that stats is global_stats:

global_stats = RequestStats()

Now let us look at the RequestStats class:

class RequestStats(object):
    def __init__(self):
        self.entries = {}                                       # per-(name, method) stats entries
        self.errors = {}
        self.total = StatsEntry(self, "Total", None, use_response_times_cache=True)     # the aggregated "Total" entry
        self.start_time = None                                  # start time

    @property
    def num_requests(self):
        return self.total.num_requests                          # total number of requests

    @property
    def num_failures(self):
        return self.total.num_failures                          # total number of failures

    @property
    def last_request_timestamp(self):
        return self.total.last_request_timestamp                # timestamp of the last request

    def log_request(self, method, name, response_time, content_length):
        self.total.log(response_time, content_length)           # record in the "Total" entry
        self.get(name, method).log(response_time, content_length)   # record in the StatsEntry keyed by name and method

    def log_error(self, method, name, error):
        self.total.log_error(error)                             # count the error in the "Total" entry
        self.get(name, method).log_error(error)                 # count it in the (name, method) StatsEntry
        # store error in errors dict
        key = StatsError.create_key(method, name, error)        # build a key for this error
        entry = self.errors.get(key)                            # look the key up in errors
        if not entry:                                           # if it is not there yet
            entry = StatsError(method, name, error)             # create a StatsError instance for it
            self.errors[key] = entry                            # and store it
        entry.occured()                                         # count this occurrence

    def get(self, name, method):
        """
        Retrieve a StatsEntry instance by name and method
        """
        entry = self.entries.get((name, method))                # check whether a StatsEntry keyed by (name, method) already exists
        if not entry:
            entry = StatsEntry(self, name, method)              # if not, create one and store it in entries
            self.entries[(name, method)] = entry
        return entry                                            # return the entry

    def reset_all(self):
        """
        Go through all stats entries and reset them to zero
        """
        self.start_time = time.time()                           # reset the start time
        self.total.reset()                                      # zero out the "Total" entry and start over
        for r in six.itervalues(self.entries):                  # reset every StatsEntry
            r.reset()

    def clear_all(self):
        """
        Remove all stats entries and errors
        """
        self.total = StatsEntry(self, "Total", None, use_response_times_cache=True)     # recreate the "Total" entry
        self.entries = {}                                       # reinitialize everything
        self.errors = {}
        self.start_time = None

    def serialize_stats(self):
        return [self.entries[key].get_stripped_report() for key in six.iterkeys(self.entries) if not (self.entries[key].num_requests == 0 and self.entries[key].num_failures == 0)]    # serialize the non-empty entries

    def serialize_errors(self):
        return dict([(k, e.to_dict()) for k, e in six.iteritems(self.errors)])  # serialize the errors as a dict

The actual bookkeeping relies on the StatsEntry class; its code is as follows:

class StatsEntry(object):
    """
    Represents a single stats entry (name and method)
    """

    name = None
    """ Name (URL) of this stats entry """

    method = None
    """ Method (GET, POST, PUT, etc.) """

    num_requests = None
    """ The number of requests made """

    num_failures = None
    """ Number of failed request """

    total_response_time = None
    """ Total sum of the response times """

    min_response_time = None
    """ Minimum response time """

    max_response_time = None
    """ Maximum response time """

    num_reqs_per_sec = None
    """ A {second => request_count} dict that holds the number of requests made per second """

    response_times = None
    """
    A {response_time => count} dict that holds the response time distribution of all
    the requests.

    The keys (the response time in ms) are rounded to store 1, 2, ... 9, 10, 20. .. 90,
    100, 200 .. 900, 1000, 2000 ... 9000, in order to save memory.

    This dict is used to calculate the median and percentile response times.
    """

    use_response_times_cache = False
    """
    If set to True, the copy of the response_time dict will be stored in response_times_cache
    every second, and kept for 20 seconds (by default, will be CURRENT_RESPONSE_TIME_PERCENTILE_WINDOW + 10).
    We can use this dict to calculate the *current* median response time, as well as other response
    time percentiles.
    """

    response_times_cache = None
    """
    If use_response_times_cache is set to True, this will be a {timestamp => CachedResponseTimes()}
    OrderedDict that holds a copy of the response_times dict for each of the last 20 seconds.
    """

    total_content_length = None
    """ The sum of the content length of all the requests for this entry """

    start_time = None
    """ Time of the first request for this entry """

    last_request_timestamp = None
    """ Time of the last request for this entry """

    def __init__(self, stats, name, method, use_response_times_cache=False):
        self.stats = stats                                      # keep a reference to the owning stats object
        self.name = name                                        # name
        self.method = method                                    # method
        self.use_response_times_cache = use_response_times_cache
        self.reset()                                            # initialize all counters

    def reset(self):
        self.start_time = time.time()                           # reset the start time
        self.num_requests = 0
        self.num_failures = 0
        self.total_response_time = 0
        self.response_times = {}
        self.min_response_time = None
        self.max_response_time = 0
        self.last_request_timestamp = int(time.time())
        self.num_reqs_per_sec = {}
        self.total_content_length = 0
        if self.use_response_times_cache:
            self.response_times_cache = OrderedDict()
            self._cache_response_times(int(time.time()))

    def log(self, response_time, content_length):
        # get the time
        t = int(time.time())                                    # current timestamp

        if self.use_response_times_cache and self.last_request_timestamp and t > self.last_request_timestamp:
            # see if we shall make a copy of the response_times dict and store in the cache
            self._cache_response_times(t-1)

        self.num_requests += 1                                  # one more request
        self._log_time_of_request(t)                            # record the request time
        self._log_response_time(response_time)                  # record the response time

        # increase total content-length
        self.total_content_length += content_length             # accumulate the total response size

    def _log_time_of_request(self, t):
        self.num_reqs_per_sec[t] = self.num_reqs_per_sec.setdefault(t, 0) + 1   # count this request in its per-second bucket
        self.last_request_timestamp = t                         # remember the latest request timestamp

    def _log_response_time(self, response_time):
        self.total_response_time += response_time               # add to the total response time
        if self.min_response_time is None:
            self.min_response_time = response_time
        self.min_response_time = min(self.min_response_time, response_time)     # track the minimum response time
        self.max_response_time = max(self.max_response_time, response_time)     # track the maximum response time

        # to avoid too much data having to be transferred to the master node when
        # running in distributed mode, we save the response time rounded in a dict
        # so that 147 becomes 150, 3432 becomes 3400 and 58760 becomes 59000
        if response_time < 100:                                 # below 100
            rounded_response_time = response_time               # store as-is
        elif response_time < 1000:                              # between 100 and 1000
            rounded_response_time = int(round(response_time, -1))   # round to the nearest 10
        elif response_time < 10000:
            rounded_response_time = int(round(response_time, -2))   # round to the nearest 100
        else:
            rounded_response_time = int(round(response_time, -3))   # round to the nearest 1000

        # increase request count for the rounded key in response time dict
        self.response_times.setdefault(rounded_response_time, 0)    # make sure the bucket exists
        self.response_times[rounded_response_time] += 1

    def log_error(self, error):
        self.num_failures += 1                                  # one more failure

    @property
    def fail_ratio(self):
        try:
            return float(self.num_failures) / (self.num_requests + self.num_failures)   # failure ratio
        except ZeroDivisionError:
            if self.num_failures > 0:
                return 1.0
            else:
                return 0.0

    @property
    def avg_response_time(self):
        try:
            return float(self.total_response_time) / self.num_requests          # average response time
        except ZeroDivisionError:
            return 0

    @property
    def median_response_time(self):
        if not self.response_times:
            return 0
        return median_from_dict(self.num_requests, self.response_times)         # median response time across all requests

    @property
    def current_rps(self):
        if self.stats.last_request_timestamp is None:
            return 0
        slice_start_time = max(self.stats.last_request_timestamp - 12, int(self.stats.start_time or 0))     # start of the sliding window
        reqs = [self.num_reqs_per_sec.get(t, 0) for t in range(slice_start_time, self.stats.last_request_timestamp-2)]  # per-second request counts inside the window
        return avg(reqs)                                                        # average requests per second

    @property
    def total_rps(self):
        if not self.stats.last_request_timestamp or not self.stats.start_time:
            return 0.0
        return self.num_requests / max(self.stats.last_request_timestamp - self.stats.start_time, 1)    # requests per second over the whole run

    @property
    def avg_content_length(self):
        try:
            return self.total_content_length / self.num_requests                # average response size per request
        except ZeroDivisionError:
            return 0

    def extend(self, other):
        """
        Extend the data from the current StatsEntry with the stats from another
        StatsEntry instance.
        """
        self.last_request_timestamp = max(self.last_request_timestamp, other.last_request_timestamp)    # latest request timestamp
        self.start_time = min(self.start_time, other.start_time)                # earliest start time
        self.num_requests = self.num_requests + other.num_requests              # combined request count
        self.num_failures = self.num_failures + other.num_failures              # combined failure count
        self.total_response_time = self.total_response_time + other.total_response_time    # combined total response time
        self.max_response_time = max(self.max_response_time, other.max_response_time)      # combined maximum response time
        self.min_response_time = min(self.min_response_time or 0, other.min_response_time or 0) or other.min_response_time  # combined minimum response time
        self.total_content_length = self.total_content_length + other.total_content_length # combined response size
        for key in other.response_times:
            self.response_times[key] = self.response_times.get(key, 0) + other.response_times[key]      # merge the response time buckets
        for key in other.num_reqs_per_sec:
            self.num_reqs_per_sec[key] = self.num_reqs_per_sec.get(key, 0) + other.num_reqs_per_sec[key]

    def serialize(self):
        return {
            "name": self.name,
            "method": self.method,
            "last_request_timestamp": self.last_request_timestamp,
            "start_time": self.start_time,
            "num_requests": self.num_requests,
            "num_failures": self.num_failures,
            "total_response_time": self.total_response_time,
            "max_response_time": self.max_response_time,
            "min_response_time": self.min_response_time,
            "total_content_length": self.total_content_length,
            "response_times": self.response_times,
            "num_reqs_per_sec": self.num_reqs_per_sec,
        }                                                       # serialize all values

    @classmethod
    def unserialize(cls, data):
        obj = cls(None, data["name"], data["method"])           # recreate an instance from serialized data
        for key in [
            "last_request_timestamp",
            "start_time",
            "num_requests",
            "num_failures",
            "total_response_time",
            "max_response_time",
            "min_response_time",
            "total_content_length",
            "response_times",
            "num_reqs_per_sec",
        ]:
            setattr(obj, key, data[key])                        # copy each attribute onto the instance
        return obj                                              # return the rebuilt instance

    def get_stripped_report(self):
        """
        Return the serialized version of this StatsEntry, and then clear the current stats.
        """
        report = self.serialize()                               # take a serialized snapshot
        self.reset()                                            # then reset
        return report                                           # and return the snapshot

    def __str__(self):
        try:
            fail_percent = (self.num_failures/float(self.num_requests + self.num_failures))*100
        except ZeroDivisionError:
            fail_percent = 0

        return (" %-" + str(STATS_NAME_WIDTH) + "s %7d %12s %7d %7d %7d  | %7d %7.2f") % (
            (self.method and self.method + " " or "") + self.name,
            self.num_requests,
            "%d(%.2f%%)" % (self.num_failures, fail_percent),
            self.avg_response_time,
            self.min_response_time or 0,
            self.max_response_time,
            self.median_response_time or 0,
            self.current_rps or 0
        )                                                       # the row printed for this entry

As the StatsEntry class shows, all of the recording and computation is implemented there: the average time per request, the average response size per request, and the other figures are all produced by this class.
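To see this bookkeeping in action, here is a minimal standalone sketch that feeds a few invented requests into a RequestStats instance and reads the computed figures back (it assumes locust 0.9.0 is importable; the /login name and all numbers are made up):

from locust.stats import RequestStats

stats = RequestStats()
# log three requests against GET /login (response time in ms, content length in bytes)
for rt, size in [(95, 512), (147, 512), (3432, 1024)]:
    stats.log_request("GET", "/login", rt, size)
# log one failed request as well
stats.log_request("GET", "/login", 30000, 0)
stats.log_error("GET", "/login", Exception("HTTP 500"))

entry = stats.get("/login", "GET")
print(entry.num_requests)        # 4
print(entry.num_failures)        # 1
print(entry.avg_response_time)   # (95 + 147 + 3432 + 30000) / 4 = 8418.5
print(entry.response_times)      # rounded buckets: {95: 1, 150: 1, 3400: 1, 30000: 1}
print(entry.fail_ratio)          # 1 / (4 + 1) = 0.2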

Now that the overall bookkeeping structure is clear, how does a request actually get recorded?

"""A global instance for holding the statistics. Should be removed eventually."""def on_request_success(request_type, name, response_time, response_length, **kwargs):    global_stats.log_request(request_type, name, response_time, response_length)            # 记录访问成功的请求def on_request_failure(request_type, name, response_time, exception, **kwargs):    global_stats.log_request(request_type, name, response_time, 0)                          # 记录访问请求    global_stats.log_error(request_type, name, exception)                                   # 记录该错误的请求    events.request_success += on_request_success                        # 添加到events.request_success中events.request_failure += on_request_failure                        # 添加到events.request_failure中

As this code shows, calling events.request_success.fire() is all it takes to record a request. Recall the following code from the HTTP request flow:

try:
    response.raise_for_status()
except RequestException as e:
    events.request_failure.fire(
        request_type=request_meta["method"],
        name=request_meta["name"],
        response_time=request_meta["response_time"],
        exception=e,
    )
else:
    events.request_success.fire(
        request_type=request_meta["method"],
        name=request_meta["name"],
        response_time=request_meta["response_time"],
        response_length=request_meta["content_size"],
    )
return response

After a request is sent through requests.Session, events.request_failure is fired if the request failed, otherwise events.request_success is fired. This completes the outline of how data is recorded during a run; readers interested in more detail can consult the source.
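The += registration and fire() dispatch used above come from locust's EventHook class in events.py; the idea is essentially the following simplified sketch, where an event is just a list of handlers and fire() calls each of them with the given keyword arguments:

class EventHook(object):
    """ Simplified sketch of locust's EventHook. """
    def __init__(self):
        self._handlers = []

    def __iadd__(self, handler):
        self._handlers.append(handler)      # "events.request_success += listener" ends up here
        return self

    def fire(self, **kwargs):
        for handler in self._handlers:      # every registered listener receives the event's data
            handler(**kwargs)

# usage: register a listener and fire the event
request_success = EventHook()

def my_listener(request_type, name, response_time, response_length, **kwargs):
    print("%s %s took %dms" % (request_type, name, response_time))

request_success += my_listener
request_success.fire(request_type="GET", name="/", response_time=42, response_length=128)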

The distributed mode flow

Since locust supports distributed testing, let us briefly walk through the overall flow. Following the distributed launch commands in the documentation, different input parameters start different modes; the main function contains the following code:

elif options.master:
    runners.locust_runner = MasterLocustRunner(locust_classes, options)
    if options.no_web:
        while len(runners.locust_runner.clients.ready) < options.expect_slaves:
            logging.info("Waiting for slaves to be ready, %s of %s connected",
                         len(runners.locust_runner.clients.ready), options.expect_slaves)
            time.sleep(1)

As the code shows, the options passed in decide which mode the process runs in. Let us first look at master mode.
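For reference, a distributed run under this version is typically launched roughly as follows (the file name, host address, and counts here are placeholders):

# on the master machine: no web UI, 1000 simulated users total, hatch 100/s,
# wait until two slaves have connected before starting
locust -f locustfile.py --master --no-web -c 1000 -r 100 --expect-slaves=2

# on each slave machine: connect to the master's address
locust -f locustfile.py --slave --master-host=192.168.0.10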

Starting in master mode

As the code shows, master and slave mode simply instantiate different runner classes; the master instantiates MasterLocustRunner. Its code:

class DistributedLocustRunner(LocustRunner):
    def __init__(self, locust_classes, options):
        super(DistributedLocustRunner, self).__init__(locust_classes, options)
        self.master_host = options.master_host                  # master host name
        self.master_port = options.master_port                  # master port
        self.master_bind_host = options.master_bind_host        # host the master binds to
        self.master_bind_port = options.master_bind_port        # port the master binds to
        self.heartbeat_liveness = options.heartbeat_liveness    # number of missed heartbeats tolerated, 3 by default
        self.heartbeat_interval = options.heartbeat_interval    # heartbeat interval, 1 second by default

    def noop(self, *args, **kwargs):
        """ Used to link() greenlets to in order to be compatible with gevent 1.0 """
        pass

class SlaveNode(object):
    def __init__(self, id, state=STATE_INIT, heartbeat_liveness=3):
        self.id = id
        self.state = state                                      # node state
        self.user_count = 0
        self.heartbeat = heartbeat_liveness                     # remaining heartbeat credits

class MasterLocustRunner(DistributedLocustRunner):
    def __init__(self, *args, **kwargs):
        super(MasterLocustRunner, self).__init__(*args, **kwargs)

        class SlaveNodesDict(dict):
            def get_by_state(self, state):
                return [c for c in six.itervalues(self) if c.state == state]

            @property
            def all(self):
                return six.itervalues(self)

            @property
            def ready(self):
                return self.get_by_state(STATE_INIT)

            @property
            def hatching(self):
                return self.get_by_state(STATE_HATCHING)

            @property
            def running(self):
                return self.get_by_state(STATE_RUNNING)

        self.clients = SlaveNodesDict()                                                 # dict of slave nodes
        self.server = rpc.Server(self.master_bind_host, self.master_bind_port)          # start the rpc server
        self.greenlet = Group()
        self.greenlet.spawn(self.heartbeat_worker).link_exception(callback=self.noop)   # start the heartbeat checker
        self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop)    # start listening to the clients

        # listener that gathers info on how many locust users the slaves has spawned
        def on_slave_report(client_id, data):
            if client_id not in self.clients:
                logger.info("Discarded report from unrecognized slave %s", client_id)
                return
            self.clients[client_id].user_count = data["user_count"]
        events.slave_report += on_slave_report                                          # register the slave report listener

        # register listener that sends quit message to slave nodes
        def on_quitting():
            self.quit()
        events.quitting += on_quitting                                                  # register the quit listener

    @property
    def user_count(self):
        return sum([c.user_count for c in six.itervalues(self.clients)])                # total number of simulated users

    def start_hatching(self, locust_count, hatch_rate):
        num_slaves = len(self.clients.ready) + len(self.clients.running) + len(self.clients.hatching)   # number of connected slaves
        if not num_slaves:
            logger.warning("You are running in distributed mode but have no slave servers connected. "
                           "Please connect slaves prior to swarming.")
            return

        self.num_clients = locust_count                                                 # total number of simulated clients
        slave_num_clients = locust_count // (num_slaves or 1)                           # clients per slave
        slave_hatch_rate = float(hatch_rate) / (num_slaves or 1)                        # hatch rate per slave
        remaining = locust_count % num_slaves                                           # clients left over after the even split
        logger.info("Sending hatch jobs to %d ready clients", num_slaves)

        if self.state != STATE_RUNNING and self.state != STATE_HATCHING:                # if we are neither running nor hatching
            self.stats.clear_all()                                                      # reinitialize the stats
            self.exceptions = {}
            events.master_start_hatching.fire()

        for client in (self.clients.ready + self.clients.running + self.clients.hatching):  # iterate over all clients
            data = {
                "hatch_rate":slave_hatch_rate,
                "num_clients":slave_num_clients,
                "host":self.host,
                "stop_timeout":None
            }
            if remaining > 0:
                data["num_clients"] += 1
                remaining -= 1
            self.server.send_to_client(Message("hatch", data, client.id))               # send the hatch command and its parameters to the client

        self.stats.start_time = time()                                                  # record the start time
        self.state = STATE_HATCHING                                                     # switch to the hatching state

    def stop(self):
        for client in self.clients.all:                                                 # on stop, iterate over all clients
            self.server.send_to_client(Message("stop", None, client.id))                # and send the stop command
        events.master_stop_hatching.fire()                                              # the master stops hatching

    def quit(self):
        for client in self.clients.all:                                                 # send the quit command to every client
            self.server.send_to_client(Message("quit", None, client.id))
        self.greenlet.kill(block=True)                                                  # the master quits

    def heartbeat_worker(self):
        while True:                                                                     # loop forever
            gevent.sleep(self.heartbeat_interval)                                       # sleep for the heartbeat interval, 1 second by default
            for client in self.clients.all:                                             # iterate over all clients
                if client.heartbeat < 0 and client.state != STATE_MISSING:              # if the client's heartbeat credits ran out and it is not already marked missing
                    logger.info('Slave %s failed to send heartbeat, setting state to missing.' % str(client.id))
                    client.state = STATE_MISSING
                else:
                    client.heartbeat -= 1                                               # consume one heartbeat credit

    def client_listener(self):
        while True:
            client_id, msg = self.server.recv_from_client()                             # receive a message from a client
            msg.node_id = client_id                                                     # attach the client id
            if msg.type == "client_ready":                                              # dispatch by message type
                id = msg.node_id
                self.clients[id] = SlaveNode(id, heartbeat_liveness=self.heartbeat_liveness)    # a ready client gets a matching SlaveNode on the master side
                logger.info("Client %r reported as ready. Currently %i clients ready to swarm." % (id, len(self.clients.ready)))
                ## emit a warning if the slave's clock seem to be out of sync with our clock
                #if abs(time() - msg.data["time"]) > 5.0:
                #    warnings.warn("The slave node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
            elif msg.type == "client_stopped":                                          # the client stopped
                del self.clients[msg.node_id]                                           # drop the node
                logger.info("Removing %s client from running clients" % (msg.node_id))
            elif msg.type == "heartbeat":                                               # a heartbeat arrived
                if msg.node_id in self.clients:                                         # if the node is known
                    self.clients[msg.node_id].heartbeat = self.heartbeat_liveness       # refill its heartbeat credits
                    self.clients[msg.node_id].state = msg.data['state']                 # and update its state
            elif msg.type == "stats":                                                   # a stats report arrived
                events.slave_report.fire(client_id=msg.node_id, data=msg.data)          # fire the slave_report event
            elif msg.type == "hatching":                                                # the node started hatching
                self.clients[msg.node_id].state = STATE_HATCHING
            elif msg.type == "hatch_complete":
                self.clients[msg.node_id].state = STATE_RUNNING
                self.clients[msg.node_id].user_count = msg.data["count"]
                if len(self.clients.hatching) == 0:
                    count = sum(c.user_count for c in six.itervalues(self.clients))
                    events.hatch_complete.fire(user_count=count)
            elif msg.type == "quit":                                                    # the node quit
                if msg.node_id in self.clients:                                         # if it is known, drop it
                    del self.clients[msg.node_id]
                    logger.info("Client %r quit. Currently %i clients connected." % (msg.node_id, len(self.clients.ready)))
            elif msg.type == "exception":                                               # an exception was reported
                self.log_exception(msg.node_id, msg.data["msg"], msg.data["traceback"])

            if not self.state == STATE_INIT and all(map(lambda x: x.state == STATE_INIT, self.clients.all)):    # if we have started and every node is back in the init state
                self.state = STATE_STOPPED                                                                      # switch to the stopped state

    @property
    def slave_count(self):
        return len(self.clients.ready) + len(self.clients.hatching) + len(self.clients.running)

As this class shows, the master interacts with the slaves entirely over rpc: at startup it spawns the heartbeat checker and the listener for data coming from the clients. Next, let us look at the slave side.
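All of these commands travel between master and slaves as Message instances from locust's rpc package, serialized with msgpack over zeromq. A sketch close to locust/rpc/protocol.py in 0.9.0 (the exact msgpack options may differ by version):

import msgpack

class Message(object):
    def __init__(self, message_type, data, node_id):
        self.type = message_type    # e.g. "hatch", "heartbeat", "stats", "quit"
        self.data = data            # payload, e.g. {"hatch_rate": ..., "num_clients": ...}
        self.node_id = node_id      # client_id of the slave the message concerns

    def serialize(self):
        return msgpack.dumps((self.type, self.data, self.node_id))

    @classmethod
    def unserialize(cls, data):
        return cls(*msgpack.loads(data, raw=False))     # raw=False decodes strings; older msgpack used encoding="utf-8"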

Execution in slave mode

The slave code is as follows:

class SlaveLocustRunner(DistributedLocustRunner):
    def __init__(self, *args, **kwargs):
        super(SlaveLocustRunner, self).__init__(*args, **kwargs)
        self.client_id = socket.gethostname() + "_" + uuid4().hex                       # generate a client_id

        self.client = rpc.Client(self.master_host, self.master_port, self.client_id)    # connect to the master with this node's client_id
        self.greenlet = Group()
        self.greenlet.spawn(self.heartbeat).link_exception(callback=self.noop)          # start the heartbeat greenlet
        self.greenlet.spawn(self.worker).link_exception(callback=self.noop)             # start the worker greenlet
        self.client.send(Message("client_ready", None, self.client_id))                 # tell the master this node is ready
        self.slave_state = STATE_INIT                                                   # set the node state to init
        self.greenlet.spawn(self.stats_reporter).link_exception(callback=self.noop)     # start reporting stats to the master

        # register listener for when all locust users have hatched, and report it to the master node
        def on_hatch_complete(user_count):
            self.client.send(Message("hatch_complete", {"count":user_count}, self.client_id))
            self.slave_state = STATE_RUNNING
        events.hatch_complete += on_hatch_complete

        # register listener that adds the current number of spawned locusts to the report that is sent to the master node
        def on_report_to_master(client_id, data):
            data["user_count"] = self.user_count
        events.report_to_master += on_report_to_master

        # register listener that sends quit message to master
        def on_quitting():
            self.client.send(Message("quit", None, self.client_id))
        events.quitting += on_quitting                                                  # tell the master when this node quits

        # register listener thats sends locust exceptions to master
        def on_locust_error(locust_instance, exception, tb):
            formatted_tb = "".join(traceback.format_tb(tb))
            self.client.send(Message("exception", {"msg" : str(exception), "traceback" : formatted_tb}, self.client_id))
        events.locust_error += on_locust_error

    def heartbeat(self):
        while True:
            self.client.send(Message('heartbeat', {'state': self.slave_state}, self.client_id))     # send a heartbeat to the master
            gevent.sleep(self.heartbeat_interval)                                                   # then sleep for the interval

    def worker(self):
        while True:
            msg = self.client.recv()                                                    # wait for a message from the master
            if msg.type == "hatch":                                                     # hatch command
                self.slave_state = STATE_HATCHING                                       # switch to the hatching state
                self.client.send(Message("hatching", None, self.client_id))             # tell the master we are hatching
                job = msg.data                                                          # the hatch parameters sent by the master
                self.hatch_rate = job["hatch_rate"]                                     # hatch rate for this node
                #self.num_clients = job["num_clients"]
                self.host = job["host"]                                                 # target host
                self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))   # start the test
            elif msg.type == "stop":                                                    # stop command
                self.stop()                                                             # stop the test
                self.client.send(Message("client_stopped", None, self.client_id))       # tell the master this client has stopped
                self.client.send(Message("client_ready", None, self.client_id))         # and that it is ready again
                self.slave_state = STATE_INIT                                           # back to the init state
            elif msg.type == "quit":
                logger.info("Got quit message from master, shutting down...")
                self.stop()                                                             # the master asked this node to quit
                self.greenlet.kill(block=True)                                          # shut down

    def stats_reporter(self):
        while True:
            data = {}
            events.report_to_master.fire(client_id=self.client_id, data=data)           # collect this node's report data
            try:
                self.client.send(Message("stats", data, self.client_id))                # send it to the master
            except:
                logger.error("Connection lost to master server. Aborting...")
                break

            gevent.sleep(SLAVE_REPORT_INTERVAL)                                         # sleep for the report interval

The slave keeps reporting its state to the master in real time and acts on the commands the master sends: it takes the hatch parameters from the master, starts the run, and then periodically sends its runtime data back to the master.
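On the master side, those periodic "stats" messages end up firing events.slave_report (see client_listener above), and a listener in locust's stats module merges each slave's serialized entries into global_stats via StatsEntry.unserialize and extend. Abridged from the 0.9.0 source (details may differ slightly):

def on_slave_report(client_id, data):
    for stats_data in data["stats"]:
        entry = StatsEntry.unserialize(stats_data)          # rebuild the StatsEntry the slave serialized
        request_key = (entry.name, entry.method)
        if request_key not in global_stats.entries:
            global_stats.entries[request_key] = StatsEntry(global_stats, entry.name, entry.method)
        global_stats.entries[request_key].extend(entry)     # merge the slave's numbers into the master's entry
    global_stats.total.extend(StatsEntry.unserialize(data["stats_total"]))  # merge the slave's "Total" entry too

events.slave_report += on_slave_report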

This completes the overview of distributed mode: the nodes and the master communicate over rpc, and heartbeat checks track each client's state. Readers interested in more detail can read the related code.

Summary

This article first analyzed how locust processes its runtime data to report success rates and other metrics during a test, and then looked at how the distributed mode is implemented: master and slaves communicate through rpc calls, and each node's test data is aggregated on the master, which is what makes distributed testing work. Readers can explore the source for more detail; given the limits of my knowledge, corrections are welcome.

Reposted from: https://blog.csdn.net/qq_33339479/article/details/87921264. If this post infringes your copyright, please leave a comment with the address of the original article and it will be removed; apologies for any inconvenience.
