发布日期:2021-07-25 13:04:58 浏览次数:28 分类:技术文章

本文共 26655 字,大约阅读时间需要 88 分钟。








from kazoo.client import KazooClient

# Example host list; adjust for your deployment (an empty string means
# there is no server to connect to, so start() would just time out).
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
try:
    # Determine if a node exists
    if zk.exists("/my/favorite"):
        # Print the version of a node and its data. Only read it when it
        # exists: get() on a missing node raises instead of returning None.
        data, stat = zk.get("/my/favorite")
        print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
finally:
    # Always release the session, even if one of the calls above raised.
    zk.stop()


class KazooClient(object):
    """An Apache Zookeeper Python client supporting alternate callback
    handlers and high-level functionality.

    Watch functions registered with this class will not get session
    events, unlike the default Zookeeper watches. They will also be
    called with a single argument, a
    :class:`~kazoo.protocol.states.WatchedEvent` instance.

    """
    def __init__(self, hosts='',
                 timeout=10.0, client_id=None, handler=None,
                 default_acl=None, auth_data=None, read_only=None,
                 randomize_hosts=True, connection_retry=None,
                 command_retry=None, logger=None, keyfile=None,
                 keyfile_password=None, certfile=None, ca=None,
                 use_ssl=False, verify_certs=True, **kwargs):
        """Create a :class:`KazooClient` instance. All time arguments
        are in seconds.

        :param hosts: Comma-separated list of hosts to connect to
                      (e.g. 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
        :param timeout: The longest to wait for a Zookeeper connection.
        :param client_id: A Zookeeper client id, used when
                          re-establishing a prior session connection.
        :param handler: An instance of a class implementing the
                        :class:`~kazoo.interfaces.IHandler` interface
                        for callback handling.
        :param default_acl: A default ACL used on node creation.
        :param auth_data:
            A list of authentication credentials to use for the
            connection. Should be a list of (scheme, credential)
            tuples as :meth:`add_auth` takes.
        :param read_only: Allow connections to read only servers.
        :param randomize_hosts: By default randomize host selection.
        :param connection_retry:
            A :class:`kazoo.retry.KazooRetry` object (or an instance of
            a subclass) to use for retrying the connection to Zookeeper.
            Also can be a dict of options which will be used for
            creating one.
        :param command_retry:
            A :class:`kazoo.retry.KazooRetry` object (or an instance of
            a subclass) to use for the :meth:`KazooClient.retry` method.
            Also can be a dict of options which will be used for
            creating one.
        :param logger: A custom logger to use instead of the module
            global `log` instance.
        :param keyfile: SSL keyfile to use for authentication
        :param keyfile_password: SSL keyfile password
        :param certfile: SSL certfile to use for authentication
        :param ca: SSL CA file to use for authentication
        :param use_ssl: argument to control whether SSL is used or not
        :param verify_certs: when using SSL, argument to bypass
            certs verification
        :raises ConfigurationError: if `handler` is a class instead of
            an instance, or if a retry object's sleep_func differs from
            the handler's sleep_func.
        :raises TypeError: if unexpected keyword arguments remain after
            the deprecated retry options have been consumed.

        Basic Example:

        .. code-block:: python

            zk = KazooClient()
            zk.start()
            children = zk.get_children('/')
            zk.stop()

        As a convenience all recipe classes are available as attributes
        and get automatically bound to the client. For example::

            zk = KazooClient()
            zk.start()
            lock = zk.Lock('/lock_path')

        .. versionadded:: 0.6
            The read_only option. Requires Zookeeper 3.4+

        .. versionadded:: 0.6
            The retry_max_delay option.

        .. versionadded:: 0.6
            The randomize_hosts option.

        .. versionchanged:: 0.8
            Removed the unused watcher argument (was second argument).

        .. versionadded:: 1.2
            The connection_retry, command_retry and logger options.

        """
        # Logger used by this client; falls back to the module-global `log`.
        self.logger = logger or log

        # Record the handler strategy used (defaults to thread-based
        # SequentialThreadingHandler). It must be an instance, not a class.
        self.handler = handler if handler else SequentialThreadingHandler()
        if inspect.isclass(self.handler):
            raise ConfigurationError("Handler must be an instance of a class, "
                                     "not the class: %s" % self.handler)

        # (scheme, credential) pairs used for authentication.
        self.auth_data = auth_data if auth_data else set([])
        # ACL applied when creating nodes.
        self.default_acl = default_acl
        # Shuffle host selection (ZooKeeper can be deployed as a cluster).
        self.randomize_hosts = randomize_hosts
        self.hosts = None
        self.chroot = None
        # Parse the servers this client connects to; set_hosts is defined
        # elsewhere in the class (presumably fills hosts/chroot).
        self.set_hosts(hosts)

        # SSL settings for the connection.
        self.use_ssl = use_ssl
        self.verify_certs = verify_certs
        self.certfile = certfile
        self.keyfile = keyfile
        self.keyfile_password = keyfile_password
        self.ca = ca

        # Curator like simplified state tracking, and listeners for
        # state transitions
        self._state = KeeperState.CLOSED
        self.state = KazooState.LOST
        self.state_listeners = set()
        # Registered child/data watch callbacks, keyed by path.
        self._child_watchers = defaultdict(set)
        self._data_watchers = defaultdict(set)
        self._reset()
        self.read_only = read_only

        if client_id:
            # Re-establish a prior session: client_id is (id, password).
            self._session_id = client_id[0]
            self._session_passwd = client_id[1]
        else:
            self._reset_session()

        # ZK uses milliseconds
        self._session_timeout = int(timeout * 1000)

        # We use events like twitter's client to track current and
        # desired state (connected, and whether to shutdown)
        self._live = self.handler.event_object()
        self._writer_stopped = self.handler.event_object()
        self._stopped = self.handler.event_object()
        self._stopped.set()
        self._writer_stopped.set()

        # Retry policies. A dict of options or a KazooRetry instance
        # (including subclasses) is honoured; isinstance() is used so a
        # subclassed retry or dict subclass is not silently replaced by
        # the defaults below.
        self.retry = self._conn_retry = None

        if isinstance(connection_retry, dict):
            self._conn_retry = KazooRetry(**connection_retry)
        elif isinstance(connection_retry, KazooRetry):
            self._conn_retry = connection_retry

        if isinstance(command_retry, dict):
            self.retry = KazooRetry(**command_retry)
        elif isinstance(command_retry, KazooRetry):
            self.retry = command_retry

        if isinstance(self._conn_retry, KazooRetry):
            if self.handler.sleep_func != self._conn_retry.sleep_func:
                raise ConfigurationError("Retry handler and event handler "
                                         "must use the same sleep func")

        if isinstance(self.retry, KazooRetry):
            if self.handler.sleep_func != self.retry.sleep_func:
                raise ConfigurationError(
                    "Command retry handler and event handler "
                    "must use the same sleep func")

        if self.retry is None or self._conn_retry is None:
            # Deprecated path: retry options passed directly as kwargs.
            old_retry_keys = dict(_RETRY_COMPAT_DEFAULTS)
            for key in old_retry_keys:
                try:
                    old_retry_keys[key] = kwargs.pop(key)
                    warnings.warn(
                        'Passing retry configuration param %s to the '
                        'client directly is deprecated, please pass a '
                        'configured retry object (using param %s)' % (
                            key, _RETRY_COMPAT_MAPPING[key]),
                        DeprecationWarning, stacklevel=2)
                except KeyError:
                    pass

            retry_keys = {}
            for oldname, value in old_retry_keys.items():
                retry_keys[_RETRY_COMPAT_MAPPING[oldname]] = value

            if self._conn_retry is None:
                self._conn_retry = KazooRetry(
                    sleep_func=self.handler.sleep_func,
                    **retry_keys)
            if self.retry is None:
                self.retry = KazooRetry(
                    sleep_func=self.handler.sleep_func,
                    **retry_keys)

        # Stop retrying the connection as soon as the client is stopped.
        self._conn_retry.interrupt = lambda: self._stopped.is_set()
        self._connection = ConnectionHandler(
            self, self._conn_retry.copy(), logger=self.logger)

        # Every retry call should have its own copy of the retry helper
        # to avoid shared retry counts
        self._retry = self.retry

        def _retry(*args, **kwargs):
            return self._retry.copy()(*args, **kwargs)
        self.retry = _retry

        # Recipe classes, pre-bound to this client.
        self.Barrier = partial(Barrier, self)
        self.Counter = partial(Counter, self)
        self.DoubleBarrier = partial(DoubleBarrier, self)
        self.ChildrenWatch = partial(ChildrenWatch, self)
        self.DataWatch = partial(DataWatch, self)
        self.Election = partial(Election, self)
        self.NonBlockingLease = partial(NonBlockingLease, self)
        self.MultiNonBlockingLease = partial(MultiNonBlockingLease, self)
        self.Lock = partial(Lock, self)
        self.ReadLock = partial(ReadLock, self)
        self.WriteLock = partial(WriteLock, self)
        self.Party = partial(Party, self)
        self.Queue = partial(Queue, self)
        self.LockingQueue = partial(LockingQueue, self)
        self.SetPartitioner = partial(SetPartitioner, self)
        self.Semaphore = partial(Semaphore, self)
        self.ShallowParty = partial(ShallowParty, self)

        # Managing SASL client
        self.use_sasl = False
        for scheme, auth in self.auth_data:
            if scheme == "sasl":
                self.use_sasl = True
                # Could be used later for GSSAPI implementation
                self.sasl_server_principal = "zk-sasl-md5"
                break

        # If we got any unhandled keywords, complain like Python would
        if kwargs:
            raise TypeError('__init__() got unexpected keyword arguments: %s'
                            % (kwargs.keys(),))


