kazoo源码分析:服务器交互的实现细节
发布日期:2021-07-25 13:04:58 浏览次数:9 分类:技术文章

本文共 26921 字,大约阅读时间需要 89 分钟。

kazoo源码分析

kazoo-2.6.1

kazoo客户端与服务器概述

上文start概述中,只是简单的概述了kazoo客户端初始化之后,调用了start方法,本文继续详细的了解相关的细节。

kazoo启动之后的主要技术细节

示例代码

本文主要是分析一下基本流程与启动的架构实现,示例代码如下;

from kazoo.client import KazooClientzk = KazooClient(hosts='127.0.0.1:2181')zk.start()# Determine if a node existsif zk.exists("/my/favorite"):    # Do something# Print the version of a node and its datadata, stat = zk.get("/my/favorite")zk.stop()

示例代码还是如上所示。

通过上文分析,可知在线程中运行的是self.zk_loop函数,

def zk_loop(self):    """Main Zookeeper handling loop"""    self.logger.log(BLATHER, 'ZK loop started')    self.connection_stopped.clear()                                 # 清除停止连接    retry = self.retry_sleeper.copy()                               # 重试类拷贝    try:        while not self.client._stopped.is_set():                    # 如果没有被设置为停止            # If the connect_loop returns STOP_CONNECTING, stop retrying            if retry(self._connect_loop, retry) is STOP_CONNECTING:     # 循环执行self._connect_loop                break    except RetryFailedError:        self.logger.warning("Failed connecting to Zookeeper "                            "within the connection retry policy.")    finally:        self.connection_stopped.set()                               # 如果退出或者异常则清除相关数据并关闭会话        self.client._session_callback(KeeperState.CLOSED)        self.logger.log(BLATHER, 'Connection stopped')

可知运行的时候是通过包裹了retry来进行启动运行的,首先来分析一下该类,默认情况下是KazooRetry类实例。

