免费注册 查看新帖 |

Chinaunix

  平台 论坛 博客 文库
最近访问板块 发新帖
查看: 5062 | 回复: 6
打印 上一主题 下一主题

Mina 非阻塞通讯 [复制链接]

论坛徽章:
0
跳转到指定楼层
1 [收藏(0)] [报告]
发表于 2009-03-27 19:16 |只看该作者 |倒序浏览
1NIO 设计背后的基石:反应器模式,用于事件多路分离和分派的体系结构模式。
反应器模式的核心功能如下:
将事件多路分用
将事件分派到各自相应的事件处理程序
NIO 的非阻塞 I/O 机制是围绕 选择器和 通道构建的。 Channel 类表示服务器和客户机之间的
一种通信机制。Selector 类是 Channel 的多路复用器。 Selector 类将传入客户机请求多路分
用并将它们分派到各自的请求处理程序。
通道(Channel 类):表示服务器和客户机之间的一种通信机制。
选择器(Selector类):是 Channel 的多路复用器。Selector 类将传入的客户机请求多路分用并将它们
分派到各自的请求处理程序。
简单的来说:
NIO是一个基于事件的IO架构,最基本的思想就是:有事件我通知你,你再去做你的事情.
而且NIO的主线程只有一个,不像传统的模型,需要多个线程以应对客户端请求,也减轻
了JVM的工作量。
当Channel注册至Selector以后,经典的调用方法如下:
        while (somecondition) {
            int n = selector.select(TIMEOUT);
            if (n == 0)
                continue;
            for (Iterator iter = selector.selectedKeys().iterator(); iter
                    .hasNext() {
                if (key.isAcceptable())
                    doAcceptable(key);
                if (key.isConnectable())
                    doConnectable(key);
                if (key.isValid() && key.isReadable())
                    doReadable(key);
                if (key.isValid() && key.isWritable())
                    doWritable(key);
                iter.remove();
            }
        }
nio中取得事件通知,就是在selector的select事件中完成的。在selector事件时有一个线程
向操作系统询问,selector中注册的Channel&&SelectionKey的键值对的各种事件是否有发生,
如果有则添加到selector的selectedKeys属性Set中去,并返回本次有多少个感兴趣的事情发生。
如果发现这个值>0,表示有事件发生,马上迭代selectedKeys中的SelectionKey,
根据Key中的表示的事件,来做相应的处理。
实际上,这段说明表明了异步socket的核心,即异步socket不过是将多个socket的调度(或者还有他们的线程调度)
全部交给操作系统自己去完成,异步的核心Selector,不过是将这些调度收集、分发而已。

评分

参与人数 1可用积分 +10 收起 理由
starxing + 10 我很赞同

查看全部评分

论坛徽章:
0
2 [报告]
发表于 2009-03-27 19:17 |只看该作者
附上一个简单的例子

采用MINA进行socket开发,一般步骤如下:
1:
server:
IoAcceptor acceptor = new SocketAcceptor(); //建立client接收器
or client:
SocketConnector connector = new SocketConnector();  //建立一个连接器
2:server的属性配置:
        SocketAcceptorConfig cfg = new SocketAcceptorConfig();
        cfg.setReuseAddress(true);
        cfg.getFilterChain().addLast(
                    "codec",
                    new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) ); //对象序列化 codec factory
        cfg.getFilterChain().addLast( "logger", new LoggingFilter() );
3:绑定address和business logic
server:
        acceptor.bind(
                new InetSocketAddress( SERVER_PORT ),
                new ServerSessionHandler( ), cfg ); // 绑定address和handler
client:
        connector.connect(new InetSocketAddress( HOSTNAME, PORT ),
                        new ClientSessionHandler(msg), cfg );
下面的这个简单的example演示client和server传递object的过程:
Message.java
public class Message implements Serializable {
    private int type;
    private int status;
    private String msgBody;
   
    public Message(int type, int status, String msgBody)
    {
        this.type = type;
        this.status = status;
        this.msgBody = msgBody;
    }
    public String getMsgBody() {
        return msgBody;
    }
    public void setMsgBody(String msgBody) {
        this.msgBody = msgBody;
    }
    public int getStatus() {
        return status;
    }
    public void setStatus(int status) {
        this.status = status;
    }
    public int getType() {
        return type;
    }
    public void setType(int type) {
        this.type = type;
    }
}
Client.java
public class Client
{
    private static final String HOSTNAME = "localhost";
    private static final int PORT = 8080;
    private static final int CONNECT_TIMEOUT = 30; // seconds

