Apache Mina通信框架架构与应用

2/10/2017来源:ASP.NET技巧人气:1631

Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于 TCP/ip、UDP/IP协议栈的通信框架(当然,也可以提供 java 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步 IO 默认使用的是 JAVA NIO 作为底层支持)操作的编程模型。 从官网文档“MINA based application Architecture”中可以看到Mina作为一个通信层框架,在实际应用所处的位置,如图所示: apparch_small Mina位于用户应用程序和底层Java网络API(和in-VM通信)之间,我们开发基于Mina的网络应用程序,就无需关心复杂的通信细节。

应用整体架构

再看一下,Mina提供的基本组件,如图所示: mina_app_arch 也就是说,无论是客户端还是服务端,使用Mina框架实现通信的逻辑分层在概念上统一的,即包含如下三层:

I/O Service – Performs actual I/O I/O Filter Chain – Filters/Transforms bytes into desired Data Structures and vice-versa I/O Handler – Here resides the actual business logic

想要开发基于MIna的应用程序,你只需要做如下事情:

Create an I/O service – Choose from already available Services (*Acceptor) or create your own Create a Filter Chain – Choose from already existing Filters or create a custom Filter for transforming request/response Create an I/O Handler – Write business logic, on handling different messages

下面看一下使用Mina的应用程序,在服务器端和客户端的架构细节:

服务器端架构 服务器端监听指定端口上到来的请求,对这些请求经过处理后,回复响应。它也会创建并处理一个链接过来的客户会话对象(session)。服务器端架构如图所示: Server_arch 对服务器端的说明,引用官网文档,如下所示:

IOAcceptor listens on the network for incoming connections/packets For a new connection, a new session is created and all subsequent request from IP Address/Port combination are handled in that Session All packets received for a Session, traverses the Filter Chain as specified in the diagram. Filters can be used to modify the content of packets (like converting to Objects, adding/removing information etc). For converting to/from raw bytes to High Level Objects, PacketEncoder/Decoder are particularly useful Finally the packet or converted object lands in IOHandler. IOHandlers can be used to fulfill business needs.

客户端架构 客户端主要做了如下工作:

连接到服务器端 向服务器发送消息 等待服务器端响应,并处理响应

客户端架构,如图所示: clientdiagram 对客户端架构的说明,引用官网文档内容,如下所示:

Client first creates an IOConnector (MINA Construct for connecting to Socket), initiates a bind with Server Upon Connection creation, a Session is created and is associated with Connection Application/Client writes to the Session, resulting in data being sent to Server, after traversing the Filter Chain All the responses/messages received from Server are traverses the Filter Chain and lands at IOHandler, for PRocessing

应用实例开发

下面根据上面给出的架构设计描述,看一下Mina(版本2.0.7)自带的例子,如何实现一个简单的C/S通信的程序,非常容易。 服务端 首先,服务器端需要使用的组件有IoAdaptor、IoHandler、IoFilter,其中IoFilter可选. 我们基于Mina自带的例子进行了简单地修改,实现服务端IoHandler的代码如下所示:

01 package org.shirdrn.mina.server;
02  
03 import org.apache.mina.core.service.IoHandlerAdapter;
04 import org.apache.mina.core.session.IdleStatus;
05 import org.apache.mina.core.session.IoSession;
06 import org.slf4j.Logger;
07 import org.slf4j.LoggerFactory;
08  
09 public class TinyServerProtocolHandler extends IoHandlerAdapter {
10     private final static Logger LOGGER = LoggerFactory.getLogger(TinyServerProtocolHandler.class);
11     
12     @Override
13     public void sessionCreated(IoSession session) {
14         session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
15     }
16  
17     @Override
18     public void sessionClosed(IoSession session) throws Exception {
19         LOGGER.info("CLOSED");
20     }
21  
22     @Override
23     public void sessionOpened(IoSession session) throws Exception {
24         LOGGER.info("OPENED");
25     }
26  
27     @Override
28     public void sessionIdle(IoSession session, IdleStatus status) {
29         LOGGER.info("*** IDLE #" + session.getIdleCount(IdleStatus.BOTH_IDLE) + " ***");
30     }
31  
32     @Override
33     public void exceptionCaught(IoSession session, Throwable cause) {
34         session.close(true);
35     }
36  
37     @Override
38     public void messageReceived(IoSession session, Object message)
39             throws Exception {
40         LOGGER.info( "Received : " + message );
41        if(!session.isConnected()) {
42             session.close(true);
43        }
44     }
45 }

这个版本中,IoHandlerAdapter实现了IoHandler接口,里面封装了一组用于事件处理的空方法,其中包含服务端和客户端的事件。在实际应用中,客户端可以选择客户端具有的事件,服务器端选择服务器端具有的事件,然后分别对这两类事件进行处理(有重叠的事件,如连接事件、关闭事件、异常事件等)。 客户端的IoHandler的具体实现也是类似的,不过多累述。 下面看启动服务器的主方法类,代码如下所示:

