
本文共 26655 字,大约阅读时间需要 88 分钟。
kazoo源码分析
kazoo-2.6.1
kazoo客户端
kazoo是一个用Python编写的zookeeper客户端,它实现了zookeeper协议,为Python程序提供了连接zookeeper服务器的工具。有关zookeeper服务端的使用,大家可自行查阅相关资料;本文主要概述kazoo作为客户端是如何与zookeeper进行通信的,以及客户端的架构是如何实现的。
kazoo客户端start启动流程
示例代码
本文主要分析客户端的基本流程与启动阶段的架构实现,示例代码如下:
from kazoo.client import KazooClient

# Connect to a single local ZooKeeper server.
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
try:
    # Determine if a node exists
    if zk.exists("/my/favorite"):
        # Print the version of a node and its data. The read is nested
        # under the existence check so a missing node is skipped rather
        # than raising from zk.get().
        data, stat = zk.get("/my/favorite")
        print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
finally:
    # Always close the session, even if one of the requests above fails.
    zk.stop()
初始化
下面就从KazooClient类的初始化开始;
class KazooClient(object):
    """An Apache Zookeeper Python client supporting alternate callback
    handlers and high-level functionality.

    Watch functions registered with this class will not get session
    events, unlike the default Zookeeper watches. They will also be
    called with a single argument, a
    :class:`~kazoo.protocol.states.WatchedEvent` instance.

    """
    def __init__(self, hosts='127.0.0.1:2181',
                 timeout=10.0, client_id=None, handler=None,
                 default_acl=None, auth_data=None, read_only=None,
                 randomize_hosts=True, connection_retry=None,
                 command_retry=None, logger=None, keyfile=None,
                 keyfile_password=None, certfile=None, ca=None,
                 use_ssl=False, verify_certs=True, **kwargs):
        """Create a :class:`KazooClient` instance. All time arguments
        are in seconds.

        :param hosts: Comma-separated list of hosts to connect to
                      (e.g. 127.0.0.1:2181,127.0.0.1:2182,[::1]:2183).
        :param timeout: The longest to wait for a Zookeeper connection.
        :param client_id: A Zookeeper client id, used when
                          re-establishing a prior session connection.
        :param handler: An instance of a class implementing the
                        :class:`~kazoo.interfaces.IHandler` interface
                        for callback handling.
        :param default_acl: A default ACL used on node creation.
        :param auth_data:
            A list of authentication credentials to use for the
            connection. Should be a list of (scheme, credential)
            tuples as :meth:`add_auth` takes.
        :param read_only: Allow connections to read only servers.
        :param randomize_hosts: By default randomize host selection.
        :param connection_retry:
            A :class:`kazoo.retry.KazooRetry` object to use for
            retrying the connection to Zookeeper. Also can be a dict of
            options which will be used for creating one.
        :param command_retry:
            A :class:`kazoo.retry.KazooRetry` object to use for
            the :meth:`KazooClient.retry` method. Also can be a dict of
            options which will be used for creating one.
        :param logger: A custom logger to use instead of the module
            global `log` instance.
        :param keyfile: SSL keyfile to use for authentication
        :param keyfile_password: SSL keyfile password
        :param certfile: SSL certfile to use for authentication
        :param ca: SSL CA file to use for authentication
        :param use_ssl: argument to control whether SSL is used or not
        :param verify_certs: when using SSL, argument to bypass
            certs verification

        Basic Example:

        .. code-block:: python

            zk = KazooClient()
            zk.start()
            children = zk.get_children('/')
            zk.stop()

        As a convenience all recipe classes are available as attributes
        and get automatically bound to the client. For example::

            zk = KazooClient()
            zk.start()
            lock = zk.Lock('/lock_path')

        .. versionadded:: 0.6
            The read_only option. Requires Zookeeper 3.4+

        .. versionadded:: 0.6
            The retry_max_delay option.

        .. versionadded:: 0.6
            The randomize_hosts option.

        .. versionchanged:: 0.8
            Removed the unused watcher argument (was second argument).

        .. versionadded:: 1.2
            The connection_retry, command_retry and logger options.

        """
        # Use the caller's logger, falling back to the module-level `log`.
        self.logger = logger or log

        # Record the handler strategy used; when none is supplied a
        # SequentialThreadingHandler (thread based) is created.
        self.handler = handler if handler else SequentialThreadingHandler()
        if inspect.isclass(self.handler):
            raise ConfigurationError("Handler must be an instance of a class, "
                                     "not the class: %s" % self.handler)

        # Authentication credentials; an empty set when none were passed.
        self.auth_data = auth_data if auth_data else set([])
        # Default ACL (per the docstring: used on node creation).
        self.default_acl = default_acl
        # Whether host selection is randomized (see docstring).
        self.randomize_hosts = randomize_hosts
        # hosts/chroot start out empty and set_hosts() is then called with
        # the `hosts` string. set_hosts() is not shown here; presumably it
        # parses the string and fills in both attributes -- confirm.
        self.hosts = None
        self.chroot = None
        self.set_hosts(hosts)

        # SSL settings, stored as given.
        self.use_ssl = use_ssl
        self.verify_certs = verify_certs
        self.certfile = certfile
        self.keyfile = keyfile
        self.keyfile_password = keyfile_password
        self.ca = ca
        # Curator like simplified state tracking, and listeners for
        # state transitions
        self._state = KeeperState.CLOSED  # low-level state starts as CLOSED
        self.state = KazooState.LOST
        self.state_listeners = set()  # registered state listeners
        self._child_watchers = defaultdict(set)  # watchers, keyed lazily
        self._data_watchers = defaultdict(set)
        # _reset() is defined elsewhere; presumably it (re)initialises
        # internal bookkeeping -- confirm against the full class.
        self._reset()
        self.read_only = read_only

        if client_id:
            # Reuse the caller-supplied session: client_id[0] is stored as
            # the session id and client_id[1] as the session password.
            self._session_id = client_id[0]
            self._session_passwd = client_id[1]
        else:
            # No prior session supplied: start from a reset session.
            self._reset_session()

        # ZK uses milliseconds
        self._session_timeout = int(timeout * 1000)

        # We use events like twitter's client to track current and
        # desired state (connected, and whether to shutdown)
        # The event objects come from the handler (presumably
        # threading.Event-like for the threading handler -- confirm).
        self._live = self.handler.event_object()
        self._writer_stopped = self.handler.event_object()
        self._stopped = self.handler.event_object()
        # A fresh client starts in the "stopped" state.
        self._stopped.set()
        self._writer_stopped.set()

        # Retry configuration; whatever is still None after the checks
        # below is filled in with a default KazooRetry instance.
        self.retry = self._conn_retry = None

        if type(connection_retry) is dict:
            self._conn_retry = KazooRetry(**connection_retry)
        elif type(connection_retry) is KazooRetry:
            self._conn_retry = connection_retry

        if type(command_retry) is dict:
            self.retry = KazooRetry(**command_retry)
        elif type(command_retry) is KazooRetry:
            self.retry = command_retry

        # A retry helper must sleep with the same function as the handler.
        if type(self._conn_retry) is KazooRetry:
            if self.handler.sleep_func != self._conn_retry.sleep_func:
                raise ConfigurationError("Retry handler and event handler "
                                         " must use the same sleep func")

        if type(self.retry) is KazooRetry:
            if self.handler.sleep_func != self.retry.sleep_func:
                raise ConfigurationError(
                    "Command retry handler and event handler "
                    "must use the same sleep func")

        if self.retry is None or self._conn_retry is None:
            # Backwards compatibility: pull deprecated retry options out of
            # **kwargs (warning for each one found) and map them to the
            # KazooRetry parameter names.
            old_retry_keys = dict(_RETRY_COMPAT_DEFAULTS)
            for key in old_retry_keys:
                try:
                    old_retry_keys[key] = kwargs.pop(key)
                    warnings.warn(
                        'Passing retry configuration param %s to the '
                        'client directly is deprecated, please pass a '
                        'configured retry object (using param %s)' % (
                            key, _RETRY_COMPAT_MAPPING[key]),
                        DeprecationWarning, stacklevel=2)
                except KeyError:
                    pass

            retry_keys = {}
            for oldname, value in old_retry_keys.items():
                retry_keys[_RETRY_COMPAT_MAPPING[oldname]] = value

            if self._conn_retry is None:
                self._conn_retry = KazooRetry(
                    sleep_func=self.handler.sleep_func,
                    **retry_keys)
            if self.retry is None:
                self.retry = KazooRetry(
                    sleep_func=self.handler.sleep_func,
                    **retry_keys)

        # Let the connection retry helper notice that the client stopped.
        self._conn_retry.interrupt = lambda: self._stopped.is_set()
        # Create the connection handler, giving it its own retry copy.
        self._connection = ConnectionHandler(
            self, self._conn_retry.copy(), logger=self.logger)

        # Every retry call should have its own copy of the retry helper
        # to avoid shared retry counts
        self._retry = self.retry

        def _retry(*args, **kwargs):
            # Each call runs on a fresh copy of the configured helper.
            return self._retry.copy()(*args, **kwargs)
        self.retry = _retry

        # Recipe classes, pre-bound to this client as first argument.
        self.Barrier = partial(Barrier, self)
        self.Counter = partial(Counter, self)
        self.DoubleBarrier = partial(DoubleBarrier, self)
        self.ChildrenWatch = partial(ChildrenWatch, self)
        self.DataWatch = partial(DataWatch, self)
        self.Election = partial(Election, self)
        self.NonBlockingLease = partial(NonBlockingLease, self)
        self.MultiNonBlockingLease = partial(MultiNonBlockingLease, self)
        self.Lock = partial(Lock, self)
        self.ReadLock = partial(ReadLock, self)
        self.WriteLock = partial(WriteLock, self)
        self.Party = partial(Party, self)
        self.Queue = partial(Queue, self)
        self.LockingQueue = partial(LockingQueue, self)
        self.SetPartitioner = partial(SetPartitioner, self)
        self.Semaphore = partial(Semaphore, self)
        self.ShallowParty = partial(ShallowParty, self)

        # Managing SASL client
        self.use_sasl = False
        for scheme, auth in self.auth_data:
            if scheme == "sasl":
                self.use_sasl = True
                # Could be used later for GSSAPI implementation
                self.sasl_server_principal = "zk-sasl-md5"
                break

        # If we got any unhandled keywords, complain like Python would
        if kwargs:
            raise TypeError('__init__() got unexpected keyword arguments: %s'
                            % (kwargs.keys(),))
