java io实践与总结

bio

java.io和java.net包,代码简单直观,同步/阻塞IO,一个连接需要一个处理线程,如果是几百个连接的应用,不会有什么问题,如果连接数比较多,会产生过多线程,占用较多系统资源和频繁的上下文切换。

socket io server经典处理模型

socket io

示例代码

import java.io.IOException;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IOServer extends Thread {

    private volatile boolean running = true;
    private ServerSocket serverSocket;
    private ExecutorService pool = Executors.newFixedThreadPool(5);

    public void stopServer() {
        this.running = false;
        if (serverSocket != null) {
            try {
                serverSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        pool.shutdown();
    }


    @Override
    public void run() {
        try {
            // port=0,表示随机监听一个空闲端口
            serverSocket = new ServerSocket(0);
            System.out.println("server started, listing port: " + serverSocket.getLocalPort());
            while (running) {
                Socket socket = serverSocket.accept();
                RequestHandler handler = new RequestHandler(socket);
                pool.execute(handler);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            stopServer();
        }
    }

    public static void main(String[] args) {
        new IOServer().start();
    }

    private static class RequestHandler extends Thread {
        private volatile boolean alive = true;
        private Socket socketClient;
        private Scanner scanner;
        private PrintStream out;

        RequestHandler(Socket socketClient) {
            this.socketClient = socketClient;
            try {
                this.scanner = new Scanner(socketClient.getInputStream());
                this.scanner.useDelimiter("\n");
                this.out = new PrintStream(socketClient.getOutputStream());
                System.out.println("client connectd: " + socketClient.getInetAddress().getHostAddress());
                this.out.println("hello");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void close() {
            alive = false;
            scanner.close();
            out.close();
            if (!socketClient.isClosed()) {
                try {
                    socketClient.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void run() {
            while (alive) {
                if (socketClient.isClosed()) {
                    System.out.println("client closed");
                    this.close();
                    return;
                }
                if (this.scanner.hasNext()) {
                    String data = this.scanner.next().trim();
                    if ("".equals(data)) {
                        continue;
                    }
                    if ("exit".equalsIgnoreCase(data)) {
                        out.println("bye...");
                        this.close();
                    } else {
                        System.out.printf("receive: %s \n", data);
                        out.printf("receive: %s \n", data);
                    }
                }
            }
        }
    }
}

使用telnet连接server测试

# 连接客户端
~ telnet 127.0.0.1 58605  
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
# 向服务端发送数据
world
# 收到服务器响应数据
receive: world 

nio

java.nio包,jdk1.4+版本。api复杂,使用不当容易出现问题。同步/非阻塞IO,真正的IO操作还是同步的,一个请求占用一个处理线程。核心组件Selector、Channel、Buffer,单个server通常能处理上万个连接

nio server处理模型 socket nio

参考教程:

  1. http://ifeve.com/java-nio-all
  2. http://tutorials.jenkov.com/java-nio/index.html
  3. http://rox-xmlrpc.sourceforge.net/niotut/
  4. http://www.importnew.com/24794.html
  5. http://www.importnew.com/28007.html

java.nio.channels.SelectionKey

事件 描述
OP_ACCEPT 请求在接受新连接并创建 Channel 时获得通知
OP_CONNECT 请求在建立一个连接时获得通知
OP_READ 请求当数据已经就绪,可以从 Channel 中读取时获得通知
OP_WRITE 请求当可以向 Channel 中写更多的数据时获得通知。这处理了套接字缓冲区被完 全填满时的情况,这种情况通常发生在数据的发送速度比远程节点可处理的速度更 快的时候

示例代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;

public class NIOServer extends Thread {

    private int port;
    private ServerSocketChannel serverChannel;
    private Selector selector;
    private ByteBuffer readBuffer = ByteBuffer.allocate(1024);

    private Map<SocketChannel, Queue<ByteBuffer>> pendingData = new ConcurrentHashMap<>(10);

    public NIOServer(int port) throws IOException {
        this.port = port;
        this.selector = this.initSelector();
    }

    private Selector initSelector() throws IOException {
        Selector socketSelector = SelectorProvider.provider().openSelector();
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        InetSocketAddress isa = new InetSocketAddress(this.port);
        serverChannel.socket().bind(isa);
        serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
        System.out.println("server started, listing port: " + port);
        return socketSelector;
    }

    @Override
    public void run() {
        while (true) {
            try {
                if (this.selector.select() < 1) {
                    continue;
                }
                Iterator selectedKeys = this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    SelectionKey key = (SelectionKey) selectedKeys.next();
                    selectedKeys.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        this.accept(key);
                    } else if (key.isReadable()) {
                        this.read(key);
                    } else if (key.isWritable()) {
                        this.write(key);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        // 响应数据消息队列
        Queue<ByteBuffer> queue = new LinkedBlockingDeque<>(10);
        pendingData.put(socketChannel, queue);
        socketChannel.register(this.selector, SelectionKey.OP_READ);
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        this.readBuffer.clear();
        int numRead;
        try {
            numRead = socketChannel.read(this.readBuffer);
        } catch (IOException e) {
            closeClient(key);
            return;
        }

        if (numRead == -1) {
            closeClient(key);
            return;
        }
        String data = new String(readBuffer.array(), 0, numRead).trim();
        // 退出指令
        if ("exit".equalsIgnoreCase(data)) {
            closeClient(key);
            return;
        }
        pendingData.get(socketChannel).add(Charset.defaultCharset().encode("receive: " + data + "\n"));
        key.interestOps(SelectionKey.OP_WRITE);
    }

    private void write(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        Queue<ByteBuffer> queue = pendingData.get(socketChannel);
        if (queue.isEmpty()) {
            return;
        }
        ByteBuffer buffer = queue.poll();
        while (buffer != null) {
            socketChannel.write(buffer);
            buffer = queue.poll();
        }

        if (queue.isEmpty()) {
            key.interestOps(SelectionKey.OP_READ);
        }
    }

    private void closeClient(SelectionKey key) throws IOException {
        key.channel().close();
        key.cancel();
    }

    public static void main(String[] args) {
        try {
            new NIOServer(6666).start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

nio2(aio)

jdk1.7+版本,api复杂,异步非阻塞IO,IO操作完全交给操作系统。采用事件通知回调机制,其read和write方法均是异步。

实例代码

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AIOServer {

    private int port;
    private ExecutorService executorService;
    private AsynchronousChannelGroup threadGroup;
    public AsynchronousServerSocketChannel asynServerSocketChannel;

    public AIOServer(int port) {
        this.port = port;
    }

    public AIOServer start() {
        try {
            executorService = Executors.newCachedThreadPool();
            threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
            asynServerSocketChannel = AsynchronousServerSocketChannel.open(threadGroup);
            asynServerSocketChannel.bind(new InetSocketAddress(port));
            System.out.println("server start , port : " + port);
            asynServerSocketChannel.accept(this, new AcceptHandler());
            return this;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void sync() {
        synchronized (this) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AIOServer> {

        @Override
        public void completed(AsynchronousSocketChannel clientChannel, AIOServer attachment) {
            attachment.asynServerSocketChannel.accept(attachment, this);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            clientChannel.read(buffer, buffer, new LogicHandler(clientChannel));
        }

        @Override
        public void failed(Throwable exc, AIOServer attachment) {
            exc.printStackTrace();
        }

    }

    static class LogicHandler implements CompletionHandler<Integer, ByteBuffer> {

        private AsynchronousSocketChannel clientChannel;

        public LogicHandler(AsynchronousSocketChannel clientChannel) {
            this.clientChannel = clientChannel;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            System.out.println("result: " + result);
            if (!clientChannel.isOpen() || result < 0) {
                System.out.println("client close");
                this.close();
                return;
            }
            attachment.flip();
            String data = new String(attachment.array()).trim();
            if ("exit".equalsIgnoreCase(data)) {
                this.close();
            }
            write(clientChannel, "receive: " + data + "\r\n");
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            clientChannel.read(buffer, buffer, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
        }

        private void write(AsynchronousSocketChannel clientChannel, String response) {
            try {
                ByteBuffer buf = ByteBuffer.allocate(1024);
                buf.put(response.getBytes());
                buf.flip();
                clientChannel.write(buf).get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        private void close() {
            try {
                clientChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        AIOServer server = new AIOServer(6666);
        server.start().sync();
    }
}

总结

- BIO NIO AIO
阻塞/非阻塞 阻塞 非阻塞 非阻塞
同步/异步 同步 同步 异步
性能 非常高
线程比(server:client) 1:1 1:N 0:N
吞吐

同步与异步IO的区别

进程IO操作步骤:

  1. 进程向操作系统请求数据
  2. 操作系统把数据加载到内核的缓冲区
  3. 操作系统把内核的缓冲区拷贝到进程的缓冲区
  4. 进程从缓冲区获得数据

同步与异步IO的区别在于,操作系统操作缓冲区的时候(2、3),应用线程是否处于挂起状态(可以处理其他逻辑)。

阻塞与非阻塞区别

进程在开始IO操作之前,需要有一些准备工作,例如网络等待,在数据没有就绪前实际上还没有开始IO操作。

  • 阻塞:在真正IO操作开始前,线程处于阻塞状态,不能处理其他逻辑。
  • 非阻塞:在真正IO操作前,不需要等待,此时线程可以处理其他逻辑。