作者gasbomb (虚空雷神兽)
看板mud
标题[心得] 从0开始 3.8 非阻塞式IO的聊天室
时间Thu Dec 19 17:55:16 2019
之前实作的聊天室由於使用了阻塞式的 IO
在等待使用者输入指令时整个执行绪都必须暂停
所以说线上有几个使用者就等於我们要同时开启几条执行绪
这是非常浪费资源的
在後来的 java 版本有提供了非阻塞式的 IO
让我们可以只用一条执行绪就可以应付许多连线
这次就使用 AsynchronousServerSocketChannel 来实作聊天室 (简称 AIO)
以下就是聊天室的程式码
由於 AIO 有非常多的细节, 但是我们的目的是要开发 MUD
因此这边我不打算解释的太详细
// GeneralAioEchoServer.java
// ✂--------------请沿虚线剪下--------------
package test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.*;
public class GeneralAioEchoServer {
private AsynchronousServerSocketChannel assc;
private Set<AsynchronousSocketChannel> users = new HashSet<>();
public static void main(String[] args) throws Exception {
GeneralAioEchoServer server = new GeneralAioEchoServer();
server.start();
// AIO 因为不会阻塞, 所以必须要有无限回圈来维持 main thread
while (true) {
Thread.sleep(5000L);
}
}
// 建立连线池, 设定 server port, 启动
private void start() throws IOException {
ExecutorService pool = Executors.newSingleThreadExecutor();
AsynchronousChannelGroup channelGroup =
AsynchronousChannelGroup.withThreadPool(pool);
assc = AsynchronousServerSocketChannel.open(channelGroup);
assc.bind(new InetSocketAddress(4000));
// 设定 callback method
assc.accept(null, new AcceptHandler());
}
private class AcceptHandler implements
CompletionHandler<AsynchronousSocketChannel, Object> {
@Override
public void completed(AsynchronousSocketChannel asc, Object o) {
assc.accept(null, this);
try {
asc.write(StandardCharsets.UTF_8.encode(
"欢迎来到 aio telnet chat server\r\n")).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
ByteBuffer bb = ByteBuffer.allocate(1024);
asc.read(bb, null, new ReadHandler(asc, bb));
users.add(asc);
}
@Override
public void failed(Throwable throwable, Object o) {
}
}
private class ReadHandler implements CompletionHandler<Integer, Object> {
private AsynchronousSocketChannel asc;
private ByteBuffer bb;
private MyByteArrayOutputStream byteArrayOutputStream =
new MyByteArrayOutputStream();
private boolean firstChar = true;
private Queue<String> inputs = new LinkedList<>();
public ReadHandler(AsynchronousSocketChannel asc, ByteBuffer bb) {
this.asc = asc;
this.bb = bb;
}
@Override
public void completed(Integer result, Object o) {
if (result == -1) return;
// 逐 byte 读取玩家输入的字元
byte[] bytes = new byte[result];
bb.flip().get(bytes).clear();
for (byte b : bytes) {
switch (b) {
case '\n':
if (firstChar) {
firstChar = false;
continue;
}
case '\r':
inputs.offer(
new String(byteArrayOutputStream.toByteArray(),
StandardCharsets.UTF_8));
byteArrayOutputStream.reset();
firstChar = true;
continue;
case 127:
byteArrayOutputStream.backspace();
continue;
default:
byteArrayOutputStream.write(b);
firstChar = false;
}
}
try {
while (!inputs.isEmpty()) {
String message = inputs.poll() + "\r\n";
for (AsynchronousSocketChannel user : users) {
user.write(StandardCharsets.UTF_8.encode(message)).get();
}
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
asc.read(bb, null, this);
}
@Override
public void failed(Throwable throwable, Object o) {
}
}
// 继承ByteArrayOutputStream 实作 backspace 的功能
private static class MyByteArrayOutputStream extends ByteArrayOutputStream {
public void backspace() {
if (count > 0) count--;
}
}
}
// ✂--------------请沿虚线剪下--------------
如此一来, 更轻量化的聊天室就完成了
下一次我们会开始实作登入系统
--
╔═◢ ◣═╦╦═════╦═════╗
║
◤◤◤ ◥ ╠╣
飞鸟ももこ╠═╗ ║
║ ▇ ▇ ║╚═════╝ ╚═╦═╣
║ ▌ ● ● ▌ ║╔══════╗╔═╩═╣
║
◤ ◥
︺█◤
◥╠╣
Momoko Asuka╠╝ ║
╚◣◢ ▄▂▄ ◣◢╩╩══════╩════╝
--
※ 发信站: 批踢踢实业坊(ptt.cc), 来自: 211.72.253.48 (台湾)
※ 文章网址: https://webptt.com/cn.aspx?n=bbs/mud/M.1576749320.A.36F.html
1F:→ laechan : 所以实际上底层还是走线程池 114.33.66.104 12/19 18:20
2F:→ laechan : 以AsynchronousServerSocketChannel 114.33.66.104 12/19 18:21
3F:→ laechan : 来包裹它的一些应用 114.33.66.104 12/19 18:21
4F:推 outshaker : 推 期待更新 1.160.108.129 12/20 15:34
5F:推 nfsong : 推 36.229.221.126 01/04 10:13
6F:推 jameslong : 等候更新 110.50.153.144 01/18 17:56