KazooRetry类实例
class KazooRetry(object):    """Helper for retrying a method in the face of retry-able    exceptions"""    RETRY_EXCEPTIONS = (        ConnectionLoss,        OperationTimeoutError,        ForceRetryError    )    EXPIRED_EXCEPTIONS = (        SessionExpiredError,    )    def __init__(self, max_tries=1, delay=0.1, backoff=2, max_jitter=None,                 max_delay=60, ignore_expire=True, sleep_func=time.sleep,                 deadline=None, interrupt=None):        """Create a :class:`KazooRetry` instance for retrying function        calls with uniform jitter        :param max_tries: How many times to retry the command. -1 means                          infinite tries.        :param delay: Initial delay between retry attempts.        :param backoff: Backoff multiplier between retry attempts.                        Defaults to 2 for exponential backoff.        :param max_jitter: *Deprecated* Jitter is now uniformly distributed                           across retries.        :param max_delay: Maximum delay in seconds, regardless of other                          backoff settings. Defaults to one minute.        :param ignore_expire:            Whether a session expiration should be ignored and treated            as a retry-able command.        :param interrupt:            Function that will be called with no args that may return            True if the retry should be ceased immediately. This will            be called no more than every 0.1 seconds during a wait            between retries.        """        if max_jitter is not None:            warnings.warn(                'Passing max_jitter to retry configuration is deprecated.'                ' Retry jitter is now automacallity uniform across retries.'                ' The parameter will be ignored.',                DeprecationWarning, stacklevel=2)        self.max_tries = max_tries 					# 最大重试次数        self.delay = delay 							# 重试延时时间        self.backoff = backoff        self.max_delay = float(max_delay)        self._attempts = 0 							# 尝试次数        self._cur_delay = delay 					# 当前延时        self.deadline = deadline 					# 停止时间        self._cur_stoptime = None         self.sleep_func = sleep_func 				# 休眠函数        self.retry_exceptions = self.RETRY_EX如果连接Los或者操作超时等错误CEPTIONS        self.interrupt = interrupt        if ignore_expire:            self.retry_exceptions += self.EXPIRED_EXCEPTIONS    def reset(self):        """Reset the attempt counter"""        self._attempts = 0 							# 重置为0        self._cur_delay = self.delay 				# 重置延迟        self._cur_stoptime = None 					# 置空停止时间    def copy(self):        """Return a clone of this retry manager"""        obj = KazooRetry(max_tries=self.max_tries,                         delay=self.delay,                         backoff=self.backoff,                         max_delay=self.max_delay,                         sleep_func=self.sleep_func,                         deadline=self.deadline,                         interrupt=self.interrupt)        obj.retry_exceptions = self.retry_exceptions        return obj 									# 重新生成一个已有参数的该类    def __call__(self, func, *args, **kwargs):        """Call a function with arguments until it completes without        throwing a Kazoo exception        :param func: Function to call        :param args: Positional arguments to call the function with        :params kwargs: Keyword arguments to call the function with        The function will be called until it doesn't throw one of the        retryable exceptions (ConnectionLoss, OperationTimeout, or        ForceRetryError), and optionally retrying on session        expiration.        """        self.reset() 								# 首先重置        while True: 								# 循环            try:                if self.deadline is not None and self._cur_stoptime is None: 	# 如果截止时间与当前停止时间都为空                    self._cur_stoptime = time.time() + self.deadline 			# 设置当前需要停止的时间                return func(*args, **kwargs) 									# 执行函数            except ConnectionClosedError:										# 如果连接出错直接报错                raise            except self.retry_exceptions: 						  				# 如果连接Los或者操作超时等错误				                # Note: max_tries == -1 means infinite tries.                if self._attempts == self.max_tries: 							# 检查当前重试次数是否与最大尝试次数一致                    raise RetryFailedError("Too many retry attempts") 			# 如果与最大尝试次数一致则报错                self._attempts += 1                sleeptime = random.randint(0, 1 + int(self._cur_delay)) 		# 随机休眠时间                if self._cur_stoptime is not None and \                   time.time() + sleeptime >= self._cur_stoptime:                    raise RetryFailedError("Exceeded retry deadline") 			# 如果当前停止时间不为空并且当前待休眠的时间超过需要停止的时间则报错                if self.interrupt: 												# 如果有中断                    while sleeptime > 0: 										# 当前休眠时间大于0                        # Break the time period down and sleep for no                        # longer than 0.1 before calling the interrupt                        if sleeptime < 0.1: 									# 如果休眠时间小于0.1                            self.sleep_func(sleeptime) 							# 休息该时间并减掉该时间                            sleeptime -= sleeptime                        else:                            self.sleep_func(0.1) 								# 休息0.1                            sleeptime -= 0.1                        if self.interrupt(): 									# 调用中断回调函数如果为真则报错                            raise InterruptedError()                else:                    self.sleep_func(sleeptime) 									# 如果没有则直接休眠                self._cur_delay = min(self._cur_delay * self.backoff,                                      self.max_delay) 							# 获取最小的延迟时间

通过调用retry的call方法,将执行传入的func并将该func加入了重试机制,并制定了休眠时间等额外扩展功能。

self._connect_attempt函数
while not close_connection:                # Watch for something to read or send                jitter_time = random.randint(0, 40) / 100.0                # Ensure our timeout is positive                timeout = max([read_timeout / 2.0 - jitter_time,                               jitter_time])                s = self.handler.select([self._socket, self._read_sock],                                        [], [], timeout)[0]                if not s:                    if self.ping_outstanding.is_set():                        self.ping_outstanding.clear()                        raise ConnectionDropped(                            "outstanding heartbeat ping not received")                    self._send_ping(connect_timeout)                elif s[0] == self._socket:                    response = self._read_socket(read_timeout)                    close_connection = response == CLOSE_RESPONSE                else:                    self._send_request(read_timeout, connect_timeout)

首先讲解的就是self.handler.select函数

handle.select

在self._connect_attempt函数中,调用了handler.select方法来进行对本地连接的socket进行可读可写的检查,在默认启动的过程中,handle默认初始化的是SequentialThreadingHandler,查看该类的select方法;

def select(self, *args, **kwargs):    # if we have epoll, and select is not expected to work    # use an epoll-based "select". Otherwise don't touch    # anything to minimize changes    if _HAS_EPOLL: 												# 检查本机是否支持epoll        # if the highest fd we've seen is > 1023        if max(map(_to_fileno, chain(*args[:3]))) > 1023: 	            return self._epoll_select(*args, **kwargs) 			# 调用epoll事件驱动模型    return self._select(*args, **kwargs) 						# 调用select事件驱动模型def _select(self, *args, **kwargs):    timeout = kwargs.pop('timeout', None) 						# 获取超时时间    # either the time to give up, or None    end = (time.time() + timeout) if timeout else None 			# 计算得出截止时间    while end is None or time.time() < end: 					# 如果超时时间没有或者当前时间小于终止时间        if end is not None: 									# 如果终止时间不为空则设置过期时间            # make a list, since tuples aren't mutable            args = list(args)            # set the timeout to the remaining time            args[3] = end - time.time()        try:            return select.select(*args, **kwargs) 				# 调用select函数        except select.error as ex:            # if the system call was interrupted, we'll retry until timeout            # in Python 3, system call interruptions are a native exception            # in Python 2, they are not            errnum = ex.errno if isinstance(ex, OSError) else ex[0]            if errnum == errno.EINTR:                continue            raise    # if we hit our timeout, lets return as a timeout    return ([], [], [])def _epoll_select(self, rlist, wlist, xlist, timeout=None):    """epoll-based drop-in replacement for select to overcome select    limitation on a maximum filehandle value    """    if timeout is None: 											#  如果超时时间未空        timeout = -1  												# 设置为-1    eventmasks = defaultdict(int)    rfd2obj = defaultdict(list)    wfd2obj = defaultdict(list)    xfd2obj = defaultdict(list) 									# 获取读写列表    read_evmask = select.EPOLLIN | select.EPOLLPRI  # Just in case 	# 设置读信号    def store_evmasks(obj_list, evmask, fd2obj):        for obj in obj_list:            fileno = _to_fileno(obj)            eventmasks[fileno] |= evmask            fd2obj[fileno].append(obj)    store_evmasks(rlist, read_evmask, rfd2obj) 						# 添加需要监听的信号    store_evmasks(wlist, select.EPOLLOUT, wfd2obj)    store_evmasks(xlist, select.EPOLLERR, xfd2obj)    poller = select.epoll() 										# 获取epoll    for fileno in eventmasks:        poller.register(fileno, eventmasks[fileno]) 				# 注册相关的文件描述符    try:        events = poller.poll(timeout) 								# 检查是否有事件发生        revents = []        wevents = []        xevents = []        for fileno, event in events: 								# 遍历事件            if event & read_evmask:                revents += rfd2obj.get(fileno, []) 					# 如果有读事件则添加到读事件中            if event & select.EPOLLOUT:                wevents += wfd2obj.get(fileno, []) 					# 如果有写事件则添加到写事件中            if event & select.EPOLLERR:                xevents += xfd2obj.get(fileno, []) 					# 如果有出错事件则添加到出错事件中    finally:        poller.close() 												# 关闭    return revents, wevents, xevents 								# 返回处理事件列表

从该类的处理流程可知,依靠IO复用来提供与服务端的交互。