    public static void main( String[] args ) throws Throwable
    {
        SocketConnector connector = new SocketConnector();        
        // Configure the service.
        SocketConnectorConfig cfg = new SocketConnectorConfig();
        cfg.setConnectTimeout( CONNECT_TIMEOUT );
          cfg.getFilterChain().addLast(
                    "codec",
                    new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );
        cfg.getFilterChain().addLast( "logger", new LoggingFilter() );
        
        IoSession session;
        Message msg = new Message(0,1,"hello");
        connector.connect(new InetSocketAddress( HOSTNAME, PORT ),
                        new ClientSessionHandler(msg), cfg );
    }
}
ClientSessionHandler.java
public class ClientSessionHandler extends IoHandlerAdapter
{
    private Object msg;
   
    public ClientSessionHandler(Object msg)
    {
        this.msg = msg;
    }

    public void sessionOpened( IoSession session )
    {
        session.write(this.msg);
    }
    public void messageReceived( IoSession session, Object message )
    {
        System.out.println("in messageReceived!");
        Message rm = (Message ) message;        
        SessionLog.debug(session, rm.getMsgBody());
        System.out.println("message is: " + rm.getMsgBody());
        session.write(rm);
    }
    public void exceptionCaught( IoSession session, Throwable cause )
    {
        session.close();
    }
}
Server.java
public class Server
{
    private static final int SERVER_PORT = 8080;
    public static void main( String[] args ) throws Throwable
    {
        IoAcceptor acceptor = new SocketAcceptor();
        
        // Prepare the service configuration.
        SocketAcceptorConfig cfg = new SocketAcceptorConfig();
        cfg.setReuseAddress( true );
        cfg.getFilterChain().addLast(
                    "codec",
                    new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );
        cfg.getFilterChain().addLast( "logger", new LoggingFilter() );
        acceptor.bind(
                new InetSocketAddress( SERVER_PORT ),
                new ServerSessionHandler( ), cfg );
        System.out.println( "The server Listening on port " + SERVER_PORT );
    }
}
ServerSessionHandler.java
public class ServerSessionHandler extends IoHandlerAdapter
{
    public void sessionOpened( IoSession session )
    {
        // set idle time to 60 seconds
        session.setIdleTime( IdleStatus.BOTH_IDLE, 60 );
        session.setAttribute("times",new Integer(0));
    }
    public void messageReceived( IoSession session, Object message )
    {
        System.out.println("in messageReceived");
        int times = ((Integer)(session.getAttribute("times"))).intValue();
        System.out.println("tiems = " + times);
        // communicate 30 times,then close the session.
        if (times < 30)
        {
            times++;
            session.setAttribute("times", new Integer(times));           
         Message msg;
         msg = (Message) message;
         msg.setMsgBody("in server side: " + msg.getMsgBody());
         System.out.println("begin send msg: " + msg.getMsgBody());
         session.write(msg);
        }
        else
        {
            session.close();
        }
    }
    public void sessionIdle( IoSession session, IdleStatus status )
    {
        SessionLog.info( session, "Disconnecting the idle." );
        // disconnect an idle client
        session.close();
    }
    public void exceptionCaught( IoSession session, Throwable cause )
    {
        // close the connection on exceptional situation
        session.close();
    }
}

论坛徽章:
0
3 [报告]
发表于 2009-03-27 19:17 |只看该作者
SSL加密传输  加上SSlFilter过滤器
Client  端

        connectorSSLFilter = new SSLFilter(BogusSSLContextFactory
                                        .getInstance(true));
                        connectorSSLFilter.setUseClientMode(true);
                        connector.getFilterChain().addLast("sslFilter", connectorSSLFilter);

Server


SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory
                                .getInstance(true));
                chain.addLast("sslFilter", sslFilter);

论坛徽章:
0
4 [报告]
发表于 2009-03-27 19:23 |只看该作者
附上   SSLcontext



