NIO


NIO

NIO与BIO

  1. BIO:Blocking IO,阻塞模型,适用于连接数目较小且固定的架构

    为每个连接创建一个线程

  2. NIO:Non-Blocking IO,同步非阻塞,适用于连接数目较多且连接比较短的架构

    专有一个或多个线程服务于众多的IO连接(Netty)

  3. AIO:Asynchronous IO,异步非阻塞,适用于连接数目较多且连接比较长的架构

    操作系统进行调度,当操作系统完成后才通知服务端启动线程去执行

BIO

传统的Java io编程,相关接口在java.io

实现流程

  1. 服务器端启动一个ServerSocket
  2. 客户端启动Socket与服务器进行通信,默认情况下,对客户机的每一个连接服务器均会创建一个新的服务线程
  3. 客户端会先等待(阻塞)服务器的响应
  4. 客户机会等待到请求结束再继续执行

实现

public static void main(String[] args) throws Exception {
    //创建一个线程池

    //对每一个客户机请求,创建一个线程与之通信

    ExecutorService threadPool = Executors.newCachedThreadPool();

    //创建ServerSocket
    ServerSocket serverSocket = new ServerSocket(6666);
    System.out.println("Server start...");

    while (true) {
        //监听,等待客户机连接
        final Socket socket = serverSocket.accept();
        System.out.println("Accepted...");

        //创建一个线程与之通信
        threadPool.execute(() -> {
            try {
                byte[] bytes = new byte[1024];
                InputStream inputStream = socket.getInputStream();
                int length = 0;
                while ((length = inputStream.read(bytes)) != -1) {
                    System.out.println(Thread.currentThread().getName() + ":" +
                            new String(bytes, 0, length));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

输出结果

Server start...
Accepted...
pool-1-thread-1:a
Accepted...
pool-1-thread-2:b

NIO

位于java.nio包下

NIO是面向缓冲区/面向块的编程

有三大核心部分:Channel, Buffer, Selector

  1. 每个channel都会对应一个buffer
  2. selector会对应一个线程
  3. 一个线程(selector)对应多个channel(连接)
  4. 程序切换channel是由事件决定的
  5. selector会根据不同的事件在各个通道上切换
  6. buffer就是一个内存块,底层是一个数组
  7. 数据的读取或者写入是通过buffer,与BIO不同,BIO中一个流要么是输入流,要么是输出流,但buffer是双向的

Buffer

Buffer的使用案例

public class BasicBuffer {
    public static void main(String[] args) {
        //创建一个大小为5的buffer
        IntBuffer intBuffer = IntBuffer.allocate(5);

        //向buffer内放入数据
        for (int i = 0; i < intBuffer.capacity(); i++) {
            intBuffer.put(i * i);
        }

        //将buffer转换,读写切换
        intBuffer.flip();

        while (intBuffer.hasRemaining()) {
            System.out.println(intBuffer.get());
        }
    }
}

在Buffer的源码中定义了如下变量

在进行反转(flip)时,position会变为下一个要读取的字符位置,而limit会变成已经写入的字节数目。

private int mark = -1;//标记,一般使用默认值
private int position = 0;//下一个要被读/写的位置
private int limit;//表示缓冲区当前终点,可以修改
private int capacity;//缓冲区最大大小,不可以改变

ByteBuffer

在网络编程中,最常用的是ByteBuffer,常用的方法如下

//创建直接缓冲区
public static ByteBuffer allocateDirect(int capacity)
//设置缓冲区的初始容量
public static ByteBuffer allocate(int capacity)
//获取下一个值
public abstract byte get()
//从绝对位置get
public abstract byte get(int var1)
//放入一个元素
public abstract ByteBuffer put(byte var1)
//从绝对位置放入一个元素
public abstract ByteBuffer put(int var1, byte var2)

Channel

  1. 通道可以同时读写
  2. 通道可以实现异步读写数据
  3. 通道可以从缓冲读取数据,也可以将数据写入缓冲
  4. 常用的Channel有:FileChannel, DatagramChannel, ServerSocketChannel, SocketChannel
  5. FileChannel用于文件读写,DatagramChannel用于UDP读写,ServerSocketChannel, SocketChannel用于TCP读写

写入实现

public class NIOFileChannel1 {
    public static void main(String[] args) throws Exception{
        String str = "hello";
        //创建一个输出流->channel
        FileOutputStream fileOutputStream = new FileOutputStream("NIOFileChannelOutput1.txt");

        //通过fileOutputStream获得文件输出流
        //filechannel真实类型是FileChannelImpl
        FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();

        //创建一个缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        //将str放入到buffer
        buffer.put(str.getBytes());

        //由写转读
        buffer.flip();

        //将缓冲区的数据写入到通道
        fileOutputStreamChannel.write(buffer);
        fileOutputStreamChannel.close();
    }
}

读入数据实现

public class NIOFileChannel2 {
    public static void main(String[] args) throws Exception{
        String str = "hello";
        //创建一个输入流->channel
        File file = new File("NIOFileChannelInput1.txt");
        FileInputStream fileInputStream = new FileInputStream(file);

        //通过fileOutputStream获得文件输出流
        //filechannel真实类型是FileChannelImpl
        FileChannel fileInputStreamChannel = fileInputStream.getChannel();

        //创建一个缓冲区
        ByteBuffer buffer = ByteBuffer.allocate((int)file.length());

        //将通道的数据读入到buffer
        fileInputStreamChannel.read(buffer);

        //将byteBuffer的数据转化为String
        System.out.println(new String(buffer.array()));

        //关闭流
        fileInputStreamChannel.close();
        fileInputStream.close();
    }
}

Channel APIs

ServerSocketChannel

ServerSocketChannel在服务器端监听客户端Socket连接

bind

public final ServerSocketChannel bind(SocketAddress local)

accept

public abstract SocketChannel accept() throws IOException

configureBlocking:设置阻塞模式,默认阻塞。错误的阻塞模式可能会导致java.nio.channels.IllegalBlockingModeException错误

public final SelectableChannel configureBlocking(boolean block) throws IOException

register:注册一个监听器并设置监听事件

public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException
SocketChannel

SocketChannel具有configureBlocking,open,read,write,connect方法,另外还拥有finishConnection用于connect失败后处理,register注册一个监听器并设置监听事件。

Channel与Buffer的注意事项

  1. 使用哪种类型写入也要用那种方式读出,否则可能会抛出BufferUnderflowException

  2. NIO提供了MappedByteBuffer,可以让文件直接在内存(堆外内存)进行修改,而如何同步到文件由NIO来完成

    public class MappedByteBufferTest {
        public static void main(String[] args) throws Exception{
    
            RandomAccessFile input = new RandomAccessFile("MappedByteBufferTestInput.txt", "rw");
    
            //获取对应的通道
            FileChannel channel = input.getChannel();
    
            //三个参数意义是使用的读写模式,可以直接修改的起始位置,映射到内存的大小
            MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
    
            map.put(0, (byte) 'H');
            map.put(0, (byte) 'e');
    
            channel.close();
            input.close();
    
        }
    }
  3. 此外,NIO还支持通过多个Buffer(Buffer数组)完成读写操作,即Scattering和Gathering

    • Scattering:将数据写入到buffer时,可以采用buffer数组,依次写入
    • Gathering:从buffer读取数据时,可以采用buffer数组,依次读
    public class ScatteringAndGathering {
        public static void main(String[] args) throws Exception {
            //使用ServerSocketChannel和SocketChannel
    
            ServerSocketChannel serversocketChannel = ServerSocketChannel.open();
            InetSocketAddress inetSocketAddress = new InetSocketAddress(7000);
    
            //绑定端口到socket并启动
            serversocketChannel.socket().bind(inetSocketAddress);
    
            //创建buffer数组
            ByteBuffer[] byteBuffers = new ByteBuffer[2];
            byteBuffers[0] = ByteBuffer.allocate(5);
            byteBuffers[1] = ByteBuffer.allocate(3);
    
            //监听,等待客户端连接
            SocketChannel socketChannel = serversocketChannel.accept();
    
            //循环读取数据
            int messageLength = 8;
            while (true) {
                int byteRead = 0;
                while (byteRead < messageLength) {
                    long l = socketChannel.read(byteBuffers);
                    byteRead += l;
                    System.out.println("readByte = " + byteRead);
    
                    //使用流打印,观察当亲position以及limit
                    Arrays.asList(byteBuffers).stream()
                            .map(buffer -> "position = " + buffer.position() + ", limit = " + buffer.limit())
                            .forEach(System.out::println);
                }
    
                //将所有的buffer进行flip
                Arrays.asList(byteBuffers).forEach(buffer -> buffer.flip());
    
                //将数据显示回客户端
                long byteWrite = 0;
                while (byteWrite < messageLength) {
                    long l = socketChannel.write(byteBuffers);
                    byteWrite += l;
                }
    
                //将所有的buffer进行clear
                Arrays.asList(byteBuffers).forEach(buffer -> buffer.clear());
    
                System.out.println("byteRead = " + byteRead + "byteWrite = " + byteWrite);
            }
        }
    }

Seletor

  1. Seletor能够检测多个注册的通道上是否有事件发生,如果有事件发生,则Seletor会切换到对应的通道
  2. 避免了在多线程之间频繁的创建释放资源

Seletor相关方法

  • 声明

    public abstract class Selector implements Closeable
  • open

    public static Selector open() throws IOException
  • select

    public abstract int select() throws IOException
    //等待var1的时间
    public abstract int select(long var1) throws IOException

Selector工作原理

  1. 当客户端连接时,会通过ServerSocketChannel得到SocketChannel
  2. selector进行监听select()方法,返回有事件返回的通道个数
  3. 将socketChannel通过register(Selector sel, int ops)注册到Selector上,一个Selector上可以注册多个SocketChannel
  4. 注册后返回一个SelectionKey,会和该Selector关联
  5. 进一步得到各SelectionKey(有事件发生的)
  6. 通过SelectionKey的channel()方法反向获取SocketChannel
  7. 可以通过得到的channel完成业务处理

代码实现一个C/S简单Demo

服务器端代码

public class NIOServer {
    public static void main(String[] args) throws Exception {

        //创建一个ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        //得到一个Selector实例
        Selector selector = Selector.open();

        //绑定一个端口
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));

        //设置为非阻塞
        serverSocketChannel.configureBlocking(false);

        //把serverSocketChannel注册到selector关心事件为OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            //等待一秒,若没有连接事件的处理
            if (selector.select(1000) == 0) {
                System.out.println("暂时没有连接....");
                continue;
            }

            //如果有事件发生,获取到有相应事件发生的selection集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();

            //通过selectionKeys获取对应的通道
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();

            while (keyIterator.hasNext()) {
                //获取selectionKey
                SelectionKey key = keyIterator.next();
                //根据key对应通带发生的事件做出相应的处理
                if (key.isAcceptable()) {
                    //给该连接生成一个socketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    //将socketChannel设置为非阻塞
                    socketChannel.configureBlocking(false);
                    //将当前socketChannel注册到selector
                    socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }

                if (key.isReadable()) {
                    //通过key反向获得对应的channel
                    SocketChannel channel = (SocketChannel) key.channel();
                    //获取到该channel关联的buffer
                    ByteBuffer buffer = (ByteBuffer)key.attachment();
                    channel.read(buffer);
                    System.out.println("客户端发送的数据为:" + buffer);
                }
                //手动删除key
                keyIterator.remove();
            }
        }
    }
}

客户端代码

public class NIOClient {
    public static void main(String[] args) throws Exception{
        //得到一个网络通道
        SocketChannel channel = SocketChannel.open();
        //设置非阻塞
        channel.configureBlocking(false);
        //提供服务器的端口和ip地址
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);

        //连接服务器
        if (channel.connect(inetSocketAddress)){
            while (!channel.finishConnect()) {
                System.out.println("Connecting...");
            }
        }

        String str = "hello";
        //产生一个字节数组到buffer
        ByteBuffer wrap = ByteBuffer.wrap(str.getBytes());
        //发送数据,将buffer写入channel
        channel.write(wrap);
        System.in.read();
    }
}

应用实例

基于NIO的群聊系统

  1. 服务器端启动并监听6667端口
  2. 服务器接收客户端消息并进行转发
  3. 连接服务器
  4. 发送消息
  5. 接受服务器发送的消息

实例代码:

GroupChatServer

public class GroupChatServer {
    //定义相关属性
    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int port = 6667;

    //构造器
    //初始化工作
    public GroupChatServer() {
        try {
            //得到选择器
            selector = Selector.open();
            //ServerSocketChannel
            listenChannel = ServerSocketChannel.open();
            //绑定端口
            listenChannel.socket().bind(new InetSocketAddress(port));
            //设置为非阻塞
            listenChannel.configureBlocking(false);
            //将该listenChannel注册到selector
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void listen() {
        try {
            //循环处理
            while (true) {
                int count = selector.select(2000);
                if (count > 0) {
                    //遍历得到selectionKeys集合
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        //如果是accept事件
                        if (key.isAcceptable()) {
                            SocketChannel socketChannel = listenChannel.accept();
                            //设置非阻塞
                            socketChannel.configureBlocking(false);
                            //将该socketChannel注册到selector上
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            //给出提示
                            System.out.println(socketChannel.getRemoteAddress() + " is online...");
                        }
                        //如果通道发生read事件
                        if (key.isReadable()) {
                            //处理都读事件
                            readData(key);
                        }
                        //将当前的key从集合中删除,避免重复处理
                        iterator.remove();
                    }
                } else {
                    System.out.println("Waiting for client...");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //读取客户端消息
    private void readData(SelectionKey key) {
        //定义一个SocketChannel
        SocketChannel channel = null;
        try {
            //取到关联的Channel
            channel = (SocketChannel) key.channel();
            //创建buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            int readCount = channel.read(buffer);

            //根据readCount的值做处理
            if (readCount > 0) {
                //把缓冲区的byte转成字符串
                String msg = new String(buffer.array());
                System.out.println("Client: " + msg);

                //向其他客户机转发消息
                sendInfoToOtherClients(msg, channel);
            }
        } catch (Exception e) {
            try {
                System.out.println(channel.getRemoteAddress() + "is offline...");
                //取消注册
                key.cancel();
                //关闭通道
                channel.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    //给其他通道转发消息
    private void sendInfoToOtherClients(String msg, SocketChannel self) {
        System.out.println("Server is transferring msg...");
        try {
            //遍历所以注册在selector上的SocketChannel并排除自己
            for (SelectionKey key : selector.keys()) {
                //通过key取出对应的SocketChannel
                Channel targetChannel = key.channel();

                //排除自己
                if (targetChannel instanceof SocketChannel && targetChannel != self) {
                    SocketChannel destChannel = (SocketChannel) targetChannel;
                    //将msg存储到buffer
                    ByteBuffer wrap = ByteBuffer.wrap(msg.getBytes());
                    //将buffer的数据写入tongd
                    destChannel.write(wrap);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

    }
}

GroupChatClient

public class GroupChatClient {

    //定义相关属性
    private final String HOST = "127.0.0.1";
    private final int PORT = 6667;
    private Selector selector;
    private SocketChannel socketChannel;
    private String userName;

    //构造器,完成初始化工作
    public GroupChatClient() throws Exception {
        Selector selector = Selector.open();
        //连接服务器
        socketChannel.open(new InetSocketAddress(HOST, PORT));
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //将channel注册到selector
        socketChannel.register(selector, SelectionKey.OP_READ);
        //得到userName
        userName = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(userName + " is OK...");
    }

    //向服务器发送消息
    public void sendInfo(String info) {
        info = userName + "said: " + info;
        try {
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //读取从服务器端回复的消息
    public void readInfo() {
        try {
            int readCount = selector.select();
            if (readCount > 0) {
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey selectionKey : selectionKeys) {
                    if (selectionKey.isReadable()) {
                        //得到相关通道
                        SocketChannel channel = (SocketChannel) selectionKey.channel();
                        //得到一个buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        //读取
                        socketChannel.read(buffer);
                        //把读到的消息转成字符串
                        String msg = new String(buffer.array());
                        System.out.println(msg.trim());
                    }
                    //删除当前的selectionKey
                    selectionKeys.remove(selectionKey);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception{
        //启动客户端
        GroupChatClient chatClient = new GroupChatClient();
        //启动一个线程
        new Thread(()->{
            while (true) {
                chatClient.readInfo();
                try {
                    Thread.currentThread().sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        //发送数据给服务器
        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNextLine()) {
            String s = scanner.nextLine();
            chatClient.sendInfo(s);
        }
    }
}

零拷贝

零拷贝指的是没有CPU拷贝,Java程序中常用的零拷贝有mmap(内存映射)和sendFile

普通的文件访问需要先从硬盘读入文件到内核空间,然后再将文件由内核空间拷贝到用户空间,需要4次拷贝数据,3次切换工作空间

mmap适合小数据文件读写,sendFile适合大数据的文件读写

mmap

mmap通过内存映射,将文件映射到内核缓冲区,用户可以共享一部分内核数据,这样可以减少拷贝次数,但不可以减少空间切换次数。

sendFile

sendFile数据不经过用户态,直接由内核空间进入SocketBuffer,需要3次拷贝数据,2次切换工作空间

sendFile后经优化,直接将数据由硬盘空间拷贝到kernelBuffer(可以直接修改kernelBuffer中的数据),需要2次拷贝数据,2次切换工作空间。(此时仍需要有一次CPU拷贝kernelBuffer -> socketBuffer,但仅拷贝一些描述信息,并不占用太多的资源)

零拷贝带来了更少的数据拷贝,更少的上下文切换,也大大减少了文件校验与CPU缓存伪共享

零拷贝的案例

public class NewIOClient {
    public static void main(String[] args) throws Exception{

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 7001));
        String fileName = "NewIOClientIn.txt";

        //得到一个文件channel
        FileChannel fileChannel = new FileInputStream(fileName).getChannel();

        //准备发送
        long startTimeMillis = System.currentTimeMillis();

        //在Linux下一次就可完成所有文件的传输
        //在Windows下一次最多只能传送8MB
        //transferTo底层使用零拷贝
        long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);

        System.out.println("file sent: " + transferCount + " using time: " + (System.currentTimeMillis() - startTimeMillis));

        fileChannel.close();

    }
}

AIO

Java编程常用到的两种模式Reactor和Proactor,Java NIO使用的是Reactor,当有事件触发时,服务器端收到通知,并进行相应的操作。

Java AIO引入异步通道的概念,采用Proactor,仅当有请求时会由操作系统完成后才会通知服务程序启动线程

琐碎的问题

  1. UTF-8中,每个中文字符对应3个字节
  2. 当出现java.nio.channels.IllegalBlockingModeException时可能是因为对应的SocketChannel设置为阻塞模式所导致

文章作者: Dale Mo
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Dale Mo !
 上一篇
认知计算前沿与应用 认知计算前沿与应用
认知计算前沿与应用认知分类脑智:依据现有知识产生新知 心智:形象思维与创新 连接主义, 人脑能不能被物化首要问题在于人脑是否能被形式化 不可解释结果–弱人工智能 document.querySelectorAll('.gi
2020-09-08
下一篇 
链式编程与流式计算基础 链式编程与流式计算基础
链式编程与流式计算基础“集合讲的是数据,流讲的是计算” 前言有关链式编程与流式计算的具体原理会写在Scala中,在这里以Java中的实现为主。 实现Lombok与链式编程的简单Demo@Data @NoArgsConstructor @Al
2020-09-07
  目录