读取服务器数据_read_socket
def _read_socket(self, read_timeout):    """Called when there's something to read on the socket"""    client = self.client    header, buffer, offset = self._read_header(read_timeout) 	 			# 读取头部信息	    if header.xid == PING_XID: 												# 如果读取的是ping心跳        self.logger.log(BLATHER, 'Received Ping')        self.ping_outstanding.clear()    elif header.xid == AUTH_XID: 											# 如果是认证信息        self.logger.log(BLATHER, 'Received AUTH')        request, async_object, xid = client._pending.popleft() 				        if header.err:            async_object.set_exception(AuthFailedError())            client._session_callback(KeeperState.AUTH_FAILED)        else:            async_object.set(True)    elif header.xid == WATCH_XID: 											# 如果是观察器        self._read_watch_event(buffer, offset)    elif self.sasl_cli and not self.sasl_cli.complete:        # SASL authentication is not yet finished, this can only        # be a SASL packet        self.logger.log(BLATHER, 'Received SASL')        try:            challenge, _ = SASL.deserialize(buffer, offset)        except Exception:            raise ConnectionDropped('error while SASL authentication.')        response = self.sasl_cli.process(challenge)        if response:            # authentication not yet finished, answering the challenge            self._send_sasl_request(challenge=response,                                    timeout=client._session_timeout)        else:            # authentication is ok, state is CONNECTED or CONNECTED_RO            # remove sensible information from the object            self._set_connected_ro_or_rw(client)            self.sasl_cli.dispose()    else:        self.logger.log(BLATHER, 'Reading for header %r', header)        return self._read_response(header, buffer, offset) 					# 如果都不是则返回读响应请求体def _read(self, length, timeout):    msgparts = []    remaining = length    with self._socket_error_handling():        while remaining > 0: 													# 如果还需要继续读            # Because of SSL framing, a select may not return when using            # an SSL socket because the underlying physical socket may not            # have anything to select, but the wrapped object may still            # have something to read as it has previously gotten enough            # data from the underlying socket.            if (hasattr(self._socket, "pending")                    and self._socket.pending() > 0): 							# 如果有pending属性并且pending不为空则什么都不执行                pass            else:                s = self.handler.select([self._socket], [], [], timeout)[0] 	# 检查socket是否有读有读事件                if not s:  # pragma: nocover                    # If the read list is empty, we got a timeout. We don't                    # have to check wlist and xlist as we don't set any                    raise self.handler.timeout_exception(                        "socket time-out during read") 							# 如果超时则报超时异常            chunk = self._socket.recv(remaining) 								# 获取指定长度的数据            if chunk == b'': 													# 如果为空则读失败即连接失败                raise ConnectionDropped('socket connection broken')            msgparts.append(chunk) 												# 保存读取的数据            remaining -= len(chunk)  											# 计算还需要读入多少数据        return b"".join(msgparts) 												# 返回总共的数据def _read_header(self, timeout):    b = self._read(4, timeout) 													# 读长度为4的数据    length = int_struct.unpack(b)[0] 											# 解析获取的数据包长度    b = self._read(length, timeout) 											# 读取剩余的长度    header, offset = ReplyHeader.deserialize(b, 0) 								# 解析数据    return header, b, offsetdef _read_response(self, header, buffer, offset):    client = self.client 														# 获取发送的客户端    request, async_object, xid = client._pending.popleft() 						# 获取请求队列中的数据    if header.zxid and header.zxid > 0: 										# 获取头部的zxid标记        client.last_zxid = header.zxid    if header.xid != xid: 														# 如果头部id与发送的id不一致则报错        exc = RuntimeError('xids do not match, expected %r '                           'received %r', xid, header.xid)        async_object.set_exception(exc)        raise exc    # Determine if its an exists request and a no node error    exists_error = (header.err == NoNodeError.code and                    request.type == Exists.type)    # Set the exception if its not an exists error    if header.err and not exists_error: 										# 检查是否出错        callback_exception = EXCEPTIONS[header.err]()        self.logger.debug(            'Received error(xid=%s) %r', xid, callback_exception)        if async_object:            async_object.set_exception(callback_exception)    elif request and async_object: 												# 检查是否有请求与处理结果对象        if exists_error:            # It's a NoNodeError, which is fine for an exists            # request            async_object.set(None)        else:            try:                response = request.deserialize(buffer, offset) 					# 解析返回数据中的数据            except Exception as exc:                 self.logger.exception(                    "Exception raised during deserialization "                    "of request: %s", request)                async_object.set_exception(exc)                return            self.logger.debug(                'Received response(xid=%s): %r', xid, response)            # We special case a Transaction as we have to unchroot things            if request.type == Transaction.type:                response = Transaction.unchroot(client, response)				            async_object.set(response) 											# 唤醒客户端等待数据的线程并将值传入        # Determine if watchers should be registered        watcher = getattr(request, 'watcher', None) 							# 检查是否有watcher        if not client._stopped.is_set() and watcher:            if isinstance(request, (GetChildren, GetChildren2)):                client._child_watchers[request.path].add(watcher) 				# 添加到watcher中            else:                client._data_watchers[request.path].add(watcher)    if isinstance(request, Close): 												# 检查是否关闭如果是则关闭        self.logger.log(BLATHER, 'Read close response')        return CLOSE_RESPONSE