public class BogusSSLContextFactory {

    /**
     * Protocol to use.
     */
    private static final String PROTOCOL = "TLS";

    private static final String KEY_MANAGER_FACTORY_ALGORITHM;

    static {
        String algorithm = Security
                .getProperty("algorithm");
        if (algorithm == null) {
            algorithm = "SunX509";
        }

        KEY_MANAGER_FACTORY_ALGORITHM = algorithm;
    }

    /**
     * Bougus Server certificate keystore file name.
     */
    private static final String BOGUS_KEYSTORE = "bogus.cert";

    // NOTE: The keystore was generated using keytool:
    //   keytool -genkey -alias bogus -keysize 512 -validity 3650
    //           -keyalg RSA -dname "CN=bogus.com, OU=XXX CA,
    //               O=Bogus Inc, L=Stockholm, S=Stockholm, C=SE"
    //           -keypass boguspw -storepass boguspw -keystore bogus.cert

    /**
     * Bougus keystore password.
     */
    private static final char[] BOGUS_PW = { 'b', 'o', 'g', 'u', 's', 'p', 'w' };

    private static SSLContext serverInstance = null;

    private static SSLContext clientInstance = null;

    /**
     * Get SSLContext singleton.
     *
     * @return SSLContext
     * @throws java.security.GeneralSecurityException
     *
     */
    public static SSLContext getInstance(boolean server)
            throws GeneralSecurityException {
        SSLContext retInstance = null;
        if (server) {
            if (serverInstance == null) {
                synchronized (BogusSSLContextFactory.class) {
                    if (serverInstance == null) {
                        try {
                            serverInstance = createBougusServerSSLContext();
                        } catch (Exception ioe) {
                            throw new GeneralSecurityException(
                                    "Can't create Server SSLContext:" + ioe);
                        }
                    }
                }
            }
            retInstance = serverInstance;
        } else {
            if (clientInstance == null) {
                synchronized (BogusSSLContextFactory.class) {
                    if (clientInstance == null) {
                        clientInstance = createBougusClientSSLContext();
                    }
                }
            }
            retInstance = clientInstance;
        }
        return retInstance;
    }

    private static SSLContext createBougusServerSSLContext()
            throws GeneralSecurityException, IOException {
        // Create keystore
        KeyStore ks = KeyStore.getInstance("JKS");
        InputStream in = null;
        try {
            in = BogusSSLContextFactory.class
                    .getResourceAsStream(BOGUS_KEYSTORE);
            ks.load(in, BOGUS_PW);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException ignored) {
                }
            }
        }

        // Set up key manager factory to use our key store
        KeyManagerFactory kmf = KeyManagerFactory
                .getInstance(KEY_MANAGER_FACTORY_ALGORITHM);
        kmf.init(ks, BOGUS_PW);

        // Initialize the SSLContext to work with our key managers.
        SSLContext sslContext = SSLContext.getInstance(PROTOCOL);
        sslContext.init(kmf.getKeyManagers(),
                BogusTrustManagerFactory.X509_MANAGERS, null);

        return sslContext;
    }

    private static SSLContext createBougusClientSSLContext()
            throws GeneralSecurityException {
        SSLContext context = SSLContext.getInstance(PROTOCOL);
        context.init(null, BogusTrustManagerFactory.X509_MANAGERS, null);
        return context;
    }

}

论坛徽章:
0
5 [报告]
发表于 2009-03-28 00:28 |只看该作者
申请加精

论坛徽章:
0
6 [报告]
发表于 2009-03-29 10:54 |只看该作者
这个要顶一下

论坛徽章:
0
7 [报告]
发表于 2009-03-30 10:54 |只看该作者
看着累。
您需要登录后才可以回帖 登录 | 注册

本版积分规则 发表回复

  

北京盛拓优讯信息技术有限公司. 版权所有 京ICP备16024965号-6 北京市公安局海淀分局网监中心备案编号:11010802020122 niuxiaotong@pcpop.com 17352615567
未成年举报专区
中国互联网协会会员  联系我们:huangweiwei@itpub.net
感谢所有关心和支持过ChinaUnix的朋友们 转载本站内容请注明原作者名及出处

清除 Cookies - ChinaUnix - Archiver - WAP - TOP