从该流程可以看出,初始化过程大致完成了以下工作:重置内部数据、设置会话参数、配置认证信息,以及创建连接处理对象(ConnectionHandler)等。各参数的含义可参考docstring中的英文说明。
zk.start的过程
def start(self, timeout=15):
    """Initiate connection to ZK.

    :param timeout: Time in seconds to wait for connection to
                    succeed.
    :raises: :attr:`~kazoo.interfaces.IHandler.timeout_exception`
             if the connection wasn't established within `timeout`
             seconds.

    """
    # Kick off the connection and block on the returned event for at
    # most `timeout` seconds.
    event = self.start_async()
    event.wait(timeout=timeout)
    if not self.connected:
        # We time-out, ensure we are disconnected
        self.stop()
        raise self.handler.timeout_exception("Connection time-out")

    # With a chroot configured, warn if its root path is missing.
    if self.chroot and not self.exists("/"):
        warnings.warn("No chroot path exists, the chroot path "
                      "should be created before normal use.")
可以看到,start()先调用self.start_async()发起连接,再对返回的事件对象调用wait(),在timeout时间内等待连接建立。
def start_async(self): """Asynchronously initiate connection to ZK. :returns: An event object that can be checked to see if the connection is alive. :rtype: :class:`~threading.Event` compatible object. """ # If we're already connected, ignore if self._live.is_set(): # 检查是否已经开始如果开始则直接返回该值 return self._live # Make sure we're safely closed self._safe_close() # 确保在重新开始的时候,旧的已经安全关闭,即如果有数据在进行处理先将数据处理完成并停止 # We've been asked to connect, clear the stop and our writer # thread indicator self._stopped.clear() # 重置 self._writer_stopped.clear() # Start the handler self.handler.start() # 调用handler的start方法,默认是SequentialThreadingHandler的start方法 # Start the connection self._connection.start() # 连接开始 return self._live # 返回
该方法主要是先安全地关闭可能已存在的连接,然后清除停止标志,接着调用handler的start方法,最后调用连接的start方法。下面我们继续查看SequentialThreadingHandler的start方法与_connection.start方法。
def _create_thread_worker(self, queue):
    """Spawn and return a worker that runs callables taken from `queue`."""
    def _thread_worker():  # pragma: nocover
        # Loop forever, pulling one item at a time off the queue.
        while True:
            try:
                func = queue.get()
                try:
                    # The _STOP sentinel ends the worker loop.
                    if func is _STOP:
                        break
                    # Anything else is a callback: run it directly.
                    func()
                except Exception:
                    log.exception("Exception in worker queue thread")
                finally:
                    # Always mark the item as done, even on error or stop.
                    queue.task_done()
                    del func  # release before possible idle
            except self.queue_empty:
                # Nothing was available: go round the loop again.
                continue
    # Run the worker through spawn() and return what spawn() returns.
    t = self.spawn(_thread_worker)
    return t

