Netty 网络编程
Netty是由JBOSS 提供的一个java开源 框架,现为 Github 上的独立项目。Netty提供异步的、事件驱动 的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器 和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端 应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。 [1]
NIO 基础 三大组件 Channel & Buffer channel 有一点类似于 stream,它就是读写数据的 双向通道 ,可以从 channel 将数据读入 buffer,也可以 将 buffer的 数据 写入 channel,而之前的 stream 要么是输入,要么是输出,channel 比 stream 更为底层
graph LR
channel --> buffer
buffer --> channel
常见的 Channel
FileChannel
DatagramChannel
SocketChannel
ServerSocketChannel
buffer 则用来缓冲读写数据,常见的 Buffer 有
ByteBuffer
MappedByteBuffer
DirectByteBuffer
HeapByteBuffer
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
CharBuffer
Selector selector 单从字面意思不好理解,需要结合 服务器的 设计演化来理解它的用途
多线程版设计 graph TD
subgraph 多线程版
t1(thread) --> s1(socket1)
t2(thread) --> s2(socket2)
t3(thread) --> s3(socket3)
end
多线程版缺点
内存占用高
线程上下文切换成本高
只适合连接较少的场景
线程池版设计 graph TD
subgraph 线程池版
t4(thread) --> s4(socket1)
t5(thread) --> s5(socket2)
t4(thread) -.-> s6(socket3)
t5(thread) -.-> s7(socket4)
end
线程池版缺点
阻塞模式下,线程仅能处理一个 socket 连接
仅适合短连接场景
selector 版设计 selector 的作用 就是配合一个线程来管理多个 channel,获取这些 channel 上发生的事情,这些channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。适合 连接数特别多,但流量地的场景 (low traffic)
graph TD
subgraph selector 版
thread --> selector
selector --> c1(channel)
selector --> c2(channel)
selector --> c3(channel)
end
调用 selector 的 select()
会阻塞 直到 channel 发生了 读写就绪事件,这些事件发生,select()
方法就会返回这些事件交给 thread 来处理
ByteBuffer
有一普通文本文件 data.txt
使用FileChannel
来读取文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package com.cs7eric.netty.demo;import lombok.extern.slf4j.Slf4j;import java.io.FileNotFoundException;import java.io.IOException;import java.io.RandomAccessFile;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;@Slf4j public class ChannelDemo1 { public static void main (String[] args) { try { RandomAccessFile file = new RandomAccessFile ("data.txt" , "rw" ); FileChannel channel = file.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(10 ); do { int len = channel.read(byteBuffer); log.debug("读到字节数:{}" , len); if (len == -1 ) { break ; } byteBuffer.flip(); while (byteBuffer.hasRemaining()) { log.debug("{}" , (char ) byteBuffer.get()); } byteBuffer.clear(); }while (true ); } catch (IOException e) { e.printStackTrace(); } } }
输出
ByteBuffer 使用
向 buffer 写入数据,例如 调用 channel.read(buufer)
调用 flip()
切换至 读模式
从 buffer 读取数据,例如 调用 buffer.get()
调用 clear()
或 compact()
切换至 写模式
重复 1 - 4 步骤
ByteBuffer 结构 ByteBuffer 有以下重要属性
一开始
写模式下,position 是写入位置,limit 等于容量,下图表示写入了 4 个字节后的状态
flip 动作发生后,position 切换为读取位置,limit 切换为读取限制
读取 4 个字节后,状态
clear 动作发生后,状态
compact 方法,是把未读完的部分向前压缩,然后切换至写模式
调试工具类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 package com.cs7eric.netty.demo;import io.netty.util.internal.StringUtil;import java.nio.ByteBuffer;import static io.netty.util.internal.MathUtil.isOutOfBounds;import static io.netty.util.internal.StringUtil.NEWLINE;public class ByteBufferUtil { private static final char [] BYTE2CHAR = new char [256 ]; private static final char [] HEXDUMP_TABLE = new char [256 * 4 ]; private static final String[] HEXPADDING = new String [16 ]; private static final String[] HEXDUMP_ROWPREFIXES = new String [65536 >>> 4 ]; private static final String[] BYTE2HEX = new String [256 ]; private static final String[] BYTEPADDING = new String [16 ]; static { final char [] DIGITS = "0123456789abcdef" .toCharArray(); for (int i = 0 ; i < 256 ; i++) { HEXDUMP_TABLE[i << 1 ] = DIGITS[i >>> 4 & 0x0F ]; HEXDUMP_TABLE[(i << 1 ) + 1 ] = DIGITS[i & 0x0F ]; } int i; for (i = 0 ; i < HEXPADDING.length; i++) { int padding = HEXPADDING.length - i; StringBuilder buf = new StringBuilder (padding * 3 ); for (int j = 0 ; j < padding; j++) { buf.append(" " ); } HEXPADDING[i] = buf.toString(); } for (i = 0 ; i < HEXDUMP_ROWPREFIXES.length; i++) { StringBuilder buf = new StringBuilder (12 ); buf.append(NEWLINE); buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L )); buf.setCharAt(buf.length() - 9 , '|' ); buf.append('|' ); HEXDUMP_ROWPREFIXES[i] = buf.toString(); } for (i = 0 ; i < BYTE2HEX.length; i++) { BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i); } for (i = 0 ; i < BYTEPADDING.length; i++) { int padding = BYTEPADDING.length - i; StringBuilder buf = new StringBuilder (padding); for (int j = 0 ; j < padding; j++) { buf.append(' ' ); } BYTEPADDING[i] = buf.toString(); } for (i = 0 ; i < BYTE2CHAR.length; i++) { if (i <= 0x1f || i >= 0x7f ) { BYTE2CHAR[i] = '.' ; } else { BYTE2CHAR[i] = (char ) i; } } } public static void debugAll (ByteBuffer buffer) { int oldlimit = buffer.limit(); buffer.limit(buffer.capacity()); StringBuilder origin = new StringBuilder (256 ); appendPrettyHexDump(origin, buffer, 0 , buffer.capacity()); System.out.println("+--------+-------------------- all ------------------------+----------------+" ); System.out.printf("position: [%d], limit: [%d]\n" , buffer.position(), oldlimit); System.out.println(origin); buffer.limit(oldlimit); } public static void debugRead (ByteBuffer buffer) { StringBuilder builder = new StringBuilder (256 ); appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position()); System.out.println("+--------+-------------------- read -----------------------+----------------+" ); System.out.printf("position: [%d], limit: [%d]\n" , buffer.position(), buffer.limit()); System.out.println(builder); } private static void appendPrettyHexDump (StringBuilder dump, ByteBuffer buf, int offset, int length) { if (isOutOfBounds(offset, length, buf.capacity())) { throw new IndexOutOfBoundsException ( "expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length + ") <= " + "buf.capacity(" + buf.capacity() + ')' ); } if (length == 0 ) { return ; } dump.append( " +-------------------------------------------------+" + NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" + NEWLINE + "+--------+-------------------------------------------------+----------------+" ); final int startIndex = offset; final int fullRows = length >>> 4 ; final int remainder = length & 0xF ; for (int row = 0 ; row < fullRows; row++) { int rowStartIndex = (row << 4 ) + startIndex; appendHexDumpRowPrefix(dump, row, rowStartIndex); int rowEndIndex = rowStartIndex + 16 ; for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2HEX[getUnsignedByte(buf, j)]); } dump.append(" |" ); for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]); } dump.append('|' ); } if (remainder != 0 ) { int rowStartIndex = (fullRows << 4 ) + startIndex; appendHexDumpRowPrefix(dump, fullRows, rowStartIndex); int rowEndIndex = rowStartIndex + remainder; for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2HEX[getUnsignedByte(buf, j)]); } dump.append(HEXPADDING[remainder]); dump.append(" |" ); for (int j = rowStartIndex; j < rowEndIndex; j++) { dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]); } dump.append(BYTEPADDING[remainder]); dump.append('|' ); } dump.append(NEWLINE + "+--------+-------------------------------------------------+----------------+" ); } private static void appendHexDumpRowPrefix (StringBuilder dump, int row, int rowStartIndex) { if (row < HEXDUMP_ROWPREFIXES.length) { dump.append(HEXDUMP_ROWPREFIXES[row]); } else { dump.append(NEWLINE); dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L )); dump.setCharAt(dump.length() - 9 , '|' ); dump.append('|' ); } } public static short getUnsignedByte (ByteBuffer buffer, int index) { return (short ) (buffer.get(index) & 0xFF ); } }
ByteBuffer 常见方法 分配空间 可以使用 allocate
方法为 ByteBuffer 分配空间,其他 buffer 类也有该方法
1 ByteBuffer buf = ByteBuffer.allocate(16 );
向 buffer 写入数据 有两种方法
调用 channel 的 read 方法
int readBytes = channel.read(buf);
调用 buffer 自己的 put 方法
向 buffer 读取数据 同样有两种方法
调用 channel 的 write
方法
int writeBytes = channel.write(buf);
调用 buffer 自己的 get
方法
get 方法 会让 position 读指针 向后走,如果 想 重复读取数据
可以 调用 rewind()
方法 将 position 重新置为 0
或者 调用 get(int i)
方法 获取 索引 为 i 的内容,他不会移动指针
mark 和 reset mark 是在读取时,做一个标记,即使 position 改变,只要调用 reset
就能回到 mark 的 位置
rewind 、flip
都会清除 mark 位置
字符串和 ByteBuffer 互转 1 2 3 4 5 6 7 8 9 ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("你好" );ByteBuffer buffer2 = Charset.forName("utf-8" ).encode("你好" );debug(buffer1); debug(buffer2); CharBuffer buffer3 = StandardCharsets.UTF_8.decode(buffer1);System.out.println(buffer3.getClass()); System.out.println(buffer3.toString());
Buffer 线程安全
Buffer 是 非 线程安全的
Netty 入门 Netty 进阶 Netty 优化与源码