主要就是根据传入的数据,去读取相关的数据,并根据接受的数据去解析头部,来决定是否需要继续读入,如果还需要读入则读入后解析成response,然后将response传入给当前客户端等待的线程。

向服务端发送数据_send_request
def _send_request(self, read_timeout, connect_timeout):    """Called when we have something to send out on the socket"""    client = self.client 													# 获取客户端    try:        request, async_object = client._queue[0] 							# 从需要发送数据的队列中读取待发送的数据    except IndexError:        # Not actually something on the queue, this can occur if        # something happens to cancel the request such that we        # don't clear the socket below after sending        try:            # Clear possible inconsistence (no request in the queue            # but have data in the read socket), which causes cpu to spin.            self._read_sock.recv(1)        except OSError:            pass        return    # Special case for testing, if this is a _SessionExpire object    # then throw a SessionExpiration error as if we were dropped    if request is _SESSION_EXPIRED: 										# 如果是请求过期则报错        raise SessionExpiredError("Session expired: Testing")    if request is _CONNECTION_DROP:        raise ConnectionDropped("Connection dropped: Testing")    # Special case for auth packets    if request.type == Auth.type: 											# 判断请求类型        xid = AUTH_XID    else:        self._xid = (self._xid % 2147483647) + 1        xid = self._xid    self._submit(request, connect_timeout, xid) 							# 提交该发送的请求    client._queue.popleft() 												# 从队列中移除该请求    self._read_sock.recv(1)    client._pending.append((request, async_object, xid)) 					# 在_pending中添加该已经处理的请求数据def _submit(self, request, timeout, xid=None):    """Submit a request object with a timeout value and optional    xid"""    b = bytearray() 														# 生成二进制    if xid:        b.extend(int_struct.pack(xid)) 										# 如果有xid则添加xid    if request.type:        b.extend(int_struct.pack(request.type)) 							# 如果有类型则添加类型    b += request.serialize() 												# 获取请求的序列化数据    self.logger.log(        (BLATHER if isinstance(request, Ping) else logging.DEBUG),        "Sending request(xid=%s): %s", xid, request)    self._write(int_struct.pack(len(b)) + b, timeout) 						# 将该数据发送出去def _write(self, msg, timeout):    """Write a raw msg to the socket"""    sent = 0    msg_length = len(msg)    with self._socket_error_handling():        while sent < msg_length: 											# 检查已经发送的数据长度            s = self.handler.select([], [self._socket], [], timeout)[1] 	# 检查是否可写            if not s:  # pragma: nocover                # If the write list is empty, we got a timeout. We don't                # have to check rlist and xlist as we don't set any                raise self.handler.timeout_exception("socket time-out"                                                     " during write")            msg_slice = buffer(msg, sent) 									# 获取待发送的数据            bytes_sent = self._socket.send(msg_slice) 						# 发送该数据            if not bytes_sent: 												# 如果发送失败则报错                raise ConnectionDropped('socket connection broken')            sent += bytes_sent 												# 增加已经发送的数据长度

发送请求的过程,就是根据请求进行序列化,然后将序列化的数据通过IO复用发送给服务器。

zk.exists("/my/favorite")的执行流程