def spawn(self, func, *args, **kwargs):
    """Run `func(*args, **kwargs)` in a new daemon thread and return it."""
    t = threading.Thread(target=func, args=args, kwargs=kwargs)
    t.daemon = True  # daemon thread: does not block interpreter exit
    t.start()
    return t

def start(self):
    """Start the worker threads."""
    # _state_change is used as a context manager around the state flip
    # (presumably a lock -- confirm against the handler's __init__).
    with self._state_change:
        # Already running: nothing to do.
        if self._running:
            return

        # Spawn our worker threads, we have
        # - A callback worker for watch events to be called
        # - A completion worker for completion events to be called
        for queue in (self.completion_queue, self.callback_queue):
            w = self._create_thread_worker(queue)
            self._workers.append(w)
        self._running = True
        # Register stop() to be called at interpreter exit.
        python2atexit.register(self.stop)
可知SequentialThreadingHandler主要就是启动了两个工作线程,分别处理completion_queue与callback_queue两个队列中的回调函数;工作线程从队列中取到_STOP标记时退出循环。接下来看connection的start方法:
def start(self):
    """Start the connection up"""
    if self.connection_closed.is_set():
        # Create the socket pair from the handler and keep both ends.
        rw_sockets = self.handler.create_socket_pair()
        self._read_sock, self._write_sock = rw_sockets
        self.connection_closed.clear()
    if self._connection_routine:
        raise Exception("Unable to start, connection routine already "
                        "active.")
    # Run zk_loop via the client's handler (a thread for the threading
    # handler).
    self._connection_routine = self.handler.spawn(self.zk_loop)

