首页

对比阻塞IO、非阻塞NIO、异步非阻塞AIO模型代码示例

标签:io,阻塞I/O     发布时间:2022-12-11   

一、对比说明

BIO (Blocking I/O):同步阻塞I/O模式,传统的 java.io 包。@b@NIO (New I/O):同步非阻塞模式 - 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,可以构建多路复用的、同步非阻塞 IO 程序。@b@AIO (Asynchronous I/O):异步非阻塞I/O模型,是 Java 1.7 之后引入的包,是 NIO 的升级版本,提供了异步非堵塞的 IO 操作方式

完整代码示例参加《亿级流量Jαva高并发与网络编程实战》的ch06章节目录。

二、代码说明

1) io示例

package com.xwood.demo.chat.io;@b@@b@import java.io.FileOutputStream;@b@import java.io.IOException;@b@import java.io.InputStream;@b@import java.io.OutputStream;@b@import java.net.Socket;@b@import java.net.UnknownHostException;@b@@b@public class MyClient {@b@    public static void main(String[] args) throws UnknownHostException, IOException {@b@        //客户端 连接服务端发布的服务@b@        Socket socket = new Socket("127.0.0.1",8882);@b@        //接受服务端发来到文件@b@        InputStream in = socket.getInputStream() ;@b@        byte[] bs = new byte[64] ;@b@        int len = -1 ;@b@        OutputStream fileOut = new FileOutputStream("d:\\temp\\xwood_back.gif") ;@b@        while( (len =in.read(bs))!=-1 ) {@b@            fileOut.write(bs,0,len);@b@        }@b@        System.out.println("文件接收成功!");@b@        fileOut.close();@b@        in.close();@b@        socket.close();@b@    }@b@@b@}
package com.xwood.demo.chat.io;@b@@b@import java.io.IOException;@b@import java.net.ServerSocket;@b@import java.net.Socket;@b@@b@public class MyServer {@b@    public static void main(String[] args) throws IOException {@b@        //服务的地址: 本机ip:8888@b@        ServerSocket server = new ServerSocket(8882);@b@        //允许接收多个客户端连接@b@        while (true) {@b@            //一直阻塞,直到有客户端发来连接@b@            Socket socket = server.accept();@b@            //创建一个线程,用于给该客户端发送一个文件@b@            new Thread(new SendFile(socket)).start();@b@        }@b@    }@b@}
package com.xwood.demo.chat.io;@b@@b@import java.io.File;@b@import java.io.FileInputStream;@b@import java.io.InputStream;@b@import java.io.OutputStream;@b@import java.net.Socket;@b@@b@//服务端向客户端发送文件@b@public class SendFile implements Runnable{@b@    private Socket socket ;@b@    public SendFile(Socket socket) {@b@        this.socket = socket ;@b@    }@b@    @Override@b@    public void run() {@b@        try {@b@            System.out.println("连接成功!");@b@            OutputStream out = socket.getOutputStream() ;@b@            File file  = new File("d:\\xwood_back.gif");@b@            InputStream fileIn = new FileInputStream(file) ;@b@            byte[] bs = new byte[64] ;@b@            int len = -1 ;@b@            while( (len=fileIn.read(bs)) !=-1   ) {@b@                out.write(bs,0,len);@b@            }@b@            fileIn.close();@b@            out.close();@b@            socket.close();@b@        }catch(Exception e) {@b@            e.printStackTrace();@b@        }@b@    }@b@}

2)nio示例

package com.xwood.demo.chat.nio;@b@@b@@b@import java.io.IOException;@b@import java.net.InetSocketAddress;@b@import java.net.ServerSocket;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.SelectionKey;@b@import java.nio.channels.Selector;@b@import java.nio.channels.ServerSocketChannel;@b@import java.nio.channels.SocketChannel;@b@import java.nio.charset.Charset;@b@import java.util.HashMap;@b@import java.util.Iterator;@b@import java.util.Map;@b@import java.util.Set;@b@@b@@b@public class ChatServer {@b@    /*@b@        clientsMap:保存所有的客户端@b@            key:客户端的名字@b@            value:客户端连接服务端的Channel@b@     */@b@    private static Map<String, SocketChannel> clientsMap = new HashMap();@b@@b@    public static void main(String[] args) throws IOException {@b@        int[] ports = new int[]{7777,8889,9999};@b@        Selector selector = Selector.open();@b@@b@        for(int port:ports){@b@            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();@b@            serverSocketChannel.configureBlocking(false);@b@@b@            ServerSocket serverSocket = serverSocketChannel.socket();@b@@b@            //将聊天服务绑定到7777、8888和9999三个端口上@b@            serverSocket.bind(new InetSocketAddress(port));@b@            System.out.println("服务端启动成功,端口"+port);@b@@b@            //在服务端的选择器上,注册一个通道,并标识该通道所感兴趣的事件是:接收客户端连接(接收就绪)@b@            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);@b@@b@        }@b@@b@        while (true) {@b@            //一直阻塞,直到选择器上存在已经就绪的通道(包含感兴趣的事件)@b@            selector.select();@b@            //selectionKeys包含了所有通道与选择器之间的关系(接收连接、读、写)@b@            Set<SelectionKey> selectionKeys = selector.selectedKeys();@b@            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();@b@            //如果selector中有多个就绪通道(接收就绪、读就绪、写就绪等),则遍历这些通道@b@            while (keyIterator.hasNext()) {@b@                SelectionKey selectedKey = keyIterator.next();@b@                String receive = null;@b@                //与客户端交互的通道@b@                SocketChannel clientChannel;@b@                try {@b@                    //接收就绪(已经可以接收客户端的连接了)@b@                    if (selectedKey.isAcceptable()) {@b@                        ServerSocketChannel server = (ServerSocketChannel) selectedKey.channel();@b@                        clientChannel = server.accept();@b@                        //切换到非阻塞模式@b@                        clientChannel.configureBlocking(false);@b@                        //再在服务端的选择器上,注册第二个通道,并标识该通道所感兴趣的事件是:接收客户端发来的消息(读就绪)@b@                        clientChannel.register(selector, SelectionKey.OP_READ);@b@                        //用“key四位随机数”的形式模拟客户端的key值@b@                        String key = "key" + (int) (Math.random() * 9000 + 1000);@b@                        //将该建立完毕连接的 通道 保存到clientsMap中@b@                        clientsMap.put(key, clientChannel);@b@                        //读就绪(已经可以读取客户端发来的信息了)@b@                    } else if (selectedKey.isReadable()) {@b@                        clientChannel = (SocketChannel) selectedKey.channel();@b@                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);@b@                        int result = -1 ;@b@                        try {@b@                            //将服务端读取到的客户端消息,放入readBuffer中@b@                            result = clientChannel.read(readBuffer);@b@                            //如果终止客户端,则read()会抛出IOException异常,可以依次判断是否有客户端退出。@b@                        }catch (IOException e){@b@                            //获取退出连接的client对应的key@b@                            String clientKey = getClientKey(clientChannel);@b@                            System.out.println("客户端"+clientKey+"退出聊天室");@b@                            clientsMap.remove(clientKey);@b@                            clientChannel.close();@b@                            selectedKey.cancel();@b@@b@                            continue;@b@                        }@b@                        if (result > 0) {@b@                            readBuffer.flip();@b@                            Charset charset = Charset.forName("utf-8");@b@                            receive = String.valueOf(charset.decode(readBuffer).array());@b@                            //将读取到的客户端消息,打印在服务端的控制台,格式: “客户端key,客户端消息”@b@                            System.out.println(clientChannel + ":" + receive);@b@                            //处理客户端第一次发来的连接测试信息@b@                            if ("connecting".equals(receive)) {@b@                                receive = "新客户端加入聊天!";@b@                            }@b@                            //将读取到的客户消息保存在attachment中,用于后续向所有客户端转发此消息@b@                            selectedKey.attach(receive);@b@                            //将通道所感兴趣的事件标识为:向客户端发送消息(写就绪)@b@                            selectedKey.interestOps(SelectionKey.OP_WRITE);@b@                        }@b@                        //写就绪@b@                    } else if (selectedKey.isWritable()) {@b@                        clientChannel = (SocketChannel) selectedKey.channel();@b@                        //获取发送消息从client对应的key@b@                        String sendKey = getClientKey(clientChannel);@b@                        //将接收到的消息,拼接成“发送消息的客户端Key:消息”的形式,再广播给所有client@b@                        for (Map.Entry<String, SocketChannel> entry : clientsMap.entrySet()) {@b@                            SocketChannel eachClient = entry.getValue();@b@                            ByteBuffer broadcastMsg = ByteBuffer.allocate(1024);@b@                            broadcastMsg.put((sendKey + ":" + selectedKey.attachment()).getBytes());@b@                            broadcastMsg.flip();@b@                            eachClient.write(broadcastMsg);@b@@b@                        }@b@                        selectedKey.interestOps(SelectionKey.OP_READ);@b@                    }@b@                } catch (Exception e) {@b@                    e.printStackTrace();@b@                }@b@            }@b@            selectionKeys.clear();@b@        }@b@    }@b@@b@    public static String getClientKey(SocketChannel clientChannel){@b@        String sendKey = null;@b@        //很多client在发下消息,通过for找到是哪个client在发消息,找到该client的key@b@        for (Map.Entry<String, SocketChannel> entry : clientsMap.entrySet()) {@b@            if (clientChannel == entry.getValue()) {@b@                //找到发送消息的client所对应的key@b@                sendKey = entry.getKey();@b@                break;@b@            }@b@        }@b@        return sendKey ;@b@    }@b@}
package com.xwood.demo.chat.nio;@b@@b@@b@import java.io.BufferedReader;@b@import java.io.IOException;@b@import java.io.InputStreamReader;@b@import java.net.InetSocketAddress;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.SelectionKey;@b@import java.nio.channels.Selector;@b@import java.nio.channels.SocketChannel;@b@import java.util.Iterator;@b@import java.util.Set;@b@@b@@b@public class ChatClient {@b@    public static void main(String[] args) {@b@        try {@b@            SocketChannel socketChannel = SocketChannel.open();@b@            //切换到非阻塞模式@b@            socketChannel.configureBlocking(false);@b@            Selector selector = Selector.open();@b@            //在客户端的选择器上,注册一个通道,并标识该通道所感兴趣的事件是:向服务端发送连接(连接就绪)。对应于服务端的OP_ACCEPT事件@b@            socketChannel.register(selector, SelectionKey.OP_CONNECT);@b@            //随机连接到服务端提供的一个端口上@b@            //int[] ports = {7777,8889,9999};@b@            // int port = ports[(int)(Math.random()*3)] ;@b@            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8889));@b@            while (true) {@b@                selector.select();@b@                //selectionKeys包含了所有通道与选择器之间的关系(请求连接、读、写)@b@                Set<SelectionKey> selectionKeys = selector.selectedKeys();@b@                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();@b@                while (keyIterator.hasNext()) {@b@                    SelectionKey selectedKey = keyIterator.next();@b@                    //判断是否连接成功@b@                    if (selectedKey.isConnectable()) {@b@                        ByteBuffer sendBuffer = ByteBuffer.allocate(1024);@b@                        //创建一个用于和服务端交互的Channel@b@                        SocketChannel client = (SocketChannel) selectedKey.channel();@b@                        //如果状态是:正在连接中...@b@                        if (client.isConnectionPending()) {@b@                            boolean isConnected = client.finishConnect();@b@                            if (isConnected) {@b@                                System.out.println("连接成功!访问的端口是:"+8889);@b@                                //向服务端发送一条测试消息@b@                                sendBuffer.put("connecting".getBytes());@b@                                sendBuffer.flip();@b@                                client.write(sendBuffer);@b@                            }@b@@b@                            //在“聊天室”中,对于客户端而言,可以随时向服务端发送消息(写操作),因此,需要建立一个单独写线程@b@                            new Thread(() -> {@b@                                while (true) {@b@                                    try {@b@                                        sendBuffer.clear();@b@                                        //接收用户从控制台输入的内容,并发送给服务端@b@                                        InputStreamReader reader = new InputStreamReader(System.in);@b@                                        BufferedReader bReader = new BufferedReader(reader);@b@                                        String message = bReader.readLine();@b@@b@                                        sendBuffer.put(message.getBytes());@b@                                        sendBuffer.flip();@b@                                        client.write(sendBuffer);@b@                                    } catch (Exception e) {@b@                                        e.printStackTrace();@b@                                    }@b@                                }@b@                            }).start();@b@                        }@b@                        //标记通道感兴趣的事件是:读取服务端消息(读就绪)@b@                        client.register(selector, SelectionKey.OP_READ);@b@                        //客户端读取服务端的反馈消息@b@                    } else if (selectedKey.isReadable()) {@b@                        SocketChannel client = (SocketChannel) selectedKey.channel();@b@                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);@b@                        //将服务端的反馈消息放入readBuffer中@b@                        int len = client.read(readBuffer);@b@                        if (len > 0) {@b@                            String receive = new String(readBuffer.array(), 0, len);@b@                            System.out.println(receive);@b@                        }@b@                    }@b@                }@b@                selectionKeys.clear();@b@            }@b@@b@        } catch (IOException e) {@b@            e.printStackTrace();@b@        }@b@    }@b@}

3)aio示例

package com.xwood.demo.chat.aio;@b@@b@import java.net.InetSocketAddress;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.AsynchronousServerSocketChannel;@b@import java.nio.channels.AsynchronousSocketChannel;@b@import java.nio.channels.CompletionHandler;@b@@b@public class AIOServer {@b@@b@    public static void main(String[] args) throws Exception {@b@        final AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel@b@                .open()@b@                .bind(new InetSocketAddress("127.0.0.1", 8881));@b@@b@        while (true) {@b@            //接收客户端请求的连接@b@            channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {@b@                //当接收到连接时,触发completed()@b@                @Override@b@                public void completed(final AsynchronousSocketChannel client, Void attachment) {@b@                    channel.accept(null, this);@b@                    ByteBuffer buffer = ByteBuffer.allocate(1024);@b@                    //开接收客户端发来的消息@b@                    client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {@b@                        //当接收到消息时,触发completed()@b@                        @Override@b@                        public void completed(Integer result_num, ByteBuffer dataBuffer) {@b@                            dataBuffer.flip();@b@                            String receive = new String(dataBuffer.array(), 0, dataBuffer.limit());@b@                            System.out.println("接收到的客户端消息:" + receive);@b@                            try {@b@                                client.close();@b@                            } catch (Exception e) {@b@                                e.printStackTrace();//打印异常@b@                            }@b@                        }@b@                        @Override@b@                        public void failed(Throwable e, ByteBuffer attachment) {@b@                            e.printStackTrace();@b@                        }@b@                    });@b@                }@b@@b@                @Override@b@                public void failed(Throwable e, Void attachment) {@b@                    e.printStackTrace();@b@                }@b@            });@b@            for (; ; ) {@b@                System.out.println("main线程和用于读取客户端消息的线程是异步执行的...");@b@                Thread.sleep(1000);@b@            }@b@        }@b@    }@b@@b@}
package com.xwood.demo.chat.aio;@b@@b@import java.net.InetSocketAddress;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.AsynchronousSocketChannel;@b@import java.util.concurrent.Future;@b@@b@public class AIOClient {@b@    public static void main(String[] args) throws Exception {@b@        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();@b@        channel.connect(new InetSocketAddress("127.0.0.1", 8881)).get();@b@        ByteBuffer buffer = ByteBuffer.wrap("Hello Server".getBytes());@b@        //向服务端发送消息@b@        Future<Integer> future = channel.write(buffer);@b@        while (!future.isDone()) {@b@            System.out.println("在channel将消息发送完毕以前,main可以异步处理其他事情..");@b@            Thread.sleep(1000);@b@        }@b@        Integer len = future.get();@b@        System.out.println("发送完毕!共发送字节数:"+len);@b@@b@    }@b@}