def exists(self, path, watch=None):    """Check if a node exists.    If a watch is provided, it will be left on the node with the    given path. The watch will be triggered by a successful    operation that creates/deletes the node or sets the data on the    node.    :param path: Path of node.    :param watch: Optional watch callback to set for future changes                  to this path.    :returns: ZnodeStat of the node if it exists, else None if the              node does not exist.    :rtype: :class:`~kazoo.protocol.states.ZnodeStat` or `None`.    :raises:        :exc:`~kazoo.exceptions.ZookeeperError` if the server        returns a non-zero error code.    """    return self.exists_async(path, watch=watch).get() 		# 调用exists_async方法然后调用返回值的get方法def exists_async(self, path, watch=None):    """Asynchronously check if a node exists. Takes the same    arguments as :meth:`exists`.    :rtype: :class:`~kazoo.interfaces.IAsyncResult`    """    if not isinstance(path, string_types):        raise TypeError("Invalid type for 'path' (string expected)")    if watch and not callable(watch):        raise TypeError("Invalid type for 'watch' (must be a callable)")    async_result = self.handler.async_result() 					# 默认请求时SequentialThreadingHandler的async_result即是AsyncResult类实例    self._call(Exists(_prefix_root(self.chroot, path), watch), 			# 调用self._call方法并包装Exists类进行数据的序列化               async_result)    return async_result 										# 返回该实例def _call(self, request, async_object):    """Ensure there's an active connection and put the request in    the queue if there is.    Returns False if the call short circuits due to AUTH_FAILED,    CLOSED, EXPIRED_SESSION or CONNECTING state.    """    if self._state == KeeperState.AUTH_FAILED: 					# 检查是否认证失败        async_object.set_exception(AuthFailedError()) 			# 认证失败则报错认证失败        return False    elif self._state == KeeperState.CLOSED: 					# 检查是否关闭        async_object.set_exception(ConnectionClosedError(            "Connection has been closed")) 						# 抛出错误异常        return False    elif self._state in (KeeperState.EXPIRED_SESSION,                         KeeperState.CONNECTING): 				# 检查是否过期或者链接中        async_object.set_exception(SessionExpiredError()) 		# 报错        return False    self._queue.append((request, async_object)) 				# 添加到待处理列表中    # wake the connection, guarding against a race with close()    write_sock = self._connection._write_sock 					# 检查socket是否正常    if write_sock is None:        async_object.set_exception(ConnectionClosedError(            "Connection has been closed"))    try:        write_sock.send(b'\0')    except:        async_object.set_exception(ConnectionClosedError(            "Connection has been closed"))

此时就将exists通过Exists来序列化与解析从服务端返回的数据,调用self._call方法将request返回到_queue队列中,然后会在_send_request的处理流程中从队列中取出,然后进行数据发送,在数据发送给服务器的过程中,生成了一个async_result类实例,最后代用该实例的get()方法进行等待,由于默认的是SequentialThreadingHandler中的async_result实例,继续分析如下;