def start(self, timeout=15):
    """Connect to ZooKeeper, blocking until connected or `timeout` passes.

    :param timeout: Number of seconds to wait for the connection to be
                    established.
    :raises: :attr:`~kazoo.interfaces.IHandler.timeout_exception`
             when no connection was made within `timeout` seconds; the
             client is stopped before the exception is raised.
    """
    self.start_async().wait(timeout=timeout)

    if self.connected:
        # Connected. A missing chroot only deserves a warning, not a failure.
        if self.chroot and not self.exists("/"):
            warnings.warn("No chroot path exists, the chroot path "
                          "should be created before normal use.")
        return

    # Timed out: tear down the half-started connection before failing.
    self.stop()
    raise self.handler.timeout_exception("Connection time-out")


def start_async(self):
    """Begin connecting to ZooKeeper without waiting for the result.

    If the client is already live this is a no-op. Otherwise any previous
    connection is closed safely, the stop flags are cleared, and both the
    callback handler and the connection handler are started.

    :returns: The client's "live" event, which becomes set once the
              connection is up.
    :rtype: :class:`~threading.Event` compatible object.
    """
    if not self._live.is_set():
        # Make sure any earlier connection is fully shut down first.
        self._safe_close()

        # Clear the stop markers for the client and its writer thread.
        self._stopped.clear()
        self._writer_stopped.clear()

        # Bring up the callback workers, then the connection loop.
        self.handler.start()
        self._connection.start()

    return self._live


