NIO
NIO与BIO
BIO:Blocking IO,阻塞模型,适用于连接数目较小且固定的架构
为每个连接创建一个线程
NIO:Non-Blocking IO,同步非阻塞,适用于连接数目较多且连接比较短的架构
专有一个或多个线程服务于众多的IO连接(Netty)
AIO:Asynchronous IO,异步非阻塞,适用于连接数目较多且连接比较长的架构
操作系统进行调度,当操作系统完成后才通知服务端启动线程去执行
BIO
传统的Java io编程,相关接口在java.io
实现流程
- 服务器端启动一个ServerSocket
- 客户端启动Socket与服务器进行通信,默认情况下,对客户机的每一个连接服务器均会创建一个新的服务线程
- 客户端会先等待(阻塞)服务器的响应
- 客户机会等待到请求结束再继续执行
实现
public static void main(String[] args) throws Exception {
//创建一个线程池
//对每一个客户机请求,创建一个线程与之通信
ExecutorService threadPool = Executors.newCachedThreadPool();
//创建ServerSocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("Server start...");
while (true) {
//监听,等待客户机连接
final Socket socket = serverSocket.accept();
System.out.println("Accepted...");
//创建一个线程与之通信
threadPool.execute(() -> {
try {
byte[] bytes = new byte[1024];
InputStream inputStream = socket.getInputStream();
int length = 0;
while ((length = inputStream.read(bytes)) != -1) {
System.out.println(Thread.currentThread().getName() + ":" +
new String(bytes, 0, length));
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
输出结果
Server start...
Accepted...
pool-1-thread-1:a
Accepted...
pool-1-thread-2:b
NIO
位于java.nio包下
NIO是面向缓冲区/面向块的编程
有三大核心部分:Channel, Buffer, Selector
- 每个channel都会对应一个buffer
- selector会对应一个线程
- 一个线程(selector)对应多个channel(连接)
- 程序切换channel是由事件决定的
- selector会根据不同的事件在各个通道上切换
- buffer就是一个内存块,底层是一个数组
- 数据的读取或者写入是通过buffer,与BIO不同,BIO中一个流要么是输入流,要么是输出流,但buffer是双向的
Buffer
Buffer的使用案例
public class BasicBuffer {
public static void main(String[] args) {
//创建一个大小为5的buffer
IntBuffer intBuffer = IntBuffer.allocate(5);
//向buffer内放入数据
for (int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put(i * i);
}
//将buffer转换,读写切换
intBuffer.flip();
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
}
}
在Buffer的源码中定义了如下变量
在进行反转(flip)时,position会变为下一个要读取的字符位置,而limit会变成已经写入的字节数目。
private int mark = -1;//标记,一般使用默认值
private int position = 0;//下一个要被读/写的位置
private int limit;//表示缓冲区当前终点,可以修改
private int capacity;//缓冲区最大大小,不可以改变
ByteBuffer
在网络编程中,最常用的是ByteBuffer,常用的方法如下
//创建直接缓冲区
public static ByteBuffer allocateDirect(int capacity)
//设置缓冲区的初始容量
public static ByteBuffer allocate(int capacity)
//获取下一个值
public abstract byte get()
//从绝对位置get
public abstract byte get(int var1)
//放入一个元素
public abstract ByteBuffer put(byte var1)
//从绝对位置放入一个元素
public abstract ByteBuffer put(int var1, byte var2)
Channel
- 通道可以同时读写
- 通道可以实现异步读写数据
- 通道可以从缓冲读取数据,也可以将数据写入缓冲
- 常用的Channel有:FileChannel, DatagramChannel, ServerSocketChannel, SocketChannel
- FileChannel用于文件读写,DatagramChannel用于UDP读写,ServerSocketChannel, SocketChannel用于TCP读写
写入实现
public class NIOFileChannel1 {
public static void main(String[] args) throws Exception{
String str = "hello";
//创建一个输出流->channel
FileOutputStream fileOutputStream = new FileOutputStream("NIOFileChannelOutput1.txt");
//通过fileOutputStream获得文件输出流
//filechannel真实类型是FileChannelImpl
FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();
//创建一个缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//将str放入到buffer
buffer.put(str.getBytes());
//由写转读
buffer.flip();
//将缓冲区的数据写入到通道
fileOutputStreamChannel.write(buffer);
fileOutputStreamChannel.close();
}
}
读入数据实现
public class NIOFileChannel2 {
public static void main(String[] args) throws Exception{
String str = "hello";
//创建一个输入流->channel
File file = new File("NIOFileChannelInput1.txt");
FileInputStream fileInputStream = new FileInputStream(file);
//通过fileOutputStream获得文件输出流
//filechannel真实类型是FileChannelImpl
FileChannel fileInputStreamChannel = fileInputStream.getChannel();
//创建一个缓冲区
ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
//将通道的数据读入到buffer
fileInputStreamChannel.read(buffer);
//将byteBuffer的数据转化为String
System.out.println(new String(buffer.array()));
//关闭流
fileInputStreamChannel.close();
fileInputStream.close();
}
}
Channel APIs
ServerSocketChannel
ServerSocketChannel在服务器端监听客户端Socket连接
bind
public final ServerSocketChannel bind(SocketAddress local)
accept
public abstract SocketChannel accept() throws IOException
configureBlocking:设置阻塞模式,默认阻塞。错误的阻塞模式可能会导致java.nio.channels.IllegalBlockingModeException错误
public final SelectableChannel configureBlocking(boolean block) throws IOException
register:注册一个监听器并设置监听事件
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException
SocketChannel
SocketChannel具有configureBlocking,open,read,write,connect方法,另外还拥有finishConnection用于connect失败后处理,register注册一个监听器并设置监听事件。
Channel与Buffer的注意事项
使用哪种类型写入也要用那种方式读出,否则可能会抛出BufferUnderflowException
NIO提供了MappedByteBuffer,可以让文件直接在内存(堆外内存)进行修改,而如何同步到文件由NIO来完成
public class MappedByteBufferTest { public static void main(String[] args) throws Exception{ RandomAccessFile input = new RandomAccessFile("MappedByteBufferTestInput.txt", "rw"); //获取对应的通道 FileChannel channel = input.getChannel(); //三个参数意义是使用的读写模式,可以直接修改的起始位置,映射到内存的大小 MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5); map.put(0, (byte) 'H'); map.put(0, (byte) 'e'); channel.close(); input.close(); } }此外,NIO还支持通过多个Buffer(Buffer数组)完成读写操作,即Scattering和Gathering
- Scattering:将数据写入到buffer时,可以采用buffer数组,依次写入
- Gathering:从buffer读取数据时,可以采用buffer数组,依次读
public class ScatteringAndGathering { public static void main(String[] args) throws Exception { //使用ServerSocketChannel和SocketChannel ServerSocketChannel serversocketChannel = ServerSocketChannel.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress(7000); //绑定端口到socket并启动 serversocketChannel.socket().bind(inetSocketAddress); //创建buffer数组 ByteBuffer[] byteBuffers = new ByteBuffer[2]; byteBuffers[0] = ByteBuffer.allocate(5); byteBuffers[1] = ByteBuffer.allocate(3); //监听,等待客户端连接 SocketChannel socketChannel = serversocketChannel.accept(); //循环读取数据 int messageLength = 8; while (true) { int byteRead = 0; while (byteRead < messageLength) { long l = socketChannel.read(byteBuffers); byteRead += l; System.out.println("readByte = " + byteRead); //使用流打印,观察当亲position以及limit Arrays.asList(byteBuffers).stream() .map(buffer -> "position = " + buffer.position() + ", limit = " + buffer.limit()) .forEach(System.out::println); } //将所有的buffer进行flip Arrays.asList(byteBuffers).forEach(buffer -> buffer.flip()); //将数据显示回客户端 long byteWrite = 0; while (byteWrite < messageLength) { long l = socketChannel.write(byteBuffers); byteWrite += l; } //将所有的buffer进行clear Arrays.asList(byteBuffers).forEach(buffer -> buffer.clear()); System.out.println("byteRead = " + byteRead + "byteWrite = " + byteWrite); } } }
Seletor
- Seletor能够检测多个注册的通道上是否有事件发生,如果有事件发生,则Seletor会切换到对应的通道
- 避免了在多线程之间频繁的创建释放资源
Seletor相关方法
声明
public abstract class Selector implements Closeableopen
public static Selector open() throws IOExceptionselect
public abstract int select() throws IOException //等待var1的时间 public abstract int select(long var1) throws IOException
Selector工作原理
- 当客户端连接时,会通过ServerSocketChannel得到SocketChannel
- selector进行监听select()方法,返回有事件返回的通道个数
- 将socketChannel通过register(Selector sel, int ops)注册到Selector上,一个Selector上可以注册多个SocketChannel
- 注册后返回一个SelectionKey,会和该Selector关联
- 进一步得到各SelectionKey(有事件发生的)
- 通过SelectionKey的channel()方法反向获取SocketChannel
- 可以通过得到的channel完成业务处理
代码实现一个C/S简单Demo
服务器端代码
public class NIOServer {
public static void main(String[] args) throws Exception {
//创建一个ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//得到一个Selector实例
Selector selector = Selector.open();
//绑定一个端口
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//把serverSocketChannel注册到selector关心事件为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
//等待一秒,若没有连接事件的处理
if (selector.select(1000) == 0) {
System.out.println("暂时没有连接....");
continue;
}
//如果有事件发生,获取到有相应事件发生的selection集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//通过selectionKeys获取对应的通道
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
//获取selectionKey
SelectionKey key = keyIterator.next();
//根据key对应通带发生的事件做出相应的处理
if (key.isAcceptable()) {
//给该连接生成一个socketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
//将socketChannel设置为非阻塞
socketChannel.configureBlocking(false);
//将当前socketChannel注册到selector
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if (key.isReadable()) {
//通过key反向获得对应的channel
SocketChannel channel = (SocketChannel) key.channel();
//获取到该channel关联的buffer
ByteBuffer buffer = (ByteBuffer)key.attachment();
channel.read(buffer);
System.out.println("客户端发送的数据为:" + buffer);
}
//手动删除key
keyIterator.remove();
}
}
}
}
客户端代码
public class NIOClient {
public static void main(String[] args) throws Exception{
//得到一个网络通道
SocketChannel channel = SocketChannel.open();
//设置非阻塞
channel.configureBlocking(false);
//提供服务器的端口和ip地址
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
if (channel.connect(inetSocketAddress)){
while (!channel.finishConnect()) {
System.out.println("Connecting...");
}
}
String str = "hello";
//产生一个字节数组到buffer
ByteBuffer wrap = ByteBuffer.wrap(str.getBytes());
//发送数据,将buffer写入channel
channel.write(wrap);
System.in.read();
}
}
应用实例
基于NIO的群聊系统
- 服务器端启动并监听6667端口
- 服务器接收客户端消息并进行转发
- 连接服务器
- 发送消息
- 接受服务器发送的消息
实例代码:
GroupChatServer
public class GroupChatServer {
//定义相关属性
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int port = 6667;
//构造器
//初始化工作
public GroupChatServer() {
try {
//得到选择器
selector = Selector.open();
//ServerSocketChannel
listenChannel = ServerSocketChannel.open();
//绑定端口
listenChannel.socket().bind(new InetSocketAddress(port));
//设置为非阻塞
listenChannel.configureBlocking(false);
//将该listenChannel注册到selector
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
}
}
public void listen() {
try {
//循环处理
while (true) {
int count = selector.select(2000);
if (count > 0) {
//遍历得到selectionKeys集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//如果是accept事件
if (key.isAcceptable()) {
SocketChannel socketChannel = listenChannel.accept();
//设置非阻塞
socketChannel.configureBlocking(false);
//将该socketChannel注册到selector上
socketChannel.register(selector, SelectionKey.OP_READ);
//给出提示
System.out.println(socketChannel.getRemoteAddress() + " is online...");
}
//如果通道发生read事件
if (key.isReadable()) {
//处理都读事件
readData(key);
}
//将当前的key从集合中删除,避免重复处理
iterator.remove();
}
} else {
System.out.println("Waiting for client...");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//读取客户端消息
private void readData(SelectionKey key) {
//定义一个SocketChannel
SocketChannel channel = null;
try {
//取到关联的Channel
channel = (SocketChannel) key.channel();
//创建buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
int readCount = channel.read(buffer);
//根据readCount的值做处理
if (readCount > 0) {
//把缓冲区的byte转成字符串
String msg = new String(buffer.array());
System.out.println("Client: " + msg);
//向其他客户机转发消息
sendInfoToOtherClients(msg, channel);
}
} catch (Exception e) {
try {
System.out.println(channel.getRemoteAddress() + "is offline...");
//取消注册
key.cancel();
//关闭通道
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
//给其他通道转发消息
private void sendInfoToOtherClients(String msg, SocketChannel self) {
System.out.println("Server is transferring msg...");
try {
//遍历所以注册在selector上的SocketChannel并排除自己
for (SelectionKey key : selector.keys()) {
//通过key取出对应的SocketChannel
Channel targetChannel = key.channel();
//排除自己
if (targetChannel instanceof SocketChannel && targetChannel != self) {
SocketChannel destChannel = (SocketChannel) targetChannel;
//将msg存储到buffer
ByteBuffer wrap = ByteBuffer.wrap(msg.getBytes());
//将buffer的数据写入tongd
destChannel.write(wrap);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
}
}
GroupChatClient
public class GroupChatClient {
//定义相关属性
private final String HOST = "127.0.0.1";
private final int PORT = 6667;
private Selector selector;
private SocketChannel socketChannel;
private String userName;
//构造器,完成初始化工作
public GroupChatClient() throws Exception {
Selector selector = Selector.open();
//连接服务器
socketChannel.open(new InetSocketAddress(HOST, PORT));
//设置非阻塞
socketChannel.configureBlocking(false);
//将channel注册到selector
socketChannel.register(selector, SelectionKey.OP_READ);
//得到userName
userName = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(userName + " is OK...");
}
//向服务器发送消息
public void sendInfo(String info) {
info = userName + "said: " + info;
try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (Exception e) {
e.printStackTrace();
}
}
//读取从服务器端回复的消息
public void readInfo() {
try {
int readCount = selector.select();
if (readCount > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
if (selectionKey.isReadable()) {
//得到相关通道
SocketChannel channel = (SocketChannel) selectionKey.channel();
//得到一个buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取
socketChannel.read(buffer);
//把读到的消息转成字符串
String msg = new String(buffer.array());
System.out.println(msg.trim());
}
//删除当前的selectionKey
selectionKeys.remove(selectionKey);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception{
//启动客户端
GroupChatClient chatClient = new GroupChatClient();
//启动一个线程
new Thread(()->{
while (true) {
chatClient.readInfo();
try {
Thread.currentThread().sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
//发送数据给服务器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine();
chatClient.sendInfo(s);
}
}
}
零拷贝
零拷贝指的是没有CPU拷贝,Java程序中常用的零拷贝有mmap(内存映射)和sendFile
普通的文件访问需要先从硬盘读入文件到内核空间,然后再将文件由内核空间拷贝到用户空间,需要4次拷贝数据,3次切换工作空间
mmap适合小数据文件读写,sendFile适合大数据的文件读写
mmap
mmap通过内存映射,将文件映射到内核缓冲区,用户可以共享一部分内核数据,这样可以减少拷贝次数,但不可以减少空间切换次数。
sendFile
sendFile数据不经过用户态,直接由内核空间进入SocketBuffer,需要3次拷贝数据,2次切换工作空间
sendFile后经优化,直接将数据由硬盘空间拷贝到kernelBuffer(可以直接修改kernelBuffer中的数据),需要2次拷贝数据,2次切换工作空间。(此时仍需要有一次CPU拷贝kernelBuffer -> socketBuffer,但仅拷贝一些描述信息,并不占用太多的资源)
零拷贝带来了更少的数据拷贝,更少的上下文切换,也大大减少了文件校验与CPU缓存伪共享
零拷贝的案例
public class NewIOClient {
public static void main(String[] args) throws Exception{
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 7001));
String fileName = "NewIOClientIn.txt";
//得到一个文件channel
FileChannel fileChannel = new FileInputStream(fileName).getChannel();
//准备发送
long startTimeMillis = System.currentTimeMillis();
//在Linux下一次就可完成所有文件的传输
//在Windows下一次最多只能传送8MB
//transferTo底层使用零拷贝
long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
System.out.println("file sent: " + transferCount + " using time: " + (System.currentTimeMillis() - startTimeMillis));
fileChannel.close();
}
}
AIO
Java编程常用到的两种模式Reactor和Proactor,Java NIO使用的是Reactor,当有事件触发时,服务器端收到通知,并进行相应的操作。
Java AIO引入异步通道的概念,采用Proactor,仅当有请求时会由操作系统完成后才会通知服务程序启动线程
琐碎的问题
- UTF-8中,每个中文字符对应3个字节
- 当出现java.nio.channels.IllegalBlockingModeException时可能是因为对应的SocketChannel设置为阻塞模式所导致