class AsyncResult(utils.AsyncResult):    """A one-time event that stores a value or an exception"""    def __init__(self, handler):        super(AsyncResult, self).__init__(handler,                                          threading.Condition,                                          KazooTimeoutError)class AsyncResult(object):    """A one-time event that stores a value or an exception"""    def __init__(self, handler, condition_factory, timeout_factory):        self._handler = handler 									# 传入SequentialThreadingHandler        self._exception = _NONE        self._condition = condition_factory() 						# 传入threading.Condition        self._callbacks = []        self._timeout_factory = timeout_factory        self.value = None    def ready(self):        """Return true if and only if it holds a value or an        exception"""        return self._exception is not _NONE    def successful(self):        """Return true if and only if it is ready and holds a value"""        return self._exception is None    @property    def exception(self):        if self._exception is not _NONE:            return self._exception    def set(self, value=None):        """Store the value. Wake up the waiters."""        with self._condition:            self.value = value 								# 当服务端将数据返回时先保存该值            self._exception = None 							# 设置异常为空            self._do_callbacks() 							# 执行回调函数            self._condition.notify_all() 					# 唤醒等待该条件变量的线程,以此达到将数据传入    def set_exception(self, exception):        """Store the exception. Wake up the waiters."""        with self._condition: 									            self._exception = exception  					# 如果有异常            self._do_callbacks() 							# 执行回调方法            self._condition.notify_all() 					# 唤醒所有等待该事件的线程    def get(self, block=True, timeout=None):        """Return the stored value or raise the exception.        If there is no value raises TimeoutError.        """        with self._condition: 								# 通过条件变量来实现数据的等待交互             if self._exception is not _NONE: 				# 如果异常不为空                if self._exception is None: 				# 如果异常为空                    return self.value 						# 返回存入的值                raise self._exception 						# 否则报错            elif block: 									# 如果是阻塞等待服务端结果                self._condition.wait(timeout) 				# 通过条件变量等待timeout时间,在本例中是阻塞等待数据                if self._exception is not _NONE: 			# 如果异常不为空                    if self._exception is None: 			# 判断是否为空                        return self.value					# 返回保存的数据                    raise self._exception 					# 否则直接抛出            # if we get to this point we timeout            raise self._timeout_factory()    def _do_callbacks(self):        """Execute the callbacks that were registered by :meth:`rawlink`.        If the handler is in running state this method only schedules        the calls to be performed by the handler. If it's stopped,        the callbacks are called right away."""        for callback in self._callbacks:            if self._handler.running:                self._handler.completion_queue.put(                    functools.partial(callback, self)) 				# 执行将回调函数推到队列中去执行            else:                functools.partial(callback, self)() 				# 如果没有运行则直接本地运行

至此可以完成的看出,通过conditon来完成将服务端处理完成的数据返回然等待服务端返回数据的线程继续执行下去,至此与服务器的基本交互流程就分析完成。

总结

通过分析zk.exists的工作流程,工作线程将客户端要发送的数据发送给服务器端,然后客户端线程陷入等待,等待工作线程读取到服务端返回的数据,然后继续执行下去,这大概就是与服务器交互的基本流程。由于本人才疏学浅,如有错误请批评指正。

转载地址:https://blog.csdn.net/qq_33339479/article/details/90322288 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:python3.7源码分析-字典
下一篇:kazoo源码分析:Zookeeper客户端start概述

发表评论

最新留言

初次前来,多多关照!
[***.217.46.12]2024年03月25日 01时55分15秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

java窗口内容如何复制_求助Java窗口菜单如何实现复制粘贴剪切等功能(内附源代码)... 2019-04-21
盾神与砝码称重java_[蓝桥杯][算法提高VIP]盾神与砝码称重 2019-04-21
java输出狗的各类信息_第九章Java输入输出操作 2019-04-21
java notify怎么用_java 如何使用notify() 2019-04-21
java加载指定文件为当前文本,java:如何使用bufferedreader读取特定的行 2019-04-21
java metrics 怎么样,Java metrics 2019-04-21
在vscode中php语言配置,Visual Studio Code C / C++ 语言环境配置 2019-04-21
php怎么翻译数据库中的中文,javascript – 如何将翻译后的文本插入数据库php 2019-04-21
普朗克公式matlab,用MATLAB实现普朗克函数积分的快捷计算.pdf 2019-04-21
swoolec+%3c?php,PHP+Swoole并发编程的魅力 2019-04-21
php 404配置,phpcms如何配置404 2019-04-21
matlab wash矩阵产生,洗衣机净衣效能与衣损程度的关系分析 2019-04-21
php中如何调用sql server,php调用SQL SERVER 2008及以上版本的方法 2019-04-21
python多线程实现kmeans,3种方式实现python多线程并发处理 2019-04-21
matlab 变量不存在,matlab程序运行时提示变量未定义 2019-04-21
php编码函数 base58,1. Base58可逆加密 2019-04-21
oracle 在需要下列之一,Oracle存储过程中PLS-00103:出现符号“/”在需要下列之一时:(... 2019-04-21
oracle10g dblink优化,Oracle10g通过dblink访问数据异常 2019-04-21
linux安装时的iso文件,直接用ISO文件在linux上安装新系统 2019-04-21
linux修改文件权限为所有人都可以访问,Linux 笔记分享八:文件权限的设定 2019-04-21