def zk_loop(self):
    """Main Zookeeper handling loop"""
    self.logger.log(BLATHER, 'ZK loop started')

    self.connection_stopped.clear()

    # Work on a private copy of the retry helper.
    retry = self.retry_sleeper.copy()
    try:
        # Keep (re)connecting until the client is asked to stop.
        while not self.client._stopped.is_set():
            # If the connect_loop returns STOP_CONNECTING, stop retrying
            if retry(self._connect_loop, retry) is STOP_CONNECTING:
                break
    except RetryFailedError:
        self.logger.warning("Failed connecting to Zookeeper "
                            "within the connection retry policy.")
    finally:
        # On any exit (normal or error) mark the connection as stopped
        # and report the CLOSED state to the client.
        self.connection_stopped.set()
        self.client._session_callback(KeeperState.CLOSED)
        self.logger.log(BLATHER, 'Connection stopped')

def _connect_loop(self, retry):
    # Iterate through the hosts a full cycle before starting over
    status = None
    host_ports = self._expand_client_hosts()

    # Check for an empty hostlist, indicating none resolved
    if len(host_ports) == 0:
        return STOP_CONNECTING

    for host, port in host_ports:
        # Stop trying further hosts once the client has been stopped.
        if self.client._stopped.is_set():
            status = STOP_CONNECTING
            break
        status = self._connect_attempt(host, port, retry)
        if status is STOP_CONNECTING:
            break

    if status is STOP_CONNECTING:
        # A stop was requested or an attempt asked to stop connecting.
        return STOP_CONNECTING
    else:
        # Every host was tried without a stop request: raise so that the
        # retry helper wrapping this call (see zk_loop) can run it again.
        raise ForceRetryError('Reconnecting')