01 package org.shirdrn.mina.server;
02  
03 import java.net.InetSocketAddress;
04  
05 import org.apache.mina.filter.codec.ProtocolCodecFilter;
06 import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
07 import org.apache.mina.transport.socket.SocketAcceptor;
08 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
09 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11  
12 public class TinyMinaServer {
13  
14     private final static Logger LOG = LoggerFactory.getLogger(TinyMinaServer.class);
15     /** Choose your favorite port number. */
16     private static final int PORT = 8080;
17  
18     public static void main(String[] args) throws Exception {
19         SocketAcceptor acceptor = new NioSocketAcceptor();
20         acceptor.setReuseAddress(true);
21         acceptor.getFilterChain().addLast("codec"new ProtocolCodecFilter(newTextLineCodecFactory()));
22  
23         // Bind
24         acceptor.setHandler(new TinyServerProtocolHandler());
25         acceptor.bind(new InetSocketAddress(PORT));
26         LOG.info("Listening on port " + PORT);
27  
28         LOG.info("Server started!");
29  
30         for (;;) {
31             LOG.info("R: " + acceptor.getStatistics().getReadBytesThroughput() + ", W: " + acceptor.getStatistics().getWrittenBytesThroughput());
32             Thread.sleep(3000);
33         }
34     }
35  
36 }

客户端 实现客户端IoHandler的代码如下所示:

01 package org.shirdrn.mina.client;
02  
03 import org.apache.mina.core.service.IoHandlerAdapter;
04 import org.apache.mina.core.session.IdleStatus;
05 import org.apache.mina.core.session.IoSession;
06 import org.slf4j.Logger;
07 import org.slf4j.LoggerFactory;
08  
09 public class TinyClientProtocolHandler extends IoHandlerAdapter {
10  
11     private final static Logger LOGGER = LoggerFactory
12             .getLogger(TinyClientProtocolHandler.class);
13  
14     @Override
15     public void sessionCreated(IoSession session) {
16         LOGGER.info("CLIENT::CREATED");
17     }
18  
19     @Override
20     public void sessionClosed(IoSession session) throws Exception {
21         LOGGER.info("CLIENT::CLOSED");
22     }
23  
24     @Override
25     public void sessionOpened(IoSession session) throws Exception {
26         LOGGER.info("CLIENT::OPENED");
27     }
28  
29     @Override
30     public void sessionIdle(IoSession session, IdleStatus status) {
31         LOGGER.info("CLIENT::*** IDLE #"
32                 + session.getIdleCount(IdleStatus.BOTH_IDLE) + " ***");
33     }
34  
35     @Override
36     public void exceptionCaught(IoSession session, Throwable cause) {
37         LOGGER.info("CLIENT::EXCEPTIONCAUGHT");
38         cause.printStackTrace();
39     }
40  
41     public void messageSent(IoSession session, Object message) throws Exception {
42         LOGGER.info("CLIENT::MESSAGESENT: " + message);
43     }
44 }

下面看启动客户端的主方法类,代码如下所示:

01 package org.shirdrn.mina.client;
02  
03 import java.net.InetSocketAddress;
04  
05 import org.apache.mina.core.future.ConnectFuture;
06 import org.apache.mina.filter.codec.ProtocolCodecFilter;
07 import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
08 import org.apache.mina.transport.socket.SocketConnector;
09 import org.apache.mina.transport.socket.nio.NioSocketConnector;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12  
13 public class TinyMinaClient {
14  
15     private final static Logger LOG = LoggerFactory.getLogger(TinyMinaClient.class);
16     /** Choose your favorite port number. */
17     private static final int PORT = 8080;
18  
19     public static void main(String[] args) throws Exception {
20         SocketConnector connector = new NioSocketConnector();
21  
22         // Connect
23         connector.getFilterChain().addLast("codec"new ProtocolCodecFilter(newTextLineCodecFactory()));
24         connector.setHandler(new TinyClientProtocolHandler());
25  
26         for (int i = 0; i < 10; i++) {
27             ConnectFuture future = connector.connect(new InetSocketAddress(PORT));
28             LOG.info("Connect to port " + PORT);
29             future.awaitUninterruptibly();
30             future.getSession().write(String.valueOf(i));
31             Thread.sleep(1500);
32         }
33  
34     }
35 }

我们只是发送了十个数字,每发一次间隔1500ms。 测试上述服务器端与客户端交互,首先启动服务器端,监听8080端口。 接着启动客户端,连接到服务器端8080端口,然后发送消息,服务器端接收到消息后,直接将到客户端的连接关闭掉。