diff --git a/code/Java/nio-tutorial/pom.xml b/code/Java/nio-tutorial/pom.xml new file mode 100644 index 0000000..e2a33b5 --- /dev/null +++ b/code/Java/nio-tutorial/pom.xml @@ -0,0 +1,24 @@ + + + 4.0.0 + + org.example + nio-tutorial + 1.0-SNAPSHOT + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + \ No newline at end of file diff --git a/code/Java/nio-tutorial/src/main/java/chat/ChatClient.java b/code/Java/nio-tutorial/src/main/java/chat/ChatClient.java new file mode 100644 index 0000000..7aa600a --- /dev/null +++ b/code/Java/nio-tutorial/src/main/java/chat/ChatClient.java @@ -0,0 +1,133 @@ +package chat; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedSelectorException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.Scanner; +import java.util.Set; + +public class ChatClient { + + private String hostname; + private int port; + private Selector selector; + private final ByteBuffer rBuffer = ByteBuffer.allocate(1024); + private final ByteBuffer wBuffer = ByteBuffer.allocate(1024); + + ChatClient(String hostname, int port) { + this.hostname = hostname; + this.port = port; + } + + public void start() { + try { + // 创建SocketChannel + SocketChannel socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + socketChannel.connect(new InetSocketAddress(hostname, port)); + + // 创建selector + selector = Selector.open(); + // 注册监听CONNECT事件 + socketChannel.register(selector, SelectionKey.OP_CONNECT); + + // 持续调用select + while (true) { + selector.select(); + Set selectionKeys = selector.selectedKeys(); + for (SelectionKey selectionKey : selectionKeys) { + // 处理Connect事件 + if (selectionKey.isConnectable()) { + SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); + // 有可能正处于连接中的状态 + if (clientChannel.isConnectionPending()) { + // 等待连接完成 + clientChannel.finishConnect(); + // 开始监听用户输入 + inputListening(clientChannel); + // 为clientChannel注册上Read + clientChannel.register(selector, SelectionKey.OP_READ); + System.out.println("尊敬的用户" + clientChannel.socket().getLocalPort() + ", 你已成功加入群聊!"); + } + // 处理Read事件 + } else if (selectionKey.isReadable()) { + handleMessage(selectionKey); + } + } + } + } catch (ClosedSelectorException e) { + // 主动关闭客户端,不做任何处理 + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + selector.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + /** + * 监听用户来自控制台的输入 + * + * @param clientChannel 客户端Channel + */ + private void inputListening(SocketChannel clientChannel) { + // 要持续监听用户输入,但又不能阻塞主线程,所以需要一个单独的线程来完成 + new Thread(() -> { + Scanner scanner = new Scanner(System.in); + try { + while (scanner.hasNextLine()) { + String input = scanner.nextLine(); + if (!input.isEmpty()) { + if ("exit".equals(input)) { + selector.close(); + return; + } + wBuffer.put(input.getBytes()); + wBuffer.flip(); + while (wBuffer.hasRemaining()) { + clientChannel.write(wBuffer); + } + wBuffer.clear(); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); + } + + /** + * 处理来自其他客户端的消息 + * + * @param selectionKey 处于Read状态的selectionKey + */ + private void handleMessage(SelectionKey selectionKey) { + try { + SocketChannel channel = (SocketChannel) selectionKey.channel(); + StringBuilder buffer = new StringBuilder(); + while (channel.read(rBuffer) > 0) { + rBuffer.flip(); + buffer.append(StandardCharsets.UTF_8.decode(rBuffer)); + rBuffer.clear(); + } + System.out.println(buffer.toString()); + } catch (IOException e) { + selectionKey.cancel(); + selector.wakeup(); + System.out.println("聊天室服务器已关闭!"); + } + } + + + public static void main(String[] args) { + new ChatClient("127.0.0.1", 8888).start(); + } +} diff --git a/code/Java/nio-tutorial/src/main/java/chat/ChatServer.java b/code/Java/nio-tutorial/src/main/java/chat/ChatServer.java new file mode 100644 index 0000000..a2b4ec9 --- /dev/null +++ b/code/Java/nio-tutorial/src/main/java/chat/ChatServer.java @@ -0,0 +1,134 @@ +package chat; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.nio.charset.StandardCharsets; +import java.util.Set; + +public class ChatServer { + + private String hostname; + private int port; + private Selector selector; + private final ByteBuffer rBuffer = ByteBuffer.allocate(1024); + private final ByteBuffer wBuffer = ByteBuffer.allocate(1024); + + ChatServer(int port) { + this("127.0.0.1", port); + } + + ChatServer(String hostname, int port) { + this.hostname = hostname; + this.port = port; + } + + public void start() { + try { + // 创建ServerSocketChannel + ServerSocketChannel serverChannel = ServerSocketChannel.open(); + // 设置为非阻塞模式 + serverChannel.configureBlocking(false); + serverChannel.bind(new InetSocketAddress(hostname, port)); + + // 创建selector + selector = Selector.open(); + // 注册监听CONNECT事件 + serverChannel.register(selector, SelectionKey.OP_ACCEPT); + + // 持续调用select() + while (true) { + selector.select(); + Set selectionKeys = selector.selectedKeys(); + for (SelectionKey selectionKey : selectionKeys) { + // 处理Accept事件 + if (selectionKey.isAcceptable()) { + // 获取ServerSocketChannel + ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel(); + // 通过ServerSocketChannel获取SocketChannel + SocketChannel clientChannel = server.accept(); + clientChannel.configureBlocking(false); + // 注册Read事件 + clientChannel.register(selector, SelectionKey.OP_READ); + System.out.println("欢迎" + clientChannel.socket().getPort() + "加入聊天室!"); + // 处理Readable事件 + } else if (selectionKey.isReadable()) { + handleMessage(selectionKey); + } + } + selectionKeys.clear(); + } + + } catch (IOException e) { + e.printStackTrace(); + } finally { + // 关闭selector后,与之相关的所有资源都会被释放 + try { + selector.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + /** + * 处理客户端消息 + * + * @param selectionKey 处于Read状态的SelectionKey + */ + private void handleMessage(SelectionKey selectionKey) { + SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); + try { + // 获取来自客户端的消息 + StringBuilder buffer = new StringBuilder(); + while (clientChannel.read(rBuffer) > 0) { + rBuffer.flip(); + buffer.append(StandardCharsets.UTF_8.decode(rBuffer)); + rBuffer.clear(); + } + String msg = buffer.toString(); + // 将消息发送给其他客户端 + broadcastMessage(clientChannel, msg); + } catch (IOException e) { + // 该异常由clientChannel.read(rBuffer)方法抛出,如果出现该异常,则说明clientChannel已经关闭 + // 此时需要调用cancel()取消注册在selector上的事件 + selectionKey.cancel(); + // 在多线程环境下,如果另一个线程正在阻塞地调用select(),因为事件集已经改变,因此需要通过wakeup()让其立刻返回并重新select() + selector.wakeup(); + System.out.println("用户" + clientChannel.socket().getPort() + "退出聊天室!"); + } + } + + /** + * 将客户端发来的消息广播给其他客户端 + * + * @param clientChannel 消息源 + * @param msg 消息 + */ + private void broadcastMessage(SocketChannel clientChannel, String msg) { + try { + // 遍历当前selector上所有channel + for (SelectionKey selectionKey : selector.keys()) { + SelectableChannel channel = selectionKey.channel(); + // 消息不需要转发给ServerSocketChannel和当前客户端自己 + if (selectionKey.isValid() && !(channel instanceof ServerSocketChannel) && !channel.equals(clientChannel)) { + SocketChannel otherClient = (SocketChannel) channel; + wBuffer.put(("用户" + clientChannel.socket().getPort() + ": " + msg).getBytes()); + while (wBuffer.hasRemaining()) { + wBuffer.flip(); + otherClient.write(wBuffer); + } + wBuffer.clear(); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + + } + + public static void main(String[] args) { + new ChatServer(8888).start(); + } +} diff --git a/notes/Java_NIO.md b/notes/Java_NIO.md new file mode 100644 index 0000000..ec8353a --- /dev/null +++ b/notes/Java_NIO.md @@ -0,0 +1,823 @@ +## 一、Buffer + +### 1.1 缓冲区属性 + +![nio_buffer](../pictures/nio_buffer.png) + +所有缓冲区都直接或间接继承自 Buffer 抽象类,Buffer 中定义了缓冲区的四个基本属性: + +```java +public abstract class Buffer { + + private int mark = -1; + private int position = 0; + private int limit; + private int capacity; + ... +} +``` + ++ **容量 (Capacity)** :缓冲区所能容纳元素的个数。 ++ **上界 (Limit)**:缓冲区中现存元素的个数。 ++ **位置 (Position)**:下一个待操作元素的索引。 ++ **标记 (Mark)**:标记位置。通过 `mark()` 方法可以让 mark 等于当前 position;之后通过 `reset()` 方法可以让 position 恢复到标记位置。 + +### 1.2 创建缓冲区 + +通常可以通过以下两种方法来创建缓冲区: + ++ **allocate()**:通过指定缓冲区的容量大小来创建: + +```java +CharBuffer buffer = CharBuffer.allocate(100); +``` + ++ **wrap()**:通过为缓冲区指定初始化数组来创建: + +```java +char[] chars = new char[100]; +CharBuffer buffer = CharBuffer.wrap(chars); +``` + +实际上缓冲区内部就是通过数组来存储元素,以 CharBuffer 为例,它的内部维持有一个名为 `hb` 的数组,用来存放实际的元素: + +```java +public abstract class CharBuffer extends Buffer implements Comparable, Appendable, CharSequence, Readable +{ + final char[] hb; + ... +} +``` + +缓冲区创建完成后,它处于以下初始状态: + +![buffer_init](../pictures/buffer_init.png) + +### 1.3 操作缓冲区 + +**1. put()** + +用于向缓冲区中填充数据。以 CharBuffer 为例,有以下四个常用的重载方法: + +```java +// 使用字符串填充数据 +put(String src) +// 使用字符串填充数据,start为字符串的开始位置,end为字符串的结束位置(不包含) +put(String src, int start, int end) +// 使用数组填充数据 +put(char[] src) +// 使用数组填充数据,offset为数组填充的开始位置,length为填充的长度,不允许越界 +put(char[] src, int offset, int length) +``` + +当我们向 Buffer 中添加数据后,position 属性也会随之变动: + +![buffer_put](../pictures/buffer_put.png) + +**2. get()** + +用于读取缓冲区中的数据。以 CharBuffer 为例,有以下四个常用的重载方法: + +```java +// 获取当前位置(postion)的数据 +char get(); +// 获取指定位置的数据 +char get(int index); +// 获取数据并填充到数组中 +CharBuffer get(char[] dst) +// 获取数据并填充到数据中,offset为数组填充的开始位置,length为填充的长度,不允许越界 +CharBuffer get(char[] dst, int offset, int length) +``` + +**3. flip()** + +该方法会将 position 的值赋给 limit,然后将 position 设置为 0,从而可以由写模式切换到读模式。无论任何情况,只要由写操作转换到读操作,都需要先执行该方法。示例如下: + +```java +CharBuffer buffer = CharBuffer.allocate(100); +buffer.put("hello"); +buffer.flip(); //由写模式切换到读模式 +while (buffer.hasRemaining()) { + System.out.println(buffer.get()); +} +buffer.clear(); +``` + +当使用 `filp()` 将 Buffer 由写模式切换到读模式后:position 属性会恢复到初始位置,代表从此处开始读取数据;limit 属性也会随之变动,代表我们所能读取数据的上界: + +![buffer_flip](../pictures/buffer_flip.png) + +当我们再通过 `get()` 方法进行读取时,position 属性会随之移动,position 和 limit 之间的数据就是待处理的数据: + +![buffer_get](../pictures/buffer_get.png) + +**4. hasRemaining()** + +`hasRemaining()` 用于判断当前的 position 是否小于 limit:如果 position 小于 limit,则返回 true,代表仍存在待处理的数据。 + +**5. clear()** + +将 position 设置为 0,并将 limit 设置为 capacity 的大小,从而让缓冲区恢复到初始状态。此时缓冲区的状态如下: + +![buffer_clear](../pictures/buffer_clear.png) + +**6. compact()** + +用于压缩缓冲区,即将数组中待处理的数据复制到头部。如下所示,会将未读取的 `LL0` 复制到头部: + +![buffer_compact](../pictures/buffer_compact.png) + +需要注意的是这里执行的是复制操作,而不是移动操作,底层调用的是 `System.arraycopy` 方法,因此原有位置上的数据依然存在。但是由于 position 会移动到未处理数据的下一个位置上,所以不用担心原有位置上的数据会被读取到,原因是你切换到读模式时,原有的 `LO` 数据仍处于 limit 之后: + +![buffer_compact_flip](../pictures/buffer_compact_flip.png) + +**7. mark()** + +用于设置标志位,设置好后可以使用 `reset()` 将 position 恢复到该位置: + +```java + buffer.position(2).mark().position(5).reset().position(); //从位置2移动到位置5,之后又恢复到位置2 +``` + +### 1.4 复制缓冲区 + +如果想要对一个已有的缓冲区进行复制,可以有以下三种方法: + +```java +public abstract CharBuffer duplicate(); +public abstract CharBuffer asReadOnlyBuffer(); +public abstract CharBuffer slice(); +``` + +使用 `duplicate()` 复制的缓冲区具有以下特性: + +- 与原缓冲区共享相同的数据元素,这意味着对原缓冲区数据的修改也会影响复制缓冲区; +- 复制缓冲区的 mark、position、limit、capcaity 属性的初始值与复制时原缓冲区的 mark、position、limit、capcaity 的值相同,但这些属性与原缓冲区的属性相互独立,创建后就不再受原有缓冲区的影响; +- 如果原缓冲区是只读缓冲区或直接缓冲区,则复制缓冲区也将继承这些属性。 + +![buffer_duplicate](../pictures/buffer_duplicate.png) + +`asReadOnlyBuffer()` 与 `duplicate()` 类似,但创建的复制缓冲区为只读缓冲区。 + +`slice()` 也与 `duplicate()` 类似,但创建的复制缓冲区与原缓冲区只共享部分数据元素,并且所有标志位都处于原始状态: + +![buffer_slice](../pictures/buffer_slice.png) + +示例如下: + +```java +CharBuffer buffer = CharBuffer.allocate(100); +buffer.put("helloworld"); +buffer.position(2).limit(5); +CharBuffer duplicate = buffer.duplicate(); +buffer.position(3).limit(6); +CharBuffer slice = buffer.slice(); + +System.out.println("buffer:" + buffer.position() + "," + buffer.limit()); // buffer:3,6 +System.out.println("duplicate:" + duplicate.position() + "," + duplicate.limit()); // duplicate:2,5 +System.out.println("slice:" + slice.position() + "," + slice.limit()); //slice:0,3 +``` + +### 1.5 直接缓冲区 + +ByteBuffer 支持使用 `allocateDirect` 方法来创建直接缓冲区,示例如下: + +```java +ByteBuffer byteBuffer = ByteBuffer.allocate(100); +``` + + + + + +## 二、Channel + +### 2.1 通道基础 + +Channel 接口的定义如下,其中包含了两个基础方法: + ++ **isOpen()**:判断当前 Channel 是否处于打开状态; ++ **close()**:关闭当前 Channel 。Channel 关闭后,就不能在其上再进行任何 IO 操作,否则将抛出 ClosedChannelException 异常。 + +```java +public interface Channel extends Closeable { + + public boolean isOpen(); + + public void close() throws IOException; +} +``` + +对于常见的文件操作和网络操作都可以直接获取到其对应的 Channel: + +```java +// 获取serverSocketChannel +ServerSocket serverSocket = new ServerSocket(8888); +ServerSocketChannel serverSocketChannel = serverSocket.getChannel(); + +// 获取SocketChannel +Socket socket = new Socket("127.0.0.1", 8888); +SocketChannel socketChannel = socket.getChannel(); + +// 获取FileChannel +FileInputStream fileInputStream = new FileInputStream(new File("path")); +FileChannel fileChannel = fileInputStream.getChannel(); +``` + +### 2.2 文件通道 + +FileChannel 是一个连接到文件的通道,通过该通道可以完成文件的读写。需要注意的是 FileChannel 无法设置为非阻塞模式,因此它总是运行在阻塞模式下。通过 FileChannel 和 Buffer 可以高效地完成文件的拷贝,示例如下: + +```java +try { + FileInputStream inputStream = new FileInputStream(new File("D:\\a.png")); + FileOutputStream outputStream = new FileOutputStream(new File("D:\\b.png")); + FileChannel inChannel = inputStream.getChannel(); + FileChannel outChannel = outputStream.getChannel(); + ByteBuffer buffer = ByteBuffer.allocate(256); + // 从输入channel中读取数据到buffer中 + while (inChannel.read(buffer) > 0) { + // 由写模式切换到读模式 + buffer.flip(); + while (buffer.hasRemaining()) { + //将buffer中的数据写出到输出channel + outChannel.write(buffer); + } + buffer.clear(); + } + inputStream.close(); + outputStream.close(); +} catch (IOException e) { + e.printStackTrace(); +} +``` + +这里的最后我们只需要关闭 Stream 即可,其上的 Channel 也会被关闭,源码如下: + +```java +public void close() throws IOException { + synchronized (closeLock) { + if (closed) { + return; + } + closed = true; + } + // 如果channel不为空,则关闭 + if (channel != null) { + channel.close(); + } + + fd.closeAll(new Closeable() { + public void close() throws IOException { + close0(); + } + }); +} +``` + +### 2.3 Channel To Channel + +在 Java NIO 中,如果两个 Channel 中有一个是 FileChannel,那么可以直接将数据从一个 Channel 传输到另外一个 Channel: + +```java +// 将该通道上的数据直接传送到目标通道 +transferTo(long position, long count, WritableByteChannel target) ; +// 将原通道上的数据直接传送到该通道 +transferFrom(ReadableByteChannel src, long position, long count) +``` + +还是以文件拷贝为例,使用示例如下: + +```java +try { + FileInputStream inputStream = new FileInputStream(new File("D:\\a.png")); + FileOutputStream outputStream = new FileOutputStream(new File("D:\\b.png")); + FileChannel inChannel = inputStream.getChannel(); + FileChannel outChannel = outputStream.getChannel(); + inChannel.transferTo(0,inChannel.size(),outChannel); //使用transferTo实现 + // outChannel.transferFrom(inChannel, 0, inChannel.size()); //使用transferFrom实现 + inputStream.close(); + outputStream.close(); +} catch (IOException e) { + e.printStackTrace(); +} +``` + +### 2.4 Scatter/Gather + +Java NIO 支持 scatter 和 gather 操作: + +- **分散 (scatter)**:把 Channel 中的数据依次写入到多个 Buffer 上。示例如下: + +```java +ByteBuffer buffer01 = ByteBuffer.allocate(32); +ByteBuffer buffer02 = ByteBuffer.allocate(64); +ByteBuffer buffer03 = ByteBuffer.allocate(128); + +ByteBuffer[] buffers = new ByteBuffer[]{buffer01, buffer02, buffer03}; +fileInputStream.getChannel().read(buffers); +``` + +此时 Channel 中的数据会依次写入到 Buffer01, Buffer02, Buffer03 上。Scatter 通常用于固定长度数据的处理,假设一个数据单元由 header,body,footer 三部分组成,并且每部分的长度都是固定的,此时通过 Scatter 操作,每一组数据的 header,body,footer 都会分别固定地写到 Buffer01, Buffer02, Buffer03 上,此时就可以对每个 Buffer 应用不同的处理逻辑: + +![nio_scatter](../pictures/nio_scatter.png) + ++ **聚集 (gather)**:将多个 Buffer 中的数据依次写入到同一个 Channel 上。示例如下: + +```java +ByteBuffer buffer01 = ByteBuffer.allocate(32); +ByteBuffer buffer02 = ByteBuffer.allocate(64); +ByteBuffer buffer03 = ByteBuffer.allocate(128); + +ByteBuffer[] buffers = new ByteBuffer[]{buffer01, buffer02, buffer03}; +fileInputStream.getChannel().read(buffers); +``` + +![nio_gather](../pictures/nio_gather.png) + +### 2.5 Pipe + +Java 提供了 Pipe 类用于在不同线程之间传递数据: + +![nio_pipe](../pictures/nio_pipe.png) + +Pipe 管道可以通过 Pipe 的静态方法 `open()` 来创建: + +```java +Pipe pipe = Pipe.open(); +``` + +创建完成后可以通过其 `sink()` 和 `source()` 方法来创建对应的 SinkChannel 和 SourceChannel: + +```shell +Pipe.SinkChannel sinkChannel = pipe.sink(); +Pipe.SourceChannel sourceChannel = pipe.source(); +``` + +SinkChannel 和 SourceChannel 的使用与基本的 Channel 类似,示例如下: + +```java +Pipe pipe = Pipe.open(); +new Thread(() -> { + try { + Pipe.SinkChannel sink = pipe.sink(); + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.put("HelloWorld".getBytes()); + buffer.flip(); + while (buffer.hasRemaining()) { + // 将数据写入SinkChannel + sink.write(buffer); + } + sink.close(); + } catch (IOException e) { + e.printStackTrace(); + } +}).start(); +new Thread(() -> { + try { + Pipe.SourceChannel source = pipe.source(); + ByteBuffer buffer = ByteBuffer.allocate(1024); + // 读取SourceChannel中的数据 + while (source.read(buffer) > 0) { + buffer.flip(); + while (buffer.hasRemaining()) { + System.out.print((char) buffer.get()); //输出:HelloWorld + } + buffer.clear(); + } + source.close(); + } catch (IOException e) { + e.printStackTrace(); + } +}).start(); +``` + + + +## 三、Selector + +### 3.1 创建选择器 + +想要创建一个选择器,可以通过 Selector 类的静态方法 `open()` 来实现: + +```java +Selector selector = Selector.open(); +``` + +### 3.2 注册通道 + +想要将 Channel 和 Selector 配合使用,必须要通过 `register()` 方法将 Channel 注册到 Selector 上,示例如下: + +```java +// 创建ServerSocketChannel +ServerSocketChannel serverChannel = ServerSocketChannel.open(); +// 与 Selector一起使用的Channel必须处于非阻塞模式下 +serverChannel.configureBlocking(false); +serverChannel.bind(new InetSocketAddress(hostname, port)); + +// 注册监听CONNECT事件 +serverChannel.register(selector, SelectionKey.OP_ACCEPT); +``` + +`register()` 方法的第二个参数表示需要监听的事件,它有以下四个可选值: + +```java +//读取事件 +public static final int OP_READ = 1 << 0; +//写入事件 +public static final int OP_WRITE = 1 << 2; +//连接事件 +public static final int OP_CONNECT = 1 << 3; +//接受连接事件 +public static final int OP_ACCEPT = 1 << 4; +``` + +如果你需要监听多个事件,可以使用位操作符进行连接: + +```java +int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; +``` + +除此之外,你还可以在注册时通过调用 register 的另外一个重载方法来指定附加信息: + +```java +register(Selector sel, int ops, Object att) +``` + +这个附加信息可以在事件触发时通过 SelectionKey 对象再次得到。 + +### 3.3 select + +当你在 Selector 上注册好通道后,就可以使用 `select()` 方法来获取处于就绪状态的事件的集合。示例如下: + +```java +int select = selector.select(); +``` + +select 有以下三个重载方法: + ++ **select()**:持续阻塞,直到至少有一个通道在其注册的事件上处于就绪状态; ++ **select(long timeout)**:与 `select()` 类似,但最长只阻塞 timout 毫秒; ++ **selectNow()**:不会阻塞,如果不存在就绪事件,则直接返回 0。 + +需要注意的是如果是 Ready 操作集发生了变化,select 操作的返回值也可能是 0。这意味着如果某个通道注册的是 `OP_READ` 事件,那么该通道在第一次收到消息时 select 操作返回的值是 1,但是之后收到消息时 select 的返回值则是 0。因此在循环获取消息时,对于 select 返回值的判断应该加上为 0 的情况: + +```java +while (selector.select() >= 0) { + .... +} +``` + +### 3.4 SelectionKey + +当存在注册的事件处于就绪状态时,可以通过 Selector 的 `selectedKeys()` 方法来获取处于就绪状态的事件信息: + +```java +Set selectionKeys = selector.selectedKeys(); +``` + +其返回的是 SelectionKey 的集合,SelectionKey 是对多个属性的综合封装: + +```java +public abstract class SelectionKey { + + // SelectionKey对应的channel + public abstract SelectableChannel channel(); + + // SelectionKey对应的选择器 + public abstract Selector selector(); + + // 当前SelectionKey是否有效 + public abstract boolean isValid(); + + // 取消channel在selector上注册的事件 + public abstract void cancel(); + + // 当前channel注册的事件的合集 + public abstract int interestOps(); + + // 当前channel是否对指定的事件感兴趣 + public abstract SelectionKey interestOps(int ops); + + // 处于就绪状态的事件的合集 + public abstract int readyOps(); + + // Read事件是否就绪 + public final boolean isReadable() {return (readyOps() & OP_READ) != 0;} + + // Write事件是否就绪 + public final boolean isWritable() {return (readyOps() & OP_WRITE) != 0;} + + // Connect事件是否就绪 + public final boolean isConnectable() {return (readyOps() & OP_CONNECT) != 0;} + + // Accept事件是否就绪 + public final boolean isAcceptable() {return (readyOps() & OP_ACCEPT) != 0;} + + // 为SelectionKey指定附加属性,也可以在注册时通过register方法指定 + public final Object attach(Object ob) {return attachmentUpdater.getAndSet(this, ob);} + + // 获取附加属性 + public final Object attachment() { return attachment;} + +} +``` + +## 四、聊天室实例 + +下面以一个群聊的聊天室为例,来展示 Java NIO 三大组件的综合使用,效果如下: + +![nio_chat_group](../pictures/nio_chat_group.png) + + + +### 4.1 群聊服务器 + +群聊服务器的实现如下: + +```java +package chat; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.nio.charset.StandardCharsets; +import java.util.Set; + +public class ChatServer { + + private String hostname; + private int port; + private Selector selector; + private final ByteBuffer rBuffer = ByteBuffer.allocate(1024); + private final ByteBuffer wBuffer = ByteBuffer.allocate(1024); + + ChatServer(int port) { + this("127.0.0.1", port); + } + + ChatServer(String hostname, int port) { + this.hostname = hostname; + this.port = port; + } + + public void start() { + try { + // 创建ServerSocketChannel + ServerSocketChannel serverChannel = ServerSocketChannel.open(); + // 设置为非阻塞模式 + serverChannel.configureBlocking(false); + serverChannel.bind(new InetSocketAddress(hostname, port)); + + // 创建selector + selector = Selector.open(); + // 注册监听CONNECT事件 + serverChannel.register(selector, SelectionKey.OP_ACCEPT); + + // 持续调用select() + while (true) { + selector.select(); + Set selectionKeys = selector.selectedKeys(); + for (SelectionKey selectionKey : selectionKeys) { + // 处理Accept事件 + if (selectionKey.isAcceptable()) { + // 获取ServerSocketChannel + ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel(); + // 通过ServerSocketChannel获取SocketChannel + SocketChannel clientChannel = server.accept(); + clientChannel.configureBlocking(false); + // 注册Read事件 + clientChannel.register(selector, SelectionKey.OP_READ); + System.out.println("欢迎" + clientChannel.socket().getPort() + "加入聊天室!"); + // 处理Readable事件 + } else if (selectionKey.isReadable()) { + handleMessage(selectionKey); + } + } + selectionKeys.clear(); + } + + } catch (IOException e) { + e.printStackTrace(); + } finally { + // 关闭selector后,与之相关的所有资源都会被释放 + try { + selector.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + /** + * 处理客户端消息 + * + * @param selectionKey 处于Read状态的SelectionKey + */ + private void handleMessage(SelectionKey selectionKey) { + SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); + try { + // 获取来自客户端的消息 + StringBuilder buffer = new StringBuilder(); + while (clientChannel.read(rBuffer) > 0) { + rBuffer.flip(); + buffer.append(StandardCharsets.UTF_8.decode(rBuffer)); + rBuffer.clear(); + } + String msg = buffer.toString(); + // 将消息发送给其他客户端 + broadcastMessage(clientChannel, msg); + } catch (IOException e) { + // 该异常由clientChannel.read(rBuffer)方法抛出,如果出现该异常,则说明clientChannel已经关闭 + // 此时需要调用cancel()取消注册在selector上的事件 + selectionKey.cancel(); + // 在多线程环境下,如果另一个线程正在阻塞地调用select(),因为事件集已经改变, + // 因此需要通过wakeup()让其立刻返回并重新select() + selector.wakeup(); + System.out.println("用户" + clientChannel.socket().getPort() + "退出聊天室!"); + } + } + + /** + * 将客户端发来的消息广播给其他客户端 + * + * @param clientChannel 消息源 + * @param msg 消息 + */ + private void broadcastMessage(SocketChannel clientChannel, String msg) { + try { + // 遍历当前selector上所有channel + for (SelectionKey selectionKey : selector.keys()) { + SelectableChannel channel = selectionKey.channel(); + // 消息不需要转发给ServerSocketChannel和当前客户端自己 + if (selectionKey.isValid() && !(channel instanceof ServerSocketChannel) + && !channel.equals(clientChannel)) { + SocketChannel otherClient = (SocketChannel) channel; + wBuffer.put(("用户" + clientChannel.socket().getPort() + ": " + msg).getBytes()); + while (wBuffer.hasRemaining()) { + wBuffer.flip(); + otherClient.write(wBuffer); + } + wBuffer.clear(); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + + } + + public static void main(String[] args) { + new ChatServer(8888).start(); + } +} + +``` + +### 4.2 客户端实现 + +```java +package chat; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedSelectorException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.Scanner; +import java.util.Set; + +public class ChatClient { + + private String hostname; + private int port; + private Selector selector; + private final ByteBuffer rBuffer = ByteBuffer.allocate(1024); + private final ByteBuffer wBuffer = ByteBuffer.allocate(1024); + + ChatClient(String hostname, int port) { + this.hostname = hostname; + this.port = port; + } + + public void start() { + try { + // 创建SocketChannel + SocketChannel socketChannel = SocketChannel.open(); + socketChannel.configureBlocking(false); + socketChannel.connect(new InetSocketAddress(hostname, port)); + + // 创建selector + selector = Selector.open(); + // 注册监听CONNECT事件 + socketChannel.register(selector, SelectionKey.OP_CONNECT); + + // 持续调用select + while (true) { + selector.select(); + Set selectionKeys = selector.selectedKeys(); + for (SelectionKey selectionKey : selectionKeys) { + // 处理Connect事件 + if (selectionKey.isConnectable()) { + SocketChannel clientChannel = (SocketChannel) selectionKey.channel(); + // 有可能正处于连接中的状态 + if (clientChannel.isConnectionPending()) { + // 等待连接完成 + clientChannel.finishConnect(); + // 开始监听用户输入 + inputListening(clientChannel); + // 为clientChannel注册上Read + clientChannel.register(selector, SelectionKey.OP_READ); + System.out.println("尊敬的用户" + clientChannel.socket().getLocalPort() + + ", 你已成功加入群聊!"); + } + // 处理Read事件 + } else if (selectionKey.isReadable()) { + handleMessage(selectionKey); + } + } + } + } catch (ClosedSelectorException e) { + // 主动关闭客户端,不做任何处理 + } catch (IOException e) { + e.printStackTrace(); + } finally { + try { + selector.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + /** + * 监听用户来自控制台的输入 + * + * @param clientChannel 客户端Channel + */ + private void inputListening(SocketChannel clientChannel) { + // 要持续监听用户输入,但又不能阻塞主线程,所以需要一个单独的线程来完成 + new Thread(() -> { + Scanner scanner = new Scanner(System.in); + try { + while (scanner.hasNextLine()) { + String input = scanner.nextLine(); + if (!input.isEmpty()) { + if ("exit".equals(input)) { + selector.close(); + return; + } + wBuffer.put(input.getBytes()); + wBuffer.flip(); + while (wBuffer.hasRemaining()) { + clientChannel.write(wBuffer); + } + wBuffer.clear(); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); + } + + /** + * 处理来自其他客户端的消息 + * + * @param selectionKey 处于Read状态的selectionKey + */ + private void handleMessage(SelectionKey selectionKey) { + try { + SocketChannel channel = (SocketChannel) selectionKey.channel(); + StringBuilder buffer = new StringBuilder(); + while (channel.read(rBuffer) > 0) { + rBuffer.flip(); + buffer.append(StandardCharsets.UTF_8.decode(rBuffer)); + rBuffer.clear(); + } + System.out.println(buffer.toString()); + } catch (IOException e) { + selectionKey.cancel(); + selector.wakeup(); + System.out.println("聊天室服务器已关闭!"); + } + } + + + public static void main(String[] args) { + new ChatClient("127.0.0.1", 8888).start(); + } +} + +``` + + + + + + + diff --git a/pictures/buffer_clear.png b/pictures/buffer_clear.png new file mode 100644 index 0000000..6cecb1b Binary files /dev/null and b/pictures/buffer_clear.png differ diff --git a/pictures/buffer_compact.png b/pictures/buffer_compact.png new file mode 100644 index 0000000..c9d7d18 Binary files /dev/null and b/pictures/buffer_compact.png differ diff --git a/pictures/buffer_compact_flip.png b/pictures/buffer_compact_flip.png new file mode 100644 index 0000000..c0acac9 Binary files /dev/null and b/pictures/buffer_compact_flip.png differ diff --git a/pictures/buffer_duplicate.png b/pictures/buffer_duplicate.png new file mode 100644 index 0000000..7150af6 Binary files /dev/null and b/pictures/buffer_duplicate.png differ diff --git a/pictures/buffer_flip.png b/pictures/buffer_flip.png new file mode 100644 index 0000000..5f0286a Binary files /dev/null and b/pictures/buffer_flip.png differ diff --git a/pictures/buffer_get.png b/pictures/buffer_get.png new file mode 100644 index 0000000..65b74ed Binary files /dev/null and b/pictures/buffer_get.png differ diff --git a/pictures/buffer_init.png b/pictures/buffer_init.png new file mode 100644 index 0000000..5cc7115 Binary files /dev/null and b/pictures/buffer_init.png differ diff --git a/pictures/buffer_put.png b/pictures/buffer_put.png new file mode 100644 index 0000000..aaa7efd Binary files /dev/null and b/pictures/buffer_put.png differ diff --git a/pictures/buffer_slice.png b/pictures/buffer_slice.png new file mode 100644 index 0000000..f8460ec Binary files /dev/null and b/pictures/buffer_slice.png differ diff --git a/pictures/java_nio.pptx b/pictures/java_nio.pptx new file mode 100644 index 0000000..b86e79d Binary files /dev/null and b/pictures/java_nio.pptx differ diff --git a/pictures/nio_buffer.png b/pictures/nio_buffer.png new file mode 100644 index 0000000..c2e0b3f Binary files /dev/null and b/pictures/nio_buffer.png differ diff --git a/pictures/nio_chat_group.png b/pictures/nio_chat_group.png new file mode 100644 index 0000000..589a058 Binary files /dev/null and b/pictures/nio_chat_group.png differ diff --git a/pictures/nio_gather.png b/pictures/nio_gather.png new file mode 100644 index 0000000..b194381 Binary files /dev/null and b/pictures/nio_gather.png differ diff --git a/pictures/nio_pipe.png b/pictures/nio_pipe.png new file mode 100644 index 0000000..9e42ed6 Binary files /dev/null and b/pictures/nio_pipe.png differ diff --git a/pictures/nio_scatter.png b/pictures/nio_scatter.png new file mode 100644 index 0000000..8669759 Binary files /dev/null and b/pictures/nio_scatter.png differ