def _create_thread_worker(self, queue):
    """Spawn a thread that runs every callable put on `queue`.

    The worker exits when it receives the `_STOP` sentinel. Exceptions
    raised by a callable are logged and do not kill the worker.

    :param queue: Queue of zero-argument callables (or `_STOP`).
    :returns: The thread object returned by :meth:`spawn`.
    """
    def _thread_worker():  # pragma: nocover
        keep_going = True
        while keep_going:
            try:
                job = queue.get()
                try:
                    if job is _STOP:
                        keep_going = False
                    else:
                        job()
                except Exception:
                    log.exception("Exception in worker queue thread")
                finally:
                    # Mark the item processed whatever happened above.
                    queue.task_done()
                    del job  # release before possible idle
            except self.queue_empty:
                continue

    return self.spawn(_thread_worker)


def spawn(self, func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` in a new daemon thread.

    :returns: The started :class:`threading.Thread`.
    """
    worker = threading.Thread(target=func, args=args, kwargs=kwargs)
    # Daemon threads do not keep the interpreter alive at exit.
    worker.daemon = True
    worker.start()
    return worker


def start(self):
    """Start the worker threads."""
    with self._state_change:
        if not self._running:
            # Spawn our worker threads, we have
            # - A callback worker for watch events to be called
            # - A completion worker for completion events to be called
            self._workers.extend(
                self._create_thread_worker(work_queue)
                for work_queue in (self.completion_queue,
                                   self.callback_queue))
            self._running = True
            # Presumably an atexit shim; registers stop() at shutdown.
            python2atexit.register(self.stop)


def start(self):
    """Start the connection up"""
    if self.connection_closed.is_set():
        # Fresh socket pair used to wake the select() loop from other threads.
        rw_sockets = self.handler.create_socket_pair()
        self._read_sock, self._write_sock = rw_sockets
        self.connection_closed.clear()
    if self._connection_routine:
        raise Exception("Unable to start, connection routine already "
                        "active.")
    # Run zk_loop in a worker spawned by the client's handler.
    self._connection_routine = self.handler.spawn(self.zk_loop)


def zk_loop(self):
    """Main Zookeeper handling loop"""
    self.logger.log(BLATHER, 'ZK loop started')

    self.connection_stopped.clear()

    # Private copy so this loop's retry counters are not shared.
    retry = self.retry_sleeper.copy()
    try:
        while not self.client._stopped.is_set():
            # If the connect_loop returns STOP_CONNECTING, stop retrying
            if retry(self._connect_loop, retry) is STOP_CONNECTING:
                break
    except RetryFailedError:
        self.logger.warning("Failed connecting to Zookeeper "
                            "within the connection retry policy.")
    finally:
        # On normal exit or failure, signal the stop and report CLOSED.
        self.connection_stopped.set()
        self.client._session_callback(KeeperState.CLOSED)
        self.logger.log(BLATHER, 'Connection stopped')


def _connect_loop(self, retry):
    """Try each host once.

    Returns STOP_CONNECTING when connecting should end. Otherwise raises
    ForceRetryError so the caller's retry policy triggers another pass.
    """
    # Iterate through the hosts a full cycle before starting over
    status = None
    host_ports = self._expand_client_hosts()

    # Check for an empty hostlist, indicating none resolved
    if not host_ports:
        return STOP_CONNECTING

    for host, port in host_ports:
        if self.client._stopped.is_set():
            status = STOP_CONNECTING
            break
        status = self._connect_attempt(host, port, retry)
        if status is STOP_CONNECTING:
            break

    # STOP_CONNECTING here means an attempt (or a client stop) asked us to
    # stop. Any other outcome means every host failed, so retry.
    if status is STOP_CONNECTING:
        return STOP_CONNECTING
    else:
        raise ForceRetryError('Reconnecting')


def _connect_attempt(self, host, port, retry):
    """Connect to one host and service the socket until it closes.

    Returns STOP_CONNECTING after a clean close or an auth failure.
    Returns None after a recoverable error (dropped connection, timeout,
    expired session, r/w server found), so the next host is tried.
    """
    client = self.client
    KazooTimeoutError = self.handler.timeout_exception
    close_connection = False

    self._socket = None

    # Were we given a r/w server? If so, use that instead
    if self._rw_server:
        host, port = self._rw_server
        self._rw_server = None
        # Log the server actually being used (after the substitution).
        self.logger.log(BLATHER,
                        "Found r/w server to use, %s:%s", host, port)

    if client._state != KeeperState.CONNECTING:
        client._session_callback(KeeperState.CONNECTING)

    try:
        self._xid = 0
        # _connect returns timeouts in milliseconds; convert to seconds.
        read_timeout, connect_timeout = self._connect(host, port)
        read_timeout = read_timeout / 1000.0
        connect_timeout = connect_timeout / 1000.0
        retry.reset()
        self.ping_outstanding.clear()
        with self._socket_error_handling():
            while not close_connection:
                # Watch for something to read or send
                jitter_time = random.randint(0, 40) / 100.0
                # Ensure our timeout is positive
                timeout = max([read_timeout / 2.0 - jitter_time,
                               jitter_time])
                s = self.handler.select([self._socket, self._read_sock],
                                        [], [], timeout)[0]

                if not s:
                    # Idle period: a ping still unanswered means the
                    # server is gone; otherwise send a heartbeat.
                    if self.ping_outstanding.is_set():
                        self.ping_outstanding.clear()
                        raise ConnectionDropped(
                            "outstanding heartbeat ping not received")
                    self._send_ping(connect_timeout)
                elif s[0] == self._socket:
                    # Server sent data: read and dispatch it.
                    response = self._read_socket(read_timeout)
                    close_connection = response == CLOSE_RESPONSE
                else:
                    # The wake-up socket fired: there is a request to send.
                    self._send_request(read_timeout, connect_timeout)
        self.logger.info('Closing connection to %s:%s', host, port)
        client._session_callback(KeeperState.CLOSED)
        return STOP_CONNECTING
    except (ConnectionDropped, KazooTimeoutError) as e:
        if isinstance(e, ConnectionDropped):
            self.logger.warning('Connection dropped: %s', e)
        else:
            self.logger.warning('Connection time-out: %s', e)
        if client._state != KeeperState.CONNECTING:
            self.logger.warning("Transition to CONNECTING")
            client._session_callback(KeeperState.CONNECTING)
    except AuthFailedError:
        retry.reset()
        self.logger.warning('AUTH_FAILED closing')
        client._session_callback(KeeperState.AUTH_FAILED)
        return STOP_CONNECTING
    except SessionExpiredError:
        retry.reset()
        self.logger.warning('Session has expired')
        client._session_callback(KeeperState.EXPIRED_SESSION)
    except RWServerAvailable:
        retry.reset()
        self.logger.warning('Found a RW server, dropping connection')
        client._session_callback(KeeperState.CONNECTING)
    except Exception:
        self.logger.exception('Unhandled exception in connection loop')
        raise
    finally:
        if self._socket is not None:
            self._socket.close()


def _connect(self, host, port):
    """Open the socket, perform the ZooKeeper handshake and authenticate.

    :returns: (read_timeout, connect_timeout), both in milliseconds.
    :raises SessionExpiredError: if the server returns a non-positive
        session timeout.
    """
    client = self.client
    self.logger.info('Connecting to %s:%s, use_ssl: %r',
                     host, port, self.client.use_ssl)

    self.logger.log(BLATHER,
                    '    Using session_id: %r session_passwd: %s',
                    client._session_id,
                    hexlify(client._session_passwd))

    with self._socket_error_handling():
        self._socket = self.handler.create_connection(
            address=(host, port),
            timeout=client._session_timeout / 1000.0,
            use_ssl=self.client.use_ssl,
            keyfile=self.client.keyfile,
            certfile=self.client.certfile,
            ca=self.client.ca,
            keyfile_password=self.client.keyfile_password,
            verify_certs=self.client.verify_certs,
        )

    # Non-blocking: the main loop drives I/O through handler.select().
    self._socket.setblocking(0)

    # Build the handshake request from the (possibly resumed) session.
    connect = Connect(0, client.last_zxid, client._session_timeout,
                      client._session_id or 0, client._session_passwd,
                      client.read_only)

    connect_result, zxid = self._invoke(
        client._session_timeout / 1000.0 / len(client.hosts), connect)

    if connect_result.time_out <= 0:
        raise SessionExpiredError("Session has expired")

    if zxid:
        client.last_zxid = zxid

    # Load return values
    client._session_id = connect_result.session_id
    client._protocol_version = connect_result.protocol_version
    negotiated_session_timeout = connect_result.time_out
    connect_timeout = negotiated_session_timeout / len(client.hosts)
    read_timeout = negotiated_session_timeout * 2.0 / 3.0
    client._session_passwd = connect_result.passwd

    self.logger.log(BLATHER,
                    'Session created, session_id: %r session_passwd: %s\n'
                    '    negotiated session timeout: %s\n'
                    '    connect timeout: %s\n'
                    '    read timeout: %s', client._session_id,
                    hexlify(client._session_passwd),
                    negotiated_session_timeout, connect_timeout,
                    read_timeout)

    if connect_result.read_only:
        self._ro = True

    # Get a copy of the auth data before iterating, in case it is
    # changed.
    client_auth_data_copy = copy.copy(client.auth_data)

    if client.use_sasl and self.sasl_cli is None:
        if PURESASL_AVAILABLE:
            for scheme, auth in client_auth_data_copy:
                if scheme == 'sasl':
                    # Split on the first ':' only so the password itself
                    # may contain colons.
                    username, password = auth.split(":", 1)
                    self.sasl_cli = SASLClient(
                        host=client.sasl_server_principal,
                        service='zookeeper',
                        mechanism='DIGEST-MD5',
                        username=username,
                        password=password
                    )
                    break

            # As described in rfc
            # https://tools.ietf.org/html/rfc2831#section-2.1
            # sending empty challenge
            # NOTE(review): connect_timeout is in milliseconds here, while
            # the digest branch below divides by 1000 -- confirm the unit
            # _send_sasl_request expects.
            self._send_sasl_request(challenge=b'',
                                    timeout=connect_timeout)
        else:
            self.logger.warning('Pure-sasl library is missing while sasl'
                                ' authentification is configured. Please'
                                ' install pure-sasl library to connect '
                                'using sasl. Now falling back '
                                'connecting WITHOUT any '
                                'authentification.')
            client.use_sasl = False
            self._set_connected_ro_or_rw(client)
    else:
        self._set_connected_ro_or_rw(client)
        for scheme, auth in client_auth_data_copy:
            if scheme == "digest":
                ap = Auth(0, scheme, auth)
                zxid = self._invoke(
                    connect_timeout / 1000.0,
                    ap,
                    xid=AUTH_XID
                )
                if zxid:
                    client.last_zxid = zxid

    return read_timeout, connect_timeout


def _invoke(self, timeout, request, xid=None):
    """A special writer used during connection establishment
    only.

    With `xid`, only the reply header is read and validated, and the
    zxid (or None) is returned. Without `xid`, the length-prefixed
    reply is read and, if the request can deserialize it, (obj, zxid)
    is returned.

    :raises RuntimeError: if the reply header carries a different xid.
    :raises ConnectionDropped: if the reply cannot be deserialized.
    """
    self._submit(request, timeout, xid)
    zxid = None
    if xid:
        header, _, _ = self._read_header(timeout)
        if header.xid != xid:
            # Interpolate the values; passing them as extra exception
            # arguments would leave the %r placeholders unformatted.
            raise RuntimeError('xids do not match, expected %r '
                               'received %r' % (xid, header.xid))
        if header.zxid > 0:
            zxid = header.zxid
        if header.err:
            callback_exception = EXCEPTIONS[header.err]()
            self.logger.debug(
                'Received error(xid=%s) %r', xid, callback_exception)
            raise callback_exception
        return zxid

    # Replies are framed with a 4-byte length prefix.
    msg = self._read(4, timeout)
    length = int_struct.unpack(msg)[0]
    msg = self._read(length, timeout)

    if hasattr(request, 'deserialize'):
        try:
            obj, _ = request.deserialize(msg, 0)
        except Exception:
            self.logger.exception(
                "Exception raised during deserialization "
                "of request: %s", request)

            # raise ConnectionDropped so connect loop will retry
            raise ConnectionDropped('invalid server response')
        self.logger.log(BLATHER, 'Read response %s', obj)
        return obj, zxid

    return zxid




转载地址：https://blog.csdn.net/qq_33339479/article/details/90312051 。如侵犯您的版权，请留言回复原文章的地址，我们会删除此文章，给您带来的不便，敬请谅解！




[***.217.46.12]2023年09月17日 01时03分54秒