def _connect_attempt(self, host, port, retry):
    client = self.client
    KazooTimeoutError = self.handler.timeout_exception
    close_connection = False

    self._socket = None

    # Were we given a r/w server? If so, use that instead
    if self._rw_server:
        self.logger.log(BLATHER,
                        "Found r/w server to use, %s:%s", host, port)
        host, port = self._rw_server
        self._rw_server = None

    # Report CONNECTING to the client unless it is already in that state.
    if client._state != KeeperState.CONNECTING:
        client._session_callback(KeeperState.CONNECTING)

    try:
        self._xid = 0
        # _connect() returns both timeouts in milliseconds; convert them
        # to seconds here.
        read_timeout, connect_timeout = self._connect(host, port)
        read_timeout = read_timeout / 1000.0
        connect_timeout = connect_timeout / 1000.0
        retry.reset()
        self.ping_outstanding.clear()
        with self._socket_error_handling():
            while not close_connection:
                # Watch for something to read or send
                # Random jitter between 0 and 0.4 seconds.
                jitter_time = random.randint(0, 40) / 100.0
                # Ensure our timeout is positive
                timeout = max([read_timeout / 2.0 - jitter_time,
                               jitter_time])
                # Wait for readability on the server socket or on the
                # read end of the socket pair.
                s = self.handler.select([self._socket, self._read_sock],
                                        [], [], timeout)[0]

                if not s:
                    # Nothing became readable within the timeout.
                    if self.ping_outstanding.is_set():
                        # A ping is still flagged as outstanding: treat
                        # the connection as dropped.
                        self.ping_outstanding.clear()
                        raise ConnectionDropped(
                            "outstanding heartbeat ping not received")
                    # Otherwise send a heartbeat ping.
                    self._send_ping(connect_timeout)
                elif s[0] == self._socket:
                    # The server socket is readable: read and handle the
                    # response; a CLOSE_RESPONSE ends the loop.
                    response = self._read_socket(read_timeout)
                    close_connection = response == CLOSE_RESPONSE
                else:
                    # The other watched socket (_read_sock) is readable:
                    # send a request to the server.
                    self._send_request(read_timeout, connect_timeout)
        self.logger.info('Closing connection to %s:%s', host, port)
        # The loop ended because of a close response: report CLOSED and
        # tell the caller to stop connecting.
        client._session_callback(KeeperState.CLOSED)
        return STOP_CONNECTING
    except (ConnectionDropped, KazooTimeoutError) as e:
        if isinstance(e, ConnectionDropped):
            self.logger.warning('Connection dropped: %s', e)
        else:
            self.logger.warning('Connection time-out: %s', e)
        if client._state != KeeperState.CONNECTING:
            self.logger.warning("Transition to CONNECTING")
            client._session_callback(KeeperState.CONNECTING)
    except AuthFailedError:
        retry.reset()
        self.logger.warning('AUTH_FAILED closing')
        client._session_callback(KeeperState.AUTH_FAILED)
        return STOP_CONNECTING
    except SessionExpiredError:
        retry.reset()
        self.logger.warning('Session has expired')
        client._session_callback(KeeperState.EXPIRED_SESSION)
    except RWServerAvailable:
        retry.reset()
        self.logger.warning('Found a RW server, dropping connection')
        client._session_callback(KeeperState.CONNECTING)
    except Exception:
        self.logger.exception('Unhandled exception in connection loop')
        raise
    finally:
        # Always close the server socket if one was opened.
        if self._socket is not None:
            self._socket.close()

