
本文共 16494 字,大约阅读时间需要 54 分钟。
Tomcat是怎样初始化socket连接及怎样接受socket连接:
前面一篇有讲到StandardService的start还有两部分,因为这两部分都与Socket连接相关,所以放到下篇:
一、 mapperListener.start();
可以看到其也有Lifecycle接口,所以直接看其startInternal方法:
public void startInternal() throws LifecycleException { setState(LifecycleState.STARTING); Engine engine = service.getContainer(); if (engine == null) { return; } findDefaultHost(); addListeners(engine); Container[] conHosts = engine.findChildren(); for (Container conHost : conHosts) { Host host = (Host) conHost; if (!LifecycleState.NEW.equals(host.getState())) { // Registering the host will register the context and wrappers registerHost(host); } }}
这里的findDefaultHost方法就是将StandardEngine的defaultHost设置到这个MapperListener类的Mapper对象中:
private void findDefaultHost() { Engine engine = service.getContainer(); String defaultHost = engine.getDefaultHost(); .......... if (found) { mapper.setDefaultHostName(defaultHost); } ............}
对应的就是server.xml配置文件中的这里:
然后addListeners(engine)方法:
private void addListeners(Container container) { container.addContainerListener(this); container.addLifecycleListener(this); for (Container child : container.findChildren()) { addListeners(child); }}
这里的最开始的容器是StandardEngine,然后findChildren,所以这里会一直往下往下设置,Engine->Host->Context->Wrapper。
再然后就是下面的registerHost(host)方法:
private void registerHost(Host host) { ........... registerContext((Context) container); .........}
private void registerContext(Context context) { String contextPath = context.getPath(); ....... Host host = (Host)context.getParent(); WebResourceRoot resources = context.getResources(); String[] welcomeFiles = context.findWelcomeFiles(); Listwrappers = new ArrayList<>(); .......... prepareWrapperMappingInfo(context, (Wrapper) container, wrappers); .......... mapper.addContextVersion(host.getName(), host, contextPath, context.getWebappVersion(), context, welcomeFiles, resources, wrappers);}
这个还会调prepareWrapperMappingInfo方法。关于mapperListener.start()的这个关键逻辑就是在这里,这里要处理的内容就是将Engine、Host、context、wrapper(用于查到对应Servlet映射处理类)容器的内容设置到MapperListener类中,关键的是设置到其成员变量Mapper类中:
Mapper类中有些属性值:
同时关键的地方要知道,MapperListener是StandardService的成员变量:
并且StandardService也有一个Mapper类的成员变量:
再来看下其的构造方法:
public MapperListener(Service service) { this.service = service; this.mapper = service.getMapper();}
所以这个MapperListener类的Mapper最终也是有赋值到StandardService的Mapper。
二、Connector的start ,Connector也有实现Lifecycle接口,所以看其startInternal方法:
@Overrideprotected void startInternal() throws LifecycleException { ....... setState(LifecycleState.STARTING); try { protocolHandler.start(); ........}
前面有提到这里的连接器是Http11NioProtocol:
可以看到其基础实现是AbstractProtocol:
看其的start方法:
public void start() throws Exception { .......... endpoint.start(); // Start async timeout thread asyncTimeout = new AsyncTimeout(); Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout"); int priority = endpoint.getThreadPriority(); if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { priority = Thread.NORM_PRIORITY; } timeoutThread.setPriority(priority); timeoutThread.setDaemon(true); timeoutThread.start();}
这个endpoint就是NioEndpoint,看其startInternal方法:
@Overridepublic void startInternal() throws Exception { .......... // Create worker collection if ( getExecutor() == null ) { createExecutor(); } // Start poller threads pollers = new Poller[getPollerThreadCount()]; for (int i=0; i
public Poller() throws IOException { this.selector = Selector.open(); }
一、 先看createExecutor方法(我们并没有在server.xml配置,所以为空):
public void createExecutor() { internalExecutor = true; TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor);}
可以看到这里的执行器是一个线程池,其中生产线程的工厂是TaskThreadFactory,现在看下TaskThreadFactory是生产一个怎样的线程:
@Overridepublic Thread newThread(Runnable r) { TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement()); t.setDaemon(daemon); t.setPriority(threadPriority); ........ return t;}
可以看到其是将一个Runnable接口变为TaskThread线程对象。
二、现在看下Poller类(轮询器),其是NioEndpoint的内部类:
,
其实现了Runable接口,看其run接口:
@Overridepublic void run() { .......... hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } } ........... //either we timed out o we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); Iteratoriterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); processKey(sk, attachment); } ..........}
看下event方法:
private final SynchronizedQueueevents = new SynchronizedQueue<>(); public boolean events() { boolean result = false; PollerEvent pe = null; for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) { result = true; try { pe.run(); pe.reset(); .......... } return result;}
其run方法:@Overridepublic void run() { if (interestOps == OP_REGISTER) { try { socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { if (key == null) { socket.socketWrapper.getEndpoint().countDownConnection(); ((NioSocketWrapper) socket.socketWrapper).closed = true; } else { final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment(); if (socketWrapper != null) { //we are registering the key to start with, reset the fairness counter. int ops = key.interestOps() | interestOps; socketWrapper.interestOps(ops); key.interestOps(ops); } ............. }}
可以看到这里PollerEvent类实现Runnable接口,同时其有三个成员变量:NioChannel(可以简单看为是像ServerSocketChannel/SocketChannel之类的类)、interestOps(表面现在是那种时间、Nio的SelectionKey有四种状态)、NioSocketWrapper(与SelectKey绑定的,其实现SocketWrapperBase):
这个方法,如果当前EventPoller的interestOps是OP_REGISTER,则将SocketChannel注册为SelectionKey.OP_READ。如果不是,则将Selector查询到的SelectionKey的interestOps同步到SocketWrapper,并将当前EventPoller的interestOps也同步到对应的SelectionKey中。所以这里的先后顺序是OP_REGISTER->OP_READ,我们主要关注OP_READ。
我们再来看下这个socket.getIOChannel().register()方法:
public final SelectionKey register(Selector sel, int ops, Object att).....{ ......... SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } ........}
这里可以知道socketWrapper(NioSocketWrapper),就是在这个时候与设置到SelectionKey的。
那这里的events是在什么时候填充的?同时这里通过selector去轮询查询SelectKey,之后再看processKey(sk, attachment),这个processKey方法到后面讲,因为还缺一些东西,不好理解。
EventPoller的的run方法结束。
三、接着我们看startAcceptorThreads方法:
protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new ArrayList<>(count); for (int i = 0; i < count; i++) { Acceptor acceptor = new Acceptor<>(this); String threadName = getName() + "-Acceptor-" + i; acceptor.setThreadName(threadName); acceptors.add(acceptor); Thread t = new Thread(acceptor, threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); }}
这个Acceptor(接收器,所以其应该是是接受Socket请求的),与前面Poller类似,看其run方法:
@Overridepublic void run() { while (endpoint.isRunning()) { ......... state = AcceptorState.RUNNING; .......... U socket = null; ........ socket = endpoint.serverSocketAccept(); .......... if (endpoint.isRunning() && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } ............. } state = AcceptorState.ENDED;}
这里endpoint就是NioEndpoint,看下endpoint.serverSocketAccept():
@Overrideprotected SocketChannel serverSocketAccept() throws Exception { return serverSock.accept();}
这里的serverSocket就是ServerSocketChannel,所以就是这个NioEndPoint的内部类Acceptor来接受socket请求,然后再来看下
endpoint.setSocketOptions(socket)方法:
@Overrideprotected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = nioChannels.pop(); //其一个先创建、缓存、到时候能反复利用的作用: (SynchronizedStacknioChannels) if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler(.....); if (isSSLEnabled()) { //加密例如Https使用 channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); //可以知道这里有将这个Socket设置到这个 NioChannel } } else { channel.setIOChannel(socket); channel.reset(); } getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true;}
看下其 getPoller0().register(channel)方法:
public void register(final NioChannel socket) { socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); PollerEvent r = eventCache.pop(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r);}
这里先是将NioChannel 设置到 NioSocketWrapper(就是接受到的socket、serverSocket.accept()),之后可以将这个NioSocketWrapper看做是接收到的那个SocketChannel 了。然后是PollerEvent r = eventCache.pop(),我们现在还没有添加所以是空,则 new PollerEvent(socket,ka,OP_REGISTER),这里就与前面那个OP_REGISTER->OP-READl联系起来了,如果有就是将这个EventPoller再设置一次, r.reset(socket,ka,OP_REGISTER):
public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) { reset(ch, w, intOps);}public void reset(NioChannel ch, NioSocketWrapper w, int intOps) { socket = ch; interestOps = intOps; socketWrapper = w;}
现在看下addEvent(r)方法(就是在这里将event添加到events的):
private void addEvent(PollerEvent event) { events.offer(event); if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();}
这里就是将这个event添加到前面那个Poller类需要处理的events了,到这里两个实现Runnable接口的NioEndAppoint类的内部类Poller、Accptor,都已经通过:while (endpoint.isPaused() && endpoint.isRunning())、与while(true)在循环运行了。现在我们将这两个结合到一起来将前面的processKey(sk, attachment)方法:
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); processKey(sk, attachment); }
这里通过Selector获取SelectionKey,然后通过SelectionKey获取NioSocketWrapper(即socketChanel)来调用processKey方法:
所以这里整个的这个流程是:
1、先在Accptor线程中通过serverSocketChannel.accpet去接收socketChannel,然后将这个SocketChannel转换为:EventPllor放放到队列events中。然后Poller线程获取到这个socketChannel,将其由OP_REGISTER,改为OP_READ可读的,之后再通过selector获取SocketionKey,再通过processKey方法处理这个SelectionKey:
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { ........... if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } ......... }
,一般并不会调用attachment.getSendfileData(),所以应该是sk.isReadable(),再调用processSocket(attachment, SocketEvent.OPEN_READ, true):
SynchronizedStack<SocketProcessorBase<S>> processorCache
public boolean processSocket(SocketWrapperBasesocketWrapper, SocketEvent event, boolean dispatch) { ........ SocketProcessorBasesc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); ...... return true;}
这里的createSocketProcessor方法创建的是SocketProcessor
所以SocketProcessor也有实现Runnable接口:
而这里的Executor executor = getExecutor()就是前面创建的ThreadPoolExecutor,其创建的线程对象是TaskThread:
其run方法:
@Overridepublic final void run() { synchronized (socketWrapper) { if (socketWrapper.isClosed()) { return; } doRun(); }}
@Override protected void doRun() { NioChannel socket = socketWrapper.getSocket(); SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); ............ if (event == null) { state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { state = getHandler().process(socketWrapper, event); } ......}
这里的getHandler是ConnectionHandler,其的初始化:
public AbstractHttp11Protocol(AbstractEndpointendpoint) { super(endpoint); setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT); ConnectionHandlercHandler = new ConnectionHandler<>(this); setHandler(cHandler); getEndpoint().setHandler(cHandler);}
process方法:
@Overridepublic SocketState process(SocketWrapperBasewrapper, SocketEvent status) { ............ S socket = wrapper.getSocket(); .......... if (processor == null) { processor = getProtocol().createProcessor(); register(processor); } ........ SocketState state = SocketState.CLOSED; do { state = processor.process(wrapper, status); ..........}
这里的getProtocol().createProcessor():
Overrideprotected Processor createProcessor() { Http11Processor processor = new Http11Processor(this, adapter); return processor;}
注意现在的status是SocketEvent.OPEN_READ,之后是怎样处理的可以看下前面写的文章。
自此整个tomcat整体的启动,以及接收请求过程大体结果已经梳理,启动、连接过程中其他的一些比较重要的细节之后再理解、梳理下。
补充:
1、我们请求一个servlet的url:
2、之后其就通过executor去创建TaskThread线程去运行 :
发表评论
最新留言
关于作者
