关于做网站的了解点,家庭电脑做网站,wordpress 迁移 404,营销型网站开发一、mina总体框架与案例#xff1a; 1.总体结构图#xff1a; 简述#xff1a;以上是一张来自网上比較经典的图#xff0c;总体上揭示了mina的结构#xff0c;当中IoService包括clientIoConnector和服务端IoAcceptor两部分。即不管是client还是服务端都是这个结构。IoServ… 一、mina总体框架与案例 1.总体结构图 简述以上是一张来自网上比較经典的图总体上揭示了mina的结构当中IoService包括clientIoConnector和服务端IoAcceptor两部分。即不管是client还是服务端都是这个结构。IoService封装了网络传输层TCP和UDP而IoFilterChain中mina自带的filter做了一些主要的操作之外支持扩展。经过FilterChain之后终于调用IoHandlerIoHandler是详细实现业务逻辑的处理接口详细的业务实现可扩展。 2.一个可执行的案例案例来自网上转载后试验 Client.java: import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.Random;import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketConnector;public class Client extends IoHandlerAdapter {private Random random new Random(System.currentTimeMillis());public Client() {IoConnector connector new NioSocketConnector();connector.getFilterChain().addLast(text,new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName(Server.ENCODE))));connector.setHandler(this);ConnectFuture future connector.connect(new InetSocketAddress(127.0.0.1, Server.PORT));future.awaitUninterruptibly();future.addListener(new IoFutureListenerConnectFuture() {Overridepublic void operationComplete(ConnectFuture future) {IoSession session future.getSession();while (!session.isClosing()) {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String message 你好。我roll了 random.nextInt(100) 点.;session.write(message);}}});connector.dispose();}Overridepublic void messageReceived(IoSession session, Object message)throws Exception {System.out.println(批复: message.toString());}Overridepublic void messageSent(IoSession session, Object message) throws Exception {System.out.println(报告: message.toString());}Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception {cause.printStackTrace();session.close(true);}public static void main(String[] args) {for (int i 0; i 10; i) {new Client();}}
}ServerHandler.java: import java.net.InetSocketAddress;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;public class ServerHandler extends IoHandlerAdapter {Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception {cause.printStackTrace();session.close(false);}public void messageReceived(IoSession session, Object message)throws Exception {String s message.toString();System.out.println(收到请求 s);if (s ! null) {int i getPoint(s);if (session.isConnected()) {if (i 95) {session.write(运气不错你能够出去了.);session.close(false);return;}Integer count (Integer) session.getAttribute(Server.KEY);count;session.setAttribute(Server.KEY, count);session.write(抱歉。你运气太差了第 count 次请求未被通过。继续在小黑屋呆着吧.);} else {session.close(true);}}}Overridepublic void messageSent(IoSession session, Object message) throws Exception {System.out.println(发给client message.toString());}Overridepublic void sessionClosed(IoSession session) throws Exception {long l session.getCreationTime();System.out.println(来自 getInfo(session) 的会话已经关闭它已经存活了 (System.currentTimeMillis() - 1) 毫秒);}Overridepublic void sessionCreated(IoSession session) throws Exception {System.out.println(给 getInfo(session) 创建了一个会话);}Overridepublic void sessionIdle(IoSession session, IdleStatus status)throws Exception {System.out.println(来自 getInfo(session) 的会话闲置状态为 status.toString());}public void sessionOpened(IoSession session) throws Exception {session.setAttribute(Server.KEY, 0);System.out.println(和 getInfo(session) 的会话已经打开.);}public String getInfo(IoSession session) {if (session null) {return null;}InetSocketAddress address (InetSocketAddress) session.getRemoteAddress();int port address.getPort();String ip address.getAddress().getHostAddress();return ip : port;}public int getPoint(String s) {if (s null) {return -1;}Pattern p Pattern.compile(^[\u0041-\uFFFF,]*(\\d).*$);Matcher m p.matcher(s);if (m.matches()) {return Integer.valueOf(m.group(1));}return 0;}
}Server.java: import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;public class Server {public static final int PORT 2534;public static String ENCODE UTF-8;public static final String KEY roll;public static void main(String[] args){ SocketAcceptor acceptor new NioSocketAcceptor();acceptor.getFilterChain().addLast(text,new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName(ENCODE))));acceptor.setHandler(new ServerHandler());try {acceptor.bind(new InetSocketAddress(PORT));System.out.println(游戏開始你想出去吗来碰碰运气吧);} catch (IOException e) {e.printStackTrace();acceptor.dispose();}}
}本案例依赖的jar例如以下图 简述以上是依赖mina实现的一个可执行的案例就不多说了结合总体的结构图和案例实现能够看出mina框架还是非常轻量级的。以下分析一下mina的源代码结构和一些时序流程。 二、mina 核心源代码分析 1.mina的启动时序(结合上面的案例) 简述SocketAcceptor作为服务端对外启动接口类在bind网络地址的时候会触发服务端一系列服务的启动从调用链能够清晰找到相应的源代码阅读。当中AbstractPollingIoAcceptor是一个核心类它会调用自身的startupAcceptor方法来启动一个存放Acceptor的线程池用来处理client传输过来的请求。 AbstractPollingIoAcceptor 类的 startupAcceptor 方法例如以下 /*** This method is called by the doBind() and doUnbind()* methods. If the acceptor is null, the acceptor object will* be created and kicked off by the executor. If the acceptor* object is null, probably already created and this class* is now working, then nothing will happen and the method* will just return.*/
private void startupAcceptor() throws InterruptedException {// If the acceptor is not ready, clear the queues// TODO : they should already be clean : do we have to do that ? if (!selectable) { registerQueue.clear(); cancelQueue.clear(); } // start the acceptor if not already started Acceptor acceptor acceptorRef.get(); //这里仅仅会启动一个worker if (acceptor null) { lock.acquire(); acceptor new Acceptor(); if (acceptorRef.compareAndSet(null, acceptor)) { executeWorker(acceptor); } else { lock.release(); } } } 上面调用到 AbstractIoService 的 executeWorker方法例如以下 protected final void executeWorker(Runnable worker) {executeWorker(worker, null);
}protected final void executeWorker(Runnable worker, String suffix) {String actualThreadName threadName;if (suffix ! null) {actualThreadName actualThreadName - suffix;}executor.execute(new NamePreservingRunnable(worker, actualThreadName));
}简述有类AbstractPollingIoAcceptor 的 startupAcceptor方法上文能够看到一个SocketAcceptor仅仅启动了一个Worker线程即代码中的Acceptor对象而且把他加到线程池中。反过来讲也能够看出AbstractIoService维护了Worker的线程池。ps这个Worker就是服务端处理请求的线程。 2.Mina处理client链接的过程启动后 概述从1中的启动时序能够看到启动过程通过创建SocketAcceptor将有类AbstractPollingIoAcceptor的内部类Acceptor放到了 AbstractIoService的线程池里面而这个Acceptor就是处理client网络请求的worker。而以下这个时序就是线程池中每一个worker处理client网络请求的时序流程。 处理请求时序 简述worker线程Acceptor的run方法中会调用NioSocketAcceptor或者AprSocketAccetpor的select方法。 ps:APRApache Protable Runtime LibraryApache可移植执行库是能够提供非常好的可拓展性、性能以及对底层操作系统一致性操作的技术说白了就是apache实现的一套标准的通讯接口。 AprSocketAcceptor先不做深入了解主要了解下NioSocketAcceptorNioSocketAcceptor顾名思义它调用了java NIO的API实现了NIO的网络连接处理过程。 AbstractPolling$Acceptor 的run方法的核心代码例如以下 private class Acceptor implements Runnable {public void run() {assert (acceptorRef.get() this);int nHandles 0;// Release the locklock.release();while (selectable) {try {// Detect if we have some keys ready to be processed// The select() will be woke up if some new connection// have occurred, or if the selector has been explicitly// woke up//调用了NioSocketAcceptor的select方法获取了selectKeyint selected select();// this actually sets the selector to OP_ACCEPT,// and binds to the port on which this class will// listen onnHandles registerHandles();// Now, if the number of registred handles is 0, we can// quit the loop: we dont have any socket listening// for incoming connection.if (nHandles 0) {acceptorRef.set(null);if (registerQueue.isEmpty() cancelQueue.isEmpty()) {assert (acceptorRef.get() ! this);break;}if (!acceptorRef.compareAndSet(null, this)) {assert (acceptorRef.get() ! this);break;}assert (acceptorRef.get() this);}if (selected 0) {// We have some connection request, lets process// them here.processHandles(selectedHandles());}// check to see if any cancellation request has been made.nHandles - unregisterHandles();} catch (ClosedSelectorException cse) {// If the selector has been closed, we can exit the loopbreak;} catch (Throwable e) {ExceptionMonitor.getInstance().exceptionCaught(e);try {Thread.sleep(1000);} catch (InterruptedException e1) {ExceptionMonitor.getInstance().exceptionCaught(e1);}}}// Cleanup all the processors, and shutdown the acceptor.if (selectable isDisposing()) {selectable false;try {if (createdProcessor) {processor.dispose();}} finally {try {synchronized (disposalLock) {if (isDisposing()) {destroy();}}} catch (Exception e) {ExceptionMonitor.getInstance().exceptionCaught(e);} finally {disposalFuture.setDone();}}}}简述从上面的代码中能够看出一个典型的网络请求处理的程序在循环中拿到处理的请求后就调用AbstractPollingIoAcceptor的processHandles对网络请求做处理。代码例如以下 /*** This method will process new sessions for the Worker class. All* keys that have had their status updates as per the Selector.selectedKeys()* method will be processed here. Only keys that are ready to accept* connections are handled here.* p/* Session objects are created by making new instances of SocketSessionImpl* and passing the session object to the SocketIoProcessor class.*/SuppressWarnings(unchecked)private void processHandles(IteratorH handles) throws Exception {while (handles.hasNext()) {H handle handles.next();handles.remove();// Associates a new created connection to a processor,// and get back a session//这里调用了NioSocketAcceptor的accept方法S session accept(processor, handle);if (session null) {continue;}initSession(session, null, null);// add the session to the SocketIoProcessor// 这步处理add操作会触发对client请求的异步处理。 session.getProcessor().add(session); } } NioSocketAcceptor的accept方法new了一个包装Process处理线程的session实例:而且在调用session.getProcessor().add(session)的操作的时候触发了对client请求的异步处理。 /*** {inheritDoc}*/
Override
protected NioSession accept(IoProcessorNioSession processor, ServerSocketChannel handle) throws Exception {SelectionKey key handle.keyFor(selector);if ((key null) || (!key.isValid()) || (!key.isAcceptable())) {return null;}// accept the connection from the clientSocketChannel ch handle.accept();if (ch null) {return null;}return new NioSocketSession(this, processor, ch);
}再看上面时序图有一步是AbstractPollingIoProcessor调用了startupProcessor方法。代码例如以下 /*** Starts the inner Processor, asking the executor to pick a thread in its* pool. The Runnable will be renamed*/
private void startupProcessor() {Processor processor processorRef.get();if (processor null) {processor new Processor();if (processorRef.compareAndSet(null, processor)) {executor.execute(new NamePreservingRunnable(processor, threadName));}}// Just stop the select() and start it again, so that the processor// can be activated immediately.wakeup();
}简述这个startupProcessor方法在调用 session里包装的processor的add方法是触发了将处理client请求的processor放入异步处理的线程池中。兴许详细Processor怎么处理client请求的流程涉及到FilterChain的过滤。以及Adapter的调用。用来处理业务逻辑。详细的异步处理时序看以下的时序图 简述这个时序就是将待处理的client链接通过NIO的形式接受请求并将请求包装成Processor的形式放到处理的线程池中异步的处理。在异步的处理过程中则调用了Processor的run方法详细的filterchain的调用和业务Adapter的调用也是在这一步得到处理。值得注意的是。Handler的调用是封装在DefaultFilterchain的内部类诶TairFilter中触发调用的。Processor的run方法代码例如以下 private class Processor implements Runnable {public void run() {assert (processorRef.get() this);int nSessions 0;lastIdleCheckTime System.currentTimeMillis();for (;;) {try {// This select has a timeout so that we can manage// idle session when we get out of the select every// second. (note : this is a hack to avoid creating// a dedicated thread).long t0 System.currentTimeMillis();//调用了NioProcessorint selected select(SELECT_TIMEOUT);long t1 System.currentTimeMillis();long delta (t1 - t0);if ((selected 0) !wakeupCalled.get() (delta 100)) {// Last chance : the select() may have been// interrupted because we have had an closed channel.if (isBrokenConnection()) {LOG.warn(Broken connection);// we can reselect immediately// set back the flag to falsewakeupCalled.getAndSet(false);continue;} else {LOG.warn(Create a new selector. Selected is 0, delta (t1 - t0));// Ok, we are hit by the nasty(讨厌的) epoll// spinning.// Basically, there is a race condition// which causes a closing file descriptor not to be// considered as available as a selected channel, but// it stopped the select. The next time we will// call select(), it will exit immediately for the same// reason, and do so forever, consuming 100%// CPU.// We have to destroy the selector, and// register all the socket on a new one.registerNewSelector();}// Set back the flag to falsewakeupCalled.getAndSet(false);// and continue the loopcontinue;}// Manage newly created session firstnSessions handleNewSessions();updateTrafficMask();// Now, if we have had some incoming or outgoing events,// deal with themif (selected 0) {//LOG.debug(Processing ...); // This log hurts one of the MDCFilter test...//触发了详细的调用逻辑process();}// Write the pending requestslong currentTime System.currentTimeMillis();flush(currentTime);// And manage removed sessionsnSessions - removeSessions();// Last, not least, send Idle events to the idle sessionsnotifyIdleSessions(currentTime);// Get a chance to exit the infinite loop if there are no// more sessions on this Processorif (nSessions 0) {processorRef.set(null);if (newSessions.isEmpty() isSelectorEmpty()) {// newSessions.add() precedes startupProcessorassert (processorRef.get() ! this);break;}assert (processorRef.get() ! this);if (!processorRef.compareAndSet(null, this)) {// startupProcessor won race, so must exit processorassert (processorRef.get() ! this);break;}assert (processorRef.get() this);}// Disconnect all sessions immediately if disposal has been// requested so that we exit this loop eventually.if (isDisposing()) {for (IteratorS i allSessions(); i.hasNext();) {scheduleRemove(i.next());}wakeup();}} catch (ClosedSelectorException cse) {// If the selector has been closed, we can exit the loopbreak;} catch (Throwable t) {ExceptionMonitor.getInstance().exceptionCaught(t);try {Thread.sleep(1000);} catch (InterruptedException e1) {ExceptionMonitor.getInstance().exceptionCaught(e1);}}}try {synchronized (disposalLock) {if (disposing) {doDispose();}}} catch (Throwable t) {ExceptionMonitor.getInstance().exceptionCaught(t);} finally {disposalFuture.setValue(true);}}
}简述这么一坨代码能够看出这个处理器也调用了java的Nio API是一个NIO模型。当中select和process方法各自是从session拿到要处理的请求并进行处理。而详细的Processor实例是NioProcessor。从加入凝视的代码中有一步调用了自身的process方法这步调用触发了详细业务逻辑的调用。能够结合代码和时序图看下。在Process方法中会调用readersession或wirtesession方法然后调用fireMessageReceived方法这种方法又调用了callNextMessageReceived方法致使触发了整个FilterChain和Adapter的调用。read方法的核心代码例如以下 private void read(S session) {IoSessionConfig config session.getConfig();int bufferSize config.getReadBufferSize();IoBuffer buf IoBuffer.allocate(bufferSize);final boolean hasFragmentation session.getTransportMetadata().hasFragmentation();try {int readBytes 0;int ret;try {if (hasFragmentation) {while ((ret read(session, buf)) 0) {readBytes ret;if (!buf.hasRemaining()) {break;}}} else {ret read(session, buf);if (ret 0) {readBytes ret;}}} finally {buf.flip();}if (readBytes 0) {IoFilterChain filterChain session.getFilterChain();filterChain.fireMessageReceived(buf);buf null;if (hasFragmentation) {if (readBytes 1 config.getReadBufferSize()) {session.decreaseReadBufferSize();} else if (readBytes config.getReadBufferSize()) {session.increaseReadBufferSize();}}}if (ret 0) {scheduleRemove(session);}} catch (Throwable e) {if (e instanceof IOException) {if (!(e instanceof PortUnreachableException)|| !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())|| ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {scheduleRemove(session);}}IoFilterChain filterChain session.getFilterChain();filterChain.fireExceptionCaught(e);}
}从这段代码并结合上面的时序图能够看出来触发整个FilterChain的调用以及IoHandler的调用。 三、类结构分析 參考第一部分的总体结构图画一下每一个部分大致的类结构图 简述 从类继承结构图来看能够看到在IOService体系下存在IoConnector和IoAcceptor两个大的分支体系。IoConnector是做为client的时候使用IoAcceptor是作为服务端的时候使用。实际上在Mina中有三种worker线程各自是Acceptor、Connector 和 I/O processor。 (1) Acceptor Thread 作为server端的链接线程实现了IoService接口。线程的数量就是创建SocketAcceptor的数量。 (2) Connector Thread 作为client请求建立的链接线程实现了IoService接口维持了一个和服务端Acceptor的一个链接线程的数量就是创建SocketConnector的数量。 (3) I/O processorThread 作为I/O真正处理的线程存在于server端和client。线程的数量是能够配置的默认是CPU个数1。 上面那个图仅仅是表述了IoService类体系而I/O Processor的类体系并不在当中见下图 简述IOProcessor主要分为两种。各自是AprIOProcessor和NioProcessorApr的解释见上文ps:APRApache Protable Runtime LibraryApache可移植执行库。NioProcessor也是Nio的一种实现用来处理client连接过来的请求。在Processor中会调用到 FilterChain 和 Handler见上文代码。先看下FilterChain的类结构图例如以下 Filter 和 Handler的类结构例如以下 Handler的类结构例如以下 Mina的session类结构图例如以下 Mina的Buffer的类结构图例如以下 版权声明本文博主原创文章博客未经同意不得转载。