def _connect(self, host, port):
    client = self.client
    self.logger.info('Connecting to %s:%s, use_ssl: %r',
                     host, port, self.client.use_ssl)

    self.logger.log(BLATHER,
                    ' Using session_id: %r session_passwd: %s',
                    client._session_id,
                    hexlify(client._session_passwd))

    with self._socket_error_handling():
        # Open the connection through the handler; the session timeout is
        # stored in milliseconds, hence the division.
        self._socket = self.handler.create_connection(
            address=(host, port),
            timeout=client._session_timeout / 1000.0,
            use_ssl=self.client.use_ssl,
            keyfile=self.client.keyfile,
            certfile=self.client.certfile,
            ca=self.client.ca,
            keyfile_password=self.client.keyfile_password,
            verify_certs=self.client.verify_certs,
        )

    # Switch the socket to non-blocking mode.
    self._socket.setblocking(0)

    # Build the connect request from the client's session data.
    connect = Connect(0, client.last_zxid, client._session_timeout,
                      client._session_id or 0, client._session_passwd,
                      client.read_only)

    # Send the connect request to the server and read its reply.
    connect_result, zxid = self._invoke(
        client._session_timeout / 1000.0 / len(client.hosts), connect)

    # A non-positive negotiated timeout is treated as an expired session.
    if connect_result.time_out <= 0:
        raise SessionExpiredError("Session has expired")

    if zxid:
        client.last_zxid = zxid

    # Load return values
    client._session_id = connect_result.session_id
    client._protocol_version = connect_result.protocol_version
    negotiated_session_timeout = connect_result.time_out
    connect_timeout = negotiated_session_timeout / len(client.hosts)
    read_timeout = negotiated_session_timeout * 2.0 / 3.0
    client._session_passwd = connect_result.passwd

    self.logger.log(BLATHER,
                    'Session created, session_id: %r session_passwd: %s\n'
                    ' negotiated session timeout: %s\n'
                    ' connect timeout: %s\n'
                    ' read timeout: %s', client._session_id,
                    hexlify(client._session_passwd),
                    negotiated_session_timeout, connect_timeout,
                    read_timeout)

    if connect_result.read_only:
        self._ro = True

    # Get a copy of the auth data before iterating, in case it is
    # changed.
    client_auth_data_copy = copy.copy(client.auth_data)

    if client.use_sasl and self.sasl_cli is None:
        if PURESASL_AVAILABLE:
            for scheme, auth in client_auth_data_copy:
                if scheme == 'sasl':
                    # NOTE(review): a credential containing more than one
                    # ":" makes this two-name unpacking raise ValueError.
                    username, password = auth.split(":")
                    self.sasl_cli = SASLClient(
                        host=client.sasl_server_principal,
                        service='zookeeper',
                        mechanism='DIGEST-MD5',
                        username=username,
                        password=password
                    )
                    break

            # As described in rfc
            # https://tools.ietf.org/html/rfc2831#section-2.1
            # sending empty challenge
            # NOTE(review): connect_timeout is passed undivided here,
            # while the digest branch below divides it by 1000.0 --
            # confirm which unit _send_sasl_request expects.
            self._send_sasl_request(challenge=b'',
                                    timeout=connect_timeout)
        else:
            # NOTE(review): if this is a stdlib logging.Logger, `warn` is
            # the deprecated alias of `warning` -- confirm.
            self.logger.warn('Pure-sasl library is missing while sasl'
                             ' authentification is configured. Please'
                             ' install pure-sasl library to connect '
                             'using sasl. Now falling back '
                             'connecting WITHOUT any '
                             'authentification.')
            client.use_sasl = False
            self._set_connected_ro_or_rw(client)
    else:
        self._set_connected_ro_or_rw(client)
        # Send one auth request per "digest" credential.
        for scheme, auth in client_auth_data_copy:
            if scheme == "digest":
                ap = Auth(0, scheme, auth)
                zxid = self._invoke(
                    connect_timeout / 1000.0,
                    ap,
                    xid=AUTH_XID
                )
                if zxid:
                    client.last_zxid = zxid

    return read_timeout, connect_timeout

def _invoke(self, timeout, request, xid=None):
    """A special writer used during connection establishment
    only"""
    # Submit the request, then read the reply synchronously.
    self._submit(request, timeout, xid)
    zxid = None
    if xid:
        # With an xid, the reply starts with a header that must carry
        # the same xid.
        header, buffer, offset = self._read_header(timeout)
        if header.xid != xid:
            # NOTE(review): the values are passed as extra exception
            # arguments and are not interpolated into the message.
            raise RuntimeError('xids do not match, expected %r '
                               'received %r', xid, header.xid)
        if header.zxid > 0:
            zxid = header.zxid
        if header.err:
            # Map the error code to its exception class and raise it.
            callback_exception = EXCEPTIONS[header.err]()
            self.logger.debug(
                'Received error(xid=%s) %r', xid, callback_exception)
            raise callback_exception
        return zxid

    # Without an xid: read a 4-byte length, then that many bytes.
    msg = self._read(4, timeout)
    length = int_struct.unpack(msg)[0]
    msg = self._read(length, timeout)

    # Deserialize the payload when the request type knows how to.
    if hasattr(request, 'deserialize'):
        try:
            obj, _ = request.deserialize(msg, 0)
        except Exception:
            self.logger.exception(
                "Exception raised during deserialization "
                "of request: %s", request)

            # raise ConnectionDropped so connect loop will retry
            raise ConnectionDropped('invalid server response')
        self.logger.log(BLATHER, 'Read response %s', obj)
        return obj, zxid

    return zxid
以上方法都运行在新启动的线程中:先向服务端发送建立会话的请求,然后进入一个IO事件循环,处理客户端与服务端之间的数据交互。至此事件循环启动,客户端就可以向服务端发送请求并处理返回的数据,zk.start()的流程基本分析完成。
总结
kazoo作为zookeeper客户端的Python实现,其主要思想就是开启线程运行事件循环,与服务端收发数据,这是比较典型的客户端软件的实现思路。本文只是概述了start的过程,zk.exists的具体执行流程留待后文继续分析。由于本人才疏学浅,如有错误请批评指正。
转载地址:https://blog.csdn.net/qq_33339479/article/details/90312051 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
关于作者
