Netty

cccs7 Lv5

Netty 网络编程

Netty是由JBOSS 提供的一个java开源 框架,现为 Github 上的独立项目。Netty提供异步的、事件驱动 的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器 和客户端程序。

也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端 应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。 [1]

NIO 基础

三大组件

Channel & Buffer

channel 有一点类似于 stream,它就是读写数据的 双向通道,可以从 channel 将数据读入 buffer,也可以 将 buffer的 数据 写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层

graph LR
channel --> buffer
buffer --> channel

常见的 Channel

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

buffer 则用来缓冲读写数据,常见的 Buffer 有

  • ByteBuffer
    • MappedByteBuffer
    • DirectByteBuffer
    • HeapByteBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer
  • CharBuffer

Selector

selector 单从字面意思不好理解,需要结合 服务器的 设计演化来理解它的用途

多线程版设计
graph TD
subgraph 多线程版
t1(thread) --> s1(socket1)
t2(thread) --> s2(socket2)
t3(thread) --> s3(socket3)
end
多线程版缺点
  • 内存占用高
  • 线程上下文切换成本高
  • 只适合连接较少的场景
线程池版设计
graph TD
subgraph 线程池版
t4(thread) --> s4(socket1)
t5(thread) --> s5(socket2)
t4(thread) -.-> s6(socket3)
t5(thread) -.-> s7(socket4)
end
线程池版缺点
  • 阻塞模式下,线程仅能处理一个 socket 连接
  • 仅适合短连接场景
selector 版设计

selector 的作用 就是配合一个线程来管理多个 channel,获取这些 channel 上发生的事情,这些channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。适合 连接数特别多,但流量地的场景 (low traffic)

graph TD
subgraph selector 版
thread --> selector
selector --> c1(channel)
selector --> c2(channel)
selector --> c3(channel)
end

调用 selector 的 select() 会阻塞 直到 channel 发生了 读写就绪事件,这些事件发生,select() 方法就会返回这些事件交给 thread 来处理

ByteBuffer


有一普通文本文件 data.txt

1
1234567890abcd

使用FileChannel 来读取文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.cs7eric.netty.demo;

import lombok.extern.slf4j.Slf4j;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

@Slf4j
public class ChannelDemo1 {

public static void main(String[] args) {
try {
RandomAccessFile file = new RandomAccessFile("data.txt", "rw");
FileChannel channel = file.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
do {
// 向 buffer 写入
int len = channel.read(byteBuffer);
log.debug("读到字节数:{}", len);
if (len == -1) {
break;
}
// 切换到 buffer 读模式
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
log.debug("{}", (char) byteBuffer.get());
}
// 切换到 buffer 写模式
byteBuffer.clear();
}while (true);
} catch (IOException e) {
e.printStackTrace();
}
}
}

输出

image-20230329214105225

ByteBuffer 使用

  1. 向 buffer 写入数据,例如 调用 channel.read(buufer)
  2. 调用 flip() 切换至 读模式
  3. 从 buffer 读取数据,例如 调用 buffer.get()
  4. 调用 clear()compact() 切换至 写模式
  5. 重复 1 - 4 步骤

ByteBuffer 结构

ByteBuffer 有以下重要属性

  • capacity
  • position
  • limit

一开始

写模式下,position 是写入位置,limit 等于容量,下图表示写入了 4 个字节后的状态

flip 动作发生后,position 切换为读取位置,limit 切换为读取限制

读取 4 个字节后,状态

clear 动作发生后,状态

compact 方法,是把未读完的部分向前压缩,然后切换至写模式

调试工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package com.cs7eric.netty.demo;

import io.netty.util.internal.StringUtil;

import java.nio.ByteBuffer;

import static io.netty.util.internal.MathUtil.isOutOfBounds;
import static io.netty.util.internal.StringUtil.NEWLINE;

public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];

static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}

int i;

// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}

// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}

// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}

// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}

// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}

/**
* 打印所有内容
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}

/**
* 打印可读取内容
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}

private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
NEWLINE + "+--------+-------------------------------------------------+----------------+");

final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;

// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;

// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");

// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}

// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);

// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");

// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}

dump.append(NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}

private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}

public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}

ByteBuffer 常见方法

分配空间

可以使用 allocate 方法为 ByteBuffer 分配空间,其他 buffer 类也有该方法

1
ByteBuffer buf = ByteBuffer.allocate(16);
向 buffer 写入数据

有两种方法

  • 调用 channel 的 read 方法
    • int readBytes = channel.read(buf);
  • 调用 buffer 自己的 put 方法
    • buf.put((byte) 127)
向 buffer 读取数据

同样有两种方法

  • 调用 channel 的 write 方法
    • int writeBytes = channel.write(buf);
  • 调用 buffer 自己的 get 方法
    • byte b = buf.get();

get 方法 会让 position 读指针 向后走,如果 想 重复读取数据

  • 可以 调用 rewind() 方法 将 position 重新置为 0
  • 或者 调用 get(int i) 方法 获取 索引 为 i 的内容,他不会移动指针
mark 和 reset

mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset 就能回到 mark 的 位置

rewind 、flip 都会清除 mark 位置

字符串和 ByteBuffer 互转
1
2
3
4
5
6
7
8
9
ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好");
ByteBuffer buffer2 = Charset.forName("utf-8").encode("你好");

debug(buffer1);
debug(buffer2);

CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1);
System.out.println(buffer3.getClass());
System.out.println(buffer3.toString());

image-20230329215951740

Buffer 线程安全

Buffer 是 非 线程安全的

Netty 入门

Netty 进阶

Netty 优化与源码

  • Title: Netty
  • Author: cccs7
  • Created at: 2023-03-26 22:23:11
  • Updated at: 2023-06-29 23:13:21
  • Link: https://blog.cccs7.icu/2023/03/26/Netty/
  • License: This work is licensed under CC BY-NC-SA 4.0.
 Comments