This commit is contained in:
罗祥 2020-04-20 15:50:13 +08:00
parent 4bef86747a
commit 18d9db85c4
6 changed files with 424 additions and 326 deletions

View File

@ -34,19 +34,19 @@
JConsole、VisualVM 、监控本地进程、监控远程进程
7. IO、NIO、AIO
7. [Java NIO 核心组件详解](notes/Java_NIO.md)
8. Socket 网络编程系列
缓冲区 Buffer、通道 Channel、选择器 Selector、实现多人聊天室
9. 高性能网络框架 Netty
8. 高性能网络框架 Netty
9. [Tomcat 架构解析](notes/Tomcat_架构解析.md)
核心组件、连接器、多层容器、请求处理流程、程序启动过程、类加载器
10. Java 集合类源码解析
11. [Tomcat 架构解析](notes/Tomcat_架构解析.md)
核心组件、连接器、多层容器、请求处理流程、程序启动过程、类加载器
12. 函数式编程
11. 函数式编程
<br/>

View File

@ -1,18 +1,95 @@
## 一、Buffer
# Java NIO
### 1.1 缓冲区属性
<nav>
<a href="#一简介">一、简介</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#11-面向缓冲">1.1 面向缓冲</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#12-同步非阻塞">1.2 同步非阻塞</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#13-多路复用">1.3 多路复用</a><br/>
<a href="#二Buffer">二、Buffer</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#21-缓冲区属性">2.1 缓冲区属性</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#22-创建缓冲区">2.2 创建缓冲区</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#23-操作缓冲区">2.3 操作缓冲区</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#24-复制缓冲区">2.4 复制缓冲区</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#25-直接缓冲区">2.5 直接缓冲区</a><br/>
<a href="#三Channel">三、Channel</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#31-通道基础">3.1 通道基础</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#32-文件通道">3.2 文件通道</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#33-Channel-To-Channel">3.3 Channel To Channel</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#34-ScatterGather">3.4 Scatter/Gather</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#35-Pipe">3.5 Pipe</a><br/>
<a href="#四Selector">四、Selector</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#41-创建选择器">4.1 创建选择器</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#42-注册通道">4.2 注册通道</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#43-select">4.3 select</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#44-SelectionKey">4.4 SelectionKey</a><br/>
<a href="#五聊天室实例">五、聊天室实例</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#51-群聊服务器">5.1 群聊服务器</a><br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;<a href="#52-客户端实现">5.2 客户端实现</a><br/>
</nav>
![nio_buffer](../pictures/nio_buffer.png)
## 一、简介
所有缓冲区都直接或间接继承自 Buffer 抽象类Buffer 中定义了缓冲区的四个基本属性:
Java NIO 是 JDK 1.4 中引入的新的 IO 方式,它主要包含 Buffer、Channel、Selector 这三个核心的组件,它与传统 IO 的区别如下:
<table>
<tr>
<th>NIO</th>
<th>IO</th>
</tr>
<tr>
<td>面向缓冲</td>
<td>面向流</td>
</tr>
<tr>
<td>同步非阻塞</td>
<td>同步阻塞</td>
</tr>
<tr>
<td>多路复用(选择器)</td>
<td></td>
</tr>
</table>
### 1.1 面向缓冲
传统的 IO 是面向流的,传统 IO 每次从流中读取一个或者多个字节,直至读取完所有的字节。而 NIO 是面向缓冲区的,所有的读写操作都需要通过 Buffer 来完成,数据会被先写入 Buffer 中然后再进行处理Buffer 提供了多种方法用于操纵其中的数据,因此其在操作上更加灵活,读取速度也更加快。
### 1.2 同步非阻塞
传统 IO 的流都是单向的,因此它们需要分为 Input Stream 和 Output Stream。而 NIO 中的 Channel 则是双向的,数据可以从 Channel 读到 Buffer 中,也可以从 Buffer 写到 Channel
<div align="center"> <img src="../pictures/nio_channel_buffer.png"/> </div>
> 注意:从 Channel 写入到 Buffer 执行的是 read 方法,而从 Buffer 写出到 Channel 执行的是 write 方法。
Channel 可以设置为非阻塞模式,此时当 Channel 从 Buffer 中读取数据时,如果有待读取的数据则返回该数据;如果没有待读取的数据,对应的方法也不会阻塞,而是直接返回。
### 1.3 多路复用
Java NIO 通过 Reactor 模型实现了 IO 的多路复用可以在一个线程上通过一个选择Selector使用轮询的方式去监听多个通道 Channel 上注册的事件,从而在一个线程上就能实现对多个 Channel 的处理:
<div align="center"> <img src="../pictures/nio_selector.png"/> </div>
## 二、Buffer
### 2.1 缓冲区属性
<div align="center"> <img src="../pictures/nio_buffer.png"/> </div>
所有缓冲区ByteBuffer、FloatBuffer、IntBuffer、DoubleBuffer、ShortBuffer、LongBuffer、CharBuffer、MappedByteBuffer都直接或间接继承自 Buffer 抽象类Buffer 中定义了缓冲区的四个基本属性:
```java
public abstract class Buffer {
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
private int limit;
private int position = 0;
private int mark = -1;
...
}
```
@ -22,7 +99,7 @@ public abstract class Buffer {
+ **位置 (Position)**:下一个待操作元素的索引。
+ **标记 (Mark)**:标记位置。通过 `mark()` 方法可以让 mark 等于当前 position之后通过 `reset()` 方法可以让 position 恢复到标记位置。
### 1.2 创建缓冲区
### 2.2 创建缓冲区
通常可以通过以下两种方法来创建缓冲区:
@ -39,7 +116,7 @@ char[] chars = new char[100];
CharBuffer buffer = CharBuffer.wrap(chars);
```
实际上缓冲区内部就是通过数组来存储元素,以 CharBuffer 为例,它的内部维持有一个名为 `hb` 的数组,用来存放实际的元素:
实际上,在缓冲区内部就是通过数组来存储元素,以 CharBuffer 为例,它的内部维持有一个名为 `hb` 的数组,用来存放实际的元素:
```java
public abstract class CharBuffer extends Buffer implements Comparable<CharBuffer>, Appendable, CharSequence, Readable
@ -51,9 +128,10 @@ public abstract class CharBuffer extends Buffer implements Comparable<CharBuffer
缓冲区创建完成后,它处于以下初始状态:
![buffer_init](../pictures/buffer_init.png)
<div align="center"> <img src="../pictures/buffer_init.png"/> </div>
### 1.3 操作缓冲区
### 2.3 操作缓冲区
**1. put()**
@ -72,7 +150,8 @@ put(char[] src, int offset, int length)
当我们向 Buffer 中添加数据后position 属性也会随之变动:
![buffer_put](../pictures/buffer_put.png)
<div align="center"> <img src="../pictures/buffer_put.png"/> </div>
**2. get()**
@ -105,31 +184,36 @@ buffer.clear();
当使用 `filp()` 将 Buffer 由写模式切换到读模式后position 属性会恢复到初始位置代表从此处开始读取数据limit 属性也会随之变动,代表我们所能读取数据的上界:
![buffer_flip](../pictures/buffer_flip.png)
<div align="center"> <img src="../pictures/buffer_flip.png"/> </div>
当我们再通过 `get()` 方法进行读取时position 属性会随之移动position 和 limit 之间的数据就是待处理的数据:
![buffer_get](../pictures/buffer_get.png)
当我们再通过 `get()` 方法进行读取时position 属性会随之移动position 和 limit 之间就是待处理的数据:
<div align="center"> <img src="../pictures/buffer_get.png"/> </div>
**4. hasRemaining()**
`hasRemaining()` 用于判断当前的 position 是否小于 limit如果 position 小于 limit则返回 true代表仍存在待处理的数据。
`hasRemaining()` 用于判断当前的 position 是否小于 limit如果 position 小于 limit则返回 true代表仍待处理的数据。
**5. clear()**
将 position 设置为 0并将 limit 设置为 capacity 的大小,从而让缓冲区恢复到初始状态。此时缓冲区的状态如下
`clear()` 并不会真的清除缓冲区中的数据,它只是将 position 设置为 0并将 limit 设置为 capacity 的大小,从而让缓冲区恢复到初始状态:
![buffer_clear](../pictures/buffer_clear.png)
<div align="center"> <img src="../pictures/buffer_clear.png"/> </div>
当有新的数据写入时,新的数据会覆盖原有位置上的数据。
**6. compact()**
用于压缩缓冲区,即将数组中待处理的数据复制到头部。如下所示,会将未读取的 `LL0` 复制到头部:
![buffer_compact](../pictures/buffer_compact.png)
<div align="center"> <img src="../pictures/buffer_compact.png"/> </div>
需要注意的是这里执行的是复制操作,而不是移动操作,底层调用的是 `System.arraycopy` 方法,因此原有位置上的数据依然存在。但是由于 position 会移动到未处理数据的下一个位置上,所以不用担心原有位置上的数据会被读取到,原因是你切换到读模式时,原有的 `LO` 数据仍处于 limit 之后:
![buffer_compact_flip](../pictures/buffer_compact_flip.png)
需要注意的是这里执行的是复制操作,而不是移动操作,底层调用的是 `System.arraycopy` 方法,因此原有位置上的数据依然存在。但由于 position 会移动到未处理数据的下一个位置上,所以不用担心原有位置上的数据会被读取到,原因是你切换到读模式时,原有的 `LO` 数据仍处于 limit 之后:
<div align="center"> <img src="../pictures/buffer_compact_flip.png"/> </div>
**7. mark()**
@ -139,7 +223,7 @@ buffer.clear();
buffer.position(2).mark().position(5).reset().position(); //从位置2移动到位置5之后又恢复到位置2
```
### 1.4 复制缓冲区
### 2.4 复制缓冲区
如果想要对一个已有的缓冲区进行复制,可以有以下三种方法:
@ -155,15 +239,17 @@ public abstract CharBuffer slice();
- 复制缓冲区的 mark、position、limit、capcaity 属性的初始值与复制时原缓冲区的 mark、position、limit、capcaity 的值相同,但这些属性与原缓冲区的属性相互独立,创建后就不再受原有缓冲区的影响;
- 如果原缓冲区是只读缓冲区或直接缓冲区,则复制缓冲区也将继承这些属性。
![buffer_duplicate](../pictures/buffer_duplicate.png)
<div align="center"> <img src="../pictures/buffer_duplicate.png"/> </div>
`asReadOnlyBuffer()``duplicate()` 类似,但创建的复制缓冲区为只读缓冲区。
`slice()` 也与 `duplicate()` 类似,但创建的复制缓冲区与原缓冲区只共享部分数据元素,并且所有标志位都处于原始状态:
![buffer_slice](../pictures/buffer_slice.png)
<div align="center"> <img src="../pictures/buffer_slice.png"/> </div>
示例如下:
使用示例如下:
```java
CharBuffer buffer = CharBuffer.allocate(100);
@ -178,9 +264,9 @@ System.out.println("duplicate:" + duplicate.position() + "," + duplicate.limit()
System.out.println("slice:" + slice.position() + "," + slice.limit()); //slice:0,3
```
### 1.5 直接缓冲区
### 2.5 直接缓冲区
ByteBuffer 支持使用 `allocateDirect` 方法来创建直接缓冲区,示例如下:
ByteBuffer 支持使用 `allocateDirect()` 方法来创建直接缓冲区,示例如下:
```java
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
@ -188,13 +274,11 @@ ByteBuffer byteBuffer = ByteBuffer.allocate(100);
## 三、Channel
### 3.1 通道基础
## 二、Channel
### 2.1 通道基础
Channel 接口的定义如下,其中包含了两个基础方法:
Channel 接口的定义如下,其中定义了两个基础方法:
+ **isOpen()**:判断当前 Channel 是否处于打开状态;
+ **close()**:关闭当前 Channel 。Channel 关闭后,就不能在其上再进行任何 IO 操作,否则将抛出 ClosedChannelException 异常。
@ -224,11 +308,12 @@ FileInputStream fileInputStream = new FileInputStream(new File("path"));
FileChannel fileChannel = fileInputStream.getChannel();
```
### 2.2 文件通道
### 3.2 文件通道
FileChannel 是一个连接到文件的通道,通过该通道可以完成文件的读写。需要注意的是 FileChannel 无法设置为非阻塞模式,因此它总是运行在阻塞模式下。通过 FileChannel 和 Buffer 可以高效地完成文件的拷贝,示例如下:
FileChannel 是一个连接到文件的通道,通过该通道可以完成文件的读写。另外 FileChannel 无法设置为非阻塞模式因为对文件读写操作设置非阻塞并没有什么意义。FileChannel 的使用示例如下:
```java
// 示例:文件拷贝
try {
FileInputStream inputStream = new FileInputStream(new File("D:\\a.png"));
FileOutputStream outputStream = new FileOutputStream(new File("D:\\b.png"));
@ -275,7 +360,7 @@ public void close() throws IOException {
}
```
### 2.3 Channel To Channel
### 3.3 Channel To Channel
在 Java NIO 中,如果两个 Channel 中有一个是 FileChannel那么可以直接将数据从一个 Channel 传输到另外一个 Channel
@ -290,20 +375,20 @@ transferFrom(ReadableByteChannel src, long position, long count)
```java
try {
FileInputStream inputStream = new FileInputStream(new File("D:\\a.png"));
FileOutputStream outputStream = new FileOutputStream(new File("D:\\b.png"));
FileChannel inChannel = inputStream.getChannel();
FileChannel outChannel = outputStream.getChannel();
inChannel.transferTo(0,inChannel.size(),outChannel); //使用transferTo实现
// outChannel.transferFrom(inChannel, 0, inChannel.size()); //使用transferFrom实现
inputStream.close();
outputStream.close();
FileInputStream inputStream = new FileInputStream(new File("D:\\a.png"));
FileOutputStream outputStream = new FileOutputStream(new File("D:\\b.png"));
FileChannel inChannel = inputStream.getChannel();
FileChannel outChannel = outputStream.getChannel();
inChannel.transferTo(0,inChannel.size(),outChannel); //使用transferTo实现
// outChannel.transferFrom(inChannel, 0, inChannel.size()); //使用transferFrom实现
inputStream.close();
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
e.printStackTrace();
}
```
### 2.4 Scatter/Gather
### 3.4 Scatter/Gather
Java NIO 支持 scatter 和 gather 操作:
@ -320,7 +405,8 @@ fileInputStream.getChannel().read(buffers);
此时 Channel 中的数据会依次写入到 Buffer01 Buffer02 Buffer03 上。Scatter 通常用于固定长度数据的处理,假设一个数据单元由 headerbodyfooter 三部分组成,并且每部分的长度都是固定的,此时通过 Scatter 操作,每一组数据的 headerbodyfooter 都会分别固定地写到 Buffer01 Buffer02 Buffer03 上,此时就可以对每个 Buffer 应用不同的处理逻辑:
![nio_scatter](../pictures/nio_scatter.png)
<div align="center"> <img src="../pictures/nio_scatter.png"/> </div>
+ **聚集 (gather)**:将多个 Buffer 中的数据依次写入到同一个 Channel 上。示例如下:
@ -333,15 +419,17 @@ ByteBuffer[] buffers = new ByteBuffer[]{buffer01, buffer02, buffer03};
fileInputStream.getChannel().read(buffers);
```
![nio_gather](../pictures/nio_gather.png)
<div align="center"> <img src="../pictures/nio_gather.png"/> </div>
### 2.5 Pipe
Java 提供了 Pipe 类用于在不同线程之间传递数据:
### 3.5 Pipe
![nio_pipe](../pictures/nio_pipe.png)
Java NIO 还提供了 Pipe 管道用于在不同线程之间传递数据:
Pipe 管道可以通过 Pipe 的静态方法 `open()` 来创建:
<div align="center"> <img src="../pictures/nio_pipe.png"/> </div>
Pipe 管道可以通过 Pipe 类的静态方法 `open()` 来创建:
```java
Pipe pipe = Pipe.open();
@ -349,7 +437,7 @@ Pipe pipe = Pipe.open();
创建完成后可以通过其 `sink()``source()` 方法来创建对应的 SinkChannel 和 SourceChannel
```shell
```java
Pipe.SinkChannel sinkChannel = pipe.sink();
Pipe.SourceChannel sourceChannel = pipe.source();
```
@ -359,44 +447,44 @@ SinkChannel 和 SourceChannel 的使用与基本的 Channel 类似,示例如
```java
Pipe pipe = Pipe.open();
new Thread(() -> {
try {
Pipe.SinkChannel sink = pipe.sink();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("HelloWorld".getBytes());
buffer.flip();
while (buffer.hasRemaining()) {
// 将数据写入SinkChannel
sink.write(buffer);
}
sink.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
Pipe.SinkChannel sink = pipe.sink();
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("HelloWorld".getBytes());
buffer.flip();
while (buffer.hasRemaining()) {
// 将数据写入SinkChannel
sink.write(buffer);
}
sink.close();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
Pipe.SourceChannel source = pipe.source();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取SourceChannel中的数据
while (source.read(buffer) > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get()); //输出HelloWorld
}
buffer.clear();
}
source.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
Pipe.SourceChannel source = pipe.source();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取SourceChannel中的数据
while (source.read(buffer) > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get()); //输出HelloWorld
}
buffer.clear();
}
source.close();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
```
## 、Selector
## 、Selector
### 3.1 创建选择器
### 4.1 创建选择器
想要创建一个选择器,可以通过 Selector 类的静态方法 `open()` 来实现:
@ -404,9 +492,9 @@ new Thread(() -> {
Selector selector = Selector.open();
```
### 3.2 注册通道
### 4.2 注册通道
想要将 Channel 和 Selector 配合使用,必须要通过 `register()` 方法将 Channel 注册到 Selector 上,示例如下:
之后须要通过 `register()` 方法将 Channel 注册到 Selector 上,示例如下:
```java
// 创建ServerSocketChannel
@ -446,7 +534,7 @@ register(Selector sel, int ops, Object att)
这个附加信息可以在事件触发时通过 SelectionKey 对象再次得到。
### 3.3 select
### 4.3 select
当你在 Selector 上注册好通道后,就可以使用 `select()` 方法来获取处于就绪状态的事件的集合。示例如下:
@ -460,17 +548,18 @@ select 有以下三个重载方法:
+ **select(long timeout)**:与 `select()` 类似,但最长只阻塞 timout 毫秒;
+ **selectNow()**:不会阻塞,如果不存在就绪事件,则直接返回 0。
需要注意的是如果是 Ready 操作集发生了变化select 操作的返回值也可能是 0。这意味着如果某个通道注册的是 `OP_READ` 事件,那么该通道在第一次收到消息时 select 操作返回的值是 1但是之后收到消息时 select 的返回值则是 0。因此在循环获取消息时对于 select 返回值的判断应该加上为 0 的情况:
需要注意的是如果是 Ready 操作集发生了变化select 操作的返回值也可能是 0。这意味着如果某个通道注册的是 `OP_READ` 事件,那么该通道在第一次收到消息时select 操作返回的值是 1但是之后收到消息时select 的返回值却可能是 0。因此在循环获取消息时对于 select 返回值的判断应该加上为 0 的情况:
```java
// 如果选择器上存在就绪事件,则进行处理
while (selector.select() >= 0) {
....
}
```
### 3.4 SelectionKey
### 4.4 SelectionKey
存在注册的事件处于就绪状态时,可以通过 Selector 的 `selectedKeys()` 方法来获取处于就绪状态的事件信息:
当注册的事件处于就绪状态时,可以通过 Selector 的 `selectedKeys()` 方法来获取处于就绪状态的事件信息:
```java
Set<SelectionKey> selectionKeys = selector.selectedKeys();
@ -523,15 +612,18 @@ public abstract class SelectionKey {
}
```
## 四、聊天室实例
## 五、聊天室实例
下面以一个群聊的聊天室为例,来展示 Java NIO 三大组件的综合使用,效果如下:
![nio_chat_group](../pictures/nio_chat_group.png)
<div align="center"> <img src="../pictures/nio_chat_group.png"/> </div>
### 4.1 群聊服务器
### 5.1 群聊服务器
群聊服务器的实现如下:
@ -547,135 +639,136 @@ import java.util.Set;
public class ChatServer {
private String hostname;
private int port;
private Selector selector;
private final ByteBuffer rBuffer = ByteBuffer.allocate(1024);
private final ByteBuffer wBuffer = ByteBuffer.allocate(1024);
private String hostname;
private int port;
private Selector selector;
private final ByteBuffer rBuffer = ByteBuffer.allocate(1024);
private final ByteBuffer wBuffer = ByteBuffer.allocate(1024);
ChatServer(int port) {
this("127.0.0.1", port);
}
ChatServer(int port) {
this("127.0.0.1", port);
}
ChatServer(String hostname, int port) {
this.hostname = hostname;
this.port = port;
}
ChatServer(String hostname, int port) {
this.hostname = hostname;
this.port = port;
}
public void start() {
try {
// 创建ServerSocketChannel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置为非阻塞模式
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(hostname, port));
public void start() {
try {
// 创建ServerSocketChannel
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置为非阻塞模式
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(hostname, port));
// 创建selector
selector = Selector.open();
// 注册监听CONNECT事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 创建selector
selector = Selector.open();
// 注册监听CONNECT事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 持续调用select()
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
// 处理Accept事件
if (selectionKey.isAcceptable()) {
// 获取ServerSocketChannel
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
// 通过ServerSocketChannel获取SocketChannel
SocketChannel clientChannel = server.accept();
clientChannel.configureBlocking(false);
// 注册Read事件
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println("欢迎" + clientChannel.socket().getPort() + "加入聊天室!");
// 处理Readable事件
} else if (selectionKey.isReadable()) {
handleMessage(selectionKey);
}
}
selectionKeys.clear();
}
// 持续调用select()
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
// 处理Accept事件
if (selectionKey.isAcceptable()) {
// 获取ServerSocketChannel
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
// 通过ServerSocketChannel获取SocketChannel
SocketChannel clientChannel = server.accept();
clientChannel.configureBlocking(false);
// 注册Read事件
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println("欢迎" + clientChannel.socket().getPort() + "加入聊天室!");
// 处理Readable事件
} else if (selectionKey.isReadable()) {
handleMessage(selectionKey);
}
}
selectionKeys.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭selector后与之相关的所有资源都会被释放
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭selector后与之相关的所有资源都会被释放
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 处理客户端消息
*
* @param selectionKey 处于Read状态的SelectionKey
*/
private void handleMessage(SelectionKey selectionKey) {
SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
try {
// 获取来自客户端的消息
StringBuilder buffer = new StringBuilder();
while (clientChannel.read(rBuffer) > 0) {
rBuffer.flip();
buffer.append(StandardCharsets.UTF_8.decode(rBuffer));
rBuffer.clear();
}
String msg = buffer.toString();
// 将消息发送给其他客户端
broadcastMessage(clientChannel, msg);
} catch (IOException e) {
// 该异常由clientChannel.read(rBuffer)方法抛出如果出现该异常则说明clientChannel已经关闭
// 此时需要调用cancel()取消注册在selector上的事件
selectionKey.cancel();
// 在多线程环境下如果另一个线程正在阻塞地调用select(),因为事件集已经改变,
/**
* 处理客户端消息
*
* @param selectionKey 处于Read状态的SelectionKey
*/
private void handleMessage(SelectionKey selectionKey) {
SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
try {
// 获取来自客户端的消息
StringBuilder buffer = new StringBuilder();
while (clientChannel.read(rBuffer) > 0) {
rBuffer.flip();
buffer.append(StandardCharsets.UTF_8.decode(rBuffer));
rBuffer.clear();
}
String msg = buffer.toString();
// 将消息发送给其他客户端
broadcastMessage(clientChannel, msg);
} catch (IOException e) {
// 该异常由clientChannel.read(rBuffer)方法抛出如果出现该异常则说明clientChannel已经关闭
// 此时需要调用cancel()取消注册在selector上的事件
selectionKey.cancel();
// 在多线程环境下如果另一个线程正在阻塞地调用select(),因为事件集已经改变,
// 因此需要通过wakeup()让其立刻返回并重新select()
selector.wakeup();
System.out.println("用户" + clientChannel.socket().getPort() + "退出聊天室!");
}
}
selector.wakeup();
System.out.println("用户" + clientChannel.socket().getPort() + "退出聊天室!");
}
}
/**
* 将客户端发来的消息广播给其他客户端
*
* @param clientChannel 消息源
* @param msg 消息
*/
private void broadcastMessage(SocketChannel clientChannel, String msg) {
try {
// 遍历当前selector上所有channel
for (SelectionKey selectionKey : selector.keys()) {
SelectableChannel channel = selectionKey.channel();
// 消息不需要转发给ServerSocketChannel和当前客户端自己
if (selectionKey.isValid() && !(channel instanceof ServerSocketChannel)
&& !channel.equals(clientChannel)) {
SocketChannel otherClient = (SocketChannel) channel;
wBuffer.put(("用户" + clientChannel.socket().getPort() + ": " + msg).getBytes());
while (wBuffer.hasRemaining()) {
wBuffer.flip();
otherClient.write(wBuffer);
}
wBuffer.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
/**
* 将客户端发来的消息广播给其他客户端
*
* @param clientChannel 消息源
* @param msg 消息
*/
private void broadcastMessage(SocketChannel clientChannel, String msg) {
try {
// 遍历当前selector上所有channel
for (SelectionKey selectionKey : selector.keys()) {
SelectableChannel channel = selectionKey.channel();
// 消息不需要转发给ServerSocketChannel和当前客户端自己
if (selectionKey.isValid() && !(channel instanceof ServerSocketChannel)
&& !channel.equals(clientChannel)) {
SocketChannel otherClient = (SocketChannel) channel;
wBuffer.put(("用户" + clientChannel.socket().getPort() + ": " + msg).getBytes());
while (wBuffer.hasRemaining()) {
wBuffer.flip();
otherClient.write(wBuffer);
}
wBuffer.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new ChatServer(8888).start();
}
public static void main(String[] args) {
new ChatServer(8888).start();
}
}
```
### 4.2 客户端实现
### 5.2 客户端实现
客户端的实现如下:
```java
package chat;
@ -693,130 +786,135 @@ import java.util.Set;
public class ChatClient {
private String hostname;
private int port;
private Selector selector;
private final ByteBuffer rBuffer = ByteBuffer.allocate(1024);
private final ByteBuffer wBuffer = ByteBuffer.allocate(1024);
private String hostname;
private int port;
private Selector selector;
private final ByteBuffer rBuffer = ByteBuffer.allocate(1024);
private final ByteBuffer wBuffer = ByteBuffer.allocate(1024);
ChatClient(String hostname, int port) {
this.hostname = hostname;
this.port = port;
}
ChatClient(String hostname, int port) {
this.hostname = hostname;
this.port = port;
}
public void start() {
try {
// 创建SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(hostname, port));
public void start() {
try {
// 创建SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(hostname, port));
// 创建selector
selector = Selector.open();
// 注册监听CONNECT事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 创建selector
selector = Selector.open();
// 注册监听CONNECT事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 持续调用select
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
// 处理Connect事件
if (selectionKey.isConnectable()) {
SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
// 有可能正处于连接中的状态
if (clientChannel.isConnectionPending()) {
// 等待连接完成
clientChannel.finishConnect();
// 开始监听用户输入
inputListening(clientChannel);
// 为clientChannel注册上Read
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println("尊敬的用户" + clientChannel.socket().getLocalPort()
+ ", 你已成功加入群聊!");
}
// 处理Read事件
} else if (selectionKey.isReadable()) {
handleMessage(selectionKey);
}
}
}
} catch (ClosedSelectorException e) {
// 主动关闭客户端,不做任何处理
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 持续调用select
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey selectionKey : selectionKeys) {
// 处理Connect事件
if (selectionKey.isConnectable()) {
SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
// 有可能正处于连接中的状态
if (clientChannel.isConnectionPending()) {
// 等待连接完成
clientChannel.finishConnect();
// 开始监听用户输入
inputListening(clientChannel);
// 为clientChannel注册上Read
clientChannel.register(selector, SelectionKey.OP_READ);
System.out.println("尊敬的用户" + clientChannel.socket().getLocalPort()
+ ", 你已成功加入群聊!");
}
// 处理Read事件
} else if (selectionKey.isReadable()) {
handleMessage(selectionKey);
}
}
}
} catch (ClosedSelectorException e) {
// 主动关闭客户端,不做任何处理
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 监听用户来自控制台的输入
*
* @param clientChannel 客户端Channel
*/
private void inputListening(SocketChannel clientChannel) {
// 要持续监听用户输入,但又不能阻塞主线程,所以需要一个单独的线程来完成
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
try {
while (scanner.hasNextLine()) {
String input = scanner.nextLine();
if (!input.isEmpty()) {
if ("exit".equals(input)) {
selector.close();
return;
}
wBuffer.put(input.getBytes());
wBuffer.flip();
while (wBuffer.hasRemaining()) {
clientChannel.write(wBuffer);
}
wBuffer.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
/**
* 监听用户来自控制台的输入
*
* @param clientChannel 客户端Channel
*/
private void inputListening(SocketChannel clientChannel) {
// 要持续监听用户输入,但又不能阻塞主线程,所以需要一个单独的线程来完成
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
try {
while (scanner.hasNextLine()) {
String input = scanner.nextLine();
if (!input.isEmpty()) {
if ("exit".equals(input)) {
selector.close();
return;
}
wBuffer.put(input.getBytes());
wBuffer.flip();
while (wBuffer.hasRemaining()) {
clientChannel.write(wBuffer);
}
wBuffer.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
/**
* 处理来自其他客户端的消息
*
* @param selectionKey 处于Read状态的selectionKey
*/
private void handleMessage(SelectionKey selectionKey) {
try {
SocketChannel channel = (SocketChannel) selectionKey.channel();
StringBuilder buffer = new StringBuilder();
while (channel.read(rBuffer) > 0) {
rBuffer.flip();
buffer.append(StandardCharsets.UTF_8.decode(rBuffer));
rBuffer.clear();
}
System.out.println(buffer.toString());
} catch (IOException e) {
selectionKey.cancel();
selector.wakeup();
System.out.println("聊天室服务器已关闭!");
}
}
/**
* 处理来自其他客户端的消息
*
* @param selectionKey 处于Read状态的selectionKey
*/
private void handleMessage(SelectionKey selectionKey) {
try {
SocketChannel channel = (SocketChannel) selectionKey.channel();
StringBuilder buffer = new StringBuilder();
while (channel.read(rBuffer) > 0) {
rBuffer.flip();
buffer.append(StandardCharsets.UTF_8.decode(rBuffer));
rBuffer.clear();
}
System.out.println(buffer.toString());
} catch (IOException e) {
selectionKey.cancel();
selector.wakeup();
System.out.println("聊天室服务器已关闭!");
}
}
public static void main(String[] args) {
new ChatClient("127.0.0.1", 8888).start();
}
public static void main(String[] args) {
new ChatClient("127.0.0.1", 8888).start();
}
}
```
## 参考资料
+ [Ron Hitchens . Java NIO . O'Reilly Media . 2002-08-15](https://book.douban.com/subject/1433583/)
+ [Java NIO Tutorial](http://tutorials.jenkov.com/java-nio/index.html)
+ [一文读懂 Java NIO 和 IO 的不同](https://baijiahao.baidu.com/s?id=1632673729522644150&wfr=spider&for=pc)

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.2 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 66 KiB

After

Width:  |  Height:  |  Size: 40 KiB

BIN
pictures/nio_selector.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB