netty的socksproxy例子源代码解析

在上篇简单的分析了socks协议。
这篇中看下netty是如何实现socks的服务端的。

socks的实现主要实现在io.netty.handler.codec.socks包下。
在example中给了一个示例。代码在这里

下面就对这个实现过程进行解析

主要目地:

  • 结合实现了解socks细节
  • 实现socks的过程,更进步了解netty的使用

入口

入口在SocksServer的main方法中。
创建ServerBootstrap对象,初始化参数,与常规的netty 一样。
这里关键在SocksServerInitializer类,做了相关初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new SocksServerInitializer());
b.bind(PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

SocksServerInitializer

在初始化的时候,增加三个handler:

  • io.netty.handler.codec.socks.SocksInitRequestDecoder
  • io.netty.handler.codec.socks.SocksMessageEncoder
  • io.netty.example.socksproxy.SocksServerHandler
1
2
3
4
5
6
7
8
9
10
private final SocksMessageEncoder socksMessageEncoder = new SocksMessageEncoder();
private final SocksServerHandler socksServerHandler = new SocksServerHandler();

@Override
public void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new SocksInitRequestDecoder());
p.addLast(socksMessageEncoder);
p.addLast(socksServerHandler);
}

SocksInitRequestDecoder

看继承关系:
public class SocksInitRequestDecoder extends ReplayingDecoder<State>
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

ReplayingDecoderByteToMessageDecoder解码的一种特殊的抽象基类,ByteToMessageDecoder解码读取缓冲区的数据之前需要检查缓冲区是否有足够的字节,使用ReplayingDecoder就无需自己检查。若ByteBuf中有足够的字节,则会正常读取;若没有足够的字节则会停止解码。也正因为这样的包装使得ReplayingDecoder带有一定的局限性。

  • 不是所有的操作都被ByteBuf支持,如果调用一个不支持的操作会抛出DecoderException。
  • ByteBuf.readableBytes()大部分时间不会返回期望值

SocksInitRequestDecoder 重写了 ReplayingDecoder的decode方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksProtocolVersion.valueOf(byteBuf.readByte());
if (version != SocksProtocolVersion.SOCKS5) {
break;
}
checkpoint(State.READ_AUTH_SCHEMES);
}
case READ_AUTH_SCHEMES: {
authSchemes.clear();
authSchemeNum = byteBuf.readByte();
for (int i = 0; i < authSchemeNum; i++) {
authSchemes.add(SocksAuthScheme.valueOf(byteBuf.readByte()));
}
msg = new SocksInitRequest(authSchemes);
break;
}
}
ctx.pipeline().remove(this);
out.add(msg);
}

在这个decode方法中主要做了两件事

  • 读版本号
  • 读认证方式列表
    是对socks客户端发来的第一段消息的处理。
VER NMETHODS METHODS
1 1 1-255

把读取好的authSchemes 传入SocksInitRequest中,并做为msg交由上层处理。
并把自己从处理器链中移除。

SocksInitRequestSocksRequest的子类。

1
2
3
4
5
6
7
8
9
10
public final class SocksInitRequest extends SocksRequest {
private final List<SocksAuthScheme> authSchemes;

public SocksInitRequest(List<SocksAuthScheme> authSchemes) {
super(SocksRequestType.INIT);
if (authSchemes == null) {
throw new NullPointerException("authSchemes");
}
this.authSchemes = authSchemes;
}

SocksInitRequest在初始化的时候,把SocksRequestType初始化为INIT

下一个处理SocksInitRequest的handler是哪个呢?

在最开始的时候增加三个处理器,SocksInitRequestDecoderSocksMessageEncoder,SocksServerHandler

SocksInitRequestDecoder已经在处理完成移除了
SocksMessageEncoder继承于MessageToByteEncoder<SocksMessage>ChannelOutboundHandler类的处理器,所以不是。
SocksServerHandler继承于SimpleChannelInboundHandler<SocksRequest>ChannelInboundHandler的处理器,并SocksInitRequestSocksRequest的子类。所是由于SocksServerHandler处理。

下面来看SocksServerHandler的处理方式

SocksServerHandler

SocksServerHandler实现了channelRead0方法。
channelRead0方法是当指定对象解析好时调用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void channelRead0(ChannelHandlerContext ctx, SocksRequest socksRequest) throws Exception {
switch (socksRequest.requestType()) {
case INIT: {
// auth support example
//ctx.pipeline().addFirst(new SocksAuthRequestDecoder());
//ctx.write(new SocksInitResponse(SocksAuthScheme.AUTH_PASSWORD));
ctx.pipeline().addFirst(new SocksCmdRequestDecoder());
ctx.write(new SocksInitResponse(SocksAuthScheme.NO_AUTH));
break;
}
case AUTH:
ctx.pipeline().addFirst(new SocksCmdRequestDecoder());
ctx.write(new SocksAuthResponse(SocksAuthStatus.SUCCESS));
break;
case CMD:
SocksCmdRequest req = (SocksCmdRequest) socksRequest;
if (req.cmdType() == SocksCmdType.CONNECT) {
ctx.pipeline().addLast(new SocksServerConnectHandler());
ctx.pipeline().remove(this);
ctx.fireChannelRead(socksRequest);
} else {
ctx.close();
}
break;
case UNKNOWN:
ctx.close();
break;
}
}

由上面的分析可知最开始会进入INIT分支。

  • 增加了一个SocksCmdRequestDecoder处理器
  • 向客户端发送了一个SocksInitResponse对象
    这里服务器是按不要验证来实现的。
    如果需要,这里也可以拿到socksRequest的几种认证方式进行认证。

SocksInitResponseSocksResponse的子类,是SocksMessage的子类。于是由SocksMessageEncoder进行编码。

SocksInitResponseencodeAsByteBuf就是编码方法

1
2
3
4
5
@Override
public void encodeAsByteBuf(ByteBuf byteBuf) {
byteBuf.writeByte(protocolVersion().byteValue());
byteBuf.writeByte(authScheme.byteValue());
}

于是由客户端发0x05 0x00的消息,是socks的服务器回应。

这个时候,再看处理器链上有哪些处理器呢?

  • SocksCmdRequestDecoder
  • SocksMessageEncoder
  • SocksServerHandler

0x05 0x00消息客户端收到之后,认证过程结果,客户端会向服务器端发送SOCKS5请求
接下来就看是如何处理的呢?
SocksInitRequest移除后,增加SocksCmdRequestDecoder处理器。
下面来看SocksCmdRequestDecoder

SocksCmdRequestDecoder

1
public class SocksCmdRequestDecoder extends ReplayingDecoder<State> {

同样是ReplayingDecoder的子类。

decode方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
switch (state()) {
case CHECK_PROTOCOL_VERSION: {
version = SocksProtocolVersion.valueOf(byteBuf.readByte());
if (version != SocksProtocolVersion.SOCKS5) {
break;
}
checkpoint(State.READ_CMD_HEADER);
}
case READ_CMD_HEADER: {
cmdType = SocksCmdType.valueOf(byteBuf.readByte());
reserved = byteBuf.readByte();
addressType = SocksAddressType.valueOf(byteBuf.readByte());
checkpoint(State.READ_CMD_ADDRESS);
}
case READ_CMD_ADDRESS: {
switch (addressType) {
case IPv4: {
host = SocksCommonUtils.intToIp(byteBuf.readInt());
port = byteBuf.readUnsignedShort();
msg = new SocksCmdRequest(cmdType, addressType, host, port);
break;
}
case DOMAIN: {
fieldLength = byteBuf.readByte();
host = SocksCommonUtils.readUsAscii(byteBuf, fieldLength);
port = byteBuf.readUnsignedShort();
msg = new SocksCmdRequest(cmdType, addressType, host, port);
break;
}
case IPv6: {
byte[] bytes = new byte[16];
byteBuf.readBytes(bytes);
host = SocksCommonUtils.ipv6toStr(bytes);
port = byteBuf.readUnsignedShort();
msg = new SocksCmdRequest(cmdType, addressType, host, port);
break;
}
case UNKNOWN:
break;
}
}
}
ctx.pipeline().remove(this);
out.add(msg);
}

在这里把命令类型,地址类型进行解析,并封装成SocksCmdRequest类,交由上层(SocksServerHandler)处理。
关把自己从处理器链中移除。

SocksCmdRequestSocksRequest的子类

1
public final class SocksCmdRequest extends SocksRequest {

SocksServerHandler的CMD命令处理

1
2
3
4
5
6
7
8
9
10
case CMD:
SocksCmdRequest req = (SocksCmdRequest) socksRequest;
if (req.cmdType() == SocksCmdType.CONNECT) {
ctx.pipeline().addLast(new SocksServerConnectHandler());
ctx.pipeline().remove(this);
ctx.fireChannelRead(socksRequest);
} else {
ctx.close();
}
break;

判断如果是CONNECT命令,就增加SocksServerConnectHandler处理器进行处理。
然后把自己从处理器链中移除。(应该在这个阶段完成的,已经完成。)
并调用ctx.fireChannelRead(socksRequest);,目地是让SocksServerConnectHandler类能再次处理这个socksRequest,进行连接。

注意:
CMD还有好几种,这个示例中只处理了CONNECT命令

1
2
3
4
5
public enum SocksCmdType {
CONNECT((byte) 0x01),
BIND((byte) 0x02),
UDP((byte) 0x03),
UNKNOWN((byte) 0xff)

在现处理器链上的处理器有:

  • SocksServerConnectHandler
  • SocksMessageEncoder

SocksServerConnectHandler

经过上面分析,已经有SocksCmdRequest中的相关信息。并走了SocksServerConnectHandler的channelRead0方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public final class SocksServerConnectHandler extends SimpleChannelInboundHandler<SocksCmdRequest> {

private final Bootstrap b = new Bootstrap();

@Override
public void channelRead0(final ChannelHandlerContext ctx, final SocksCmdRequest request) throws Exception {
Promise<Channel> promise = ctx.executor().newPromise();
promise.addListener(
new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(final Future<Channel> future) throws Exception {
final Channel outboundChannel = future.getNow();
if (future.isSuccess()) {
ctx.channel().writeAndFlush(new SocksCmdResponse(SocksCmdStatus.SUCCESS, request.addressType()))
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) {
ctx.pipeline().remove(SocksServerConnectHandler.this);
outboundChannel.pipeline().addLast(new RelayHandler(ctx.channel()));
ctx.pipeline().addLast(new RelayHandler(outboundChannel));
}
});
} else {
ctx.channel().writeAndFlush(new SocksCmdResponse(SocksCmdStatus.FAILURE, request.addressType()));
SocksServerUtils.closeOnFlush(ctx.channel());
}
}
});

final Channel inboundChannel = ctx.channel();
b.group(inboundChannel.eventLoop())
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new DirectClientHandler(promise));

b.connect(request.host(), request.port()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Connection established use handler provided results
} else {
// Close the connection if the connection attempt has failed.
ctx.channel().writeAndFlush(
new SocksCmdResponse(SocksCmdStatus.FAILURE, request.addressType()));
SocksServerUtils.closeOnFlush(ctx.channel());
}
}
});
}

这里是处理代理的核心代码:

看起来很复杂,一步一步分析,并不复杂

  • 7-28行,定义了一个promise
    • 14行 在promise完成之后发送SocksCmdResponse命令
    • 18-20行,在发送完后移除SocksServerConnectHandler,并增加RelayHandler处理器
  • 30-35行 定义了一个新的netty客户端
  • 37行 进行对服务器的地址与端口进行连接

nio的代码有时不好理解是因为它是异步的,不一定按代码书写顺序执行。

进一步解析下执行顺序:

  • 连接远程服务器(connect)
  • 连接成功后,会在DirectClientHandler中处理
  • DirectClientHandler中把setSuccess(promise.setSuccess(ctx.channel());)
  • setSuccess相当于这个事件成功了,就会通知到11行的代码开始执行
  • 向客户端发送new SocksCmdResponse(SocksCmdStatus.SUCCESS)消息
  • 这个消息发送成功后,就会执行17行开始的代码。
  • 在outboundChannel上增加一个RelayHandler
    • outboundChannel是怎么来的呢?是future拿到的,是promise的返回结果
    • 这个返回结果是在DirectClientHandler中设置的
    • 就是与远程服务器连接的channel
  • 在ctx上也增加一个RelayHandler,这个channel是客户端与自己的channel
  • 于是,通过ctx与outboundChannel,做到了“你中有我,我中有你”
    • 客户端与自己的连接处理器中有自己与远程服务器的channel的引用
    • 自己与远程服务器的连接处理器中有客户端与自己的channel的引用
  • RelayHandler是有两个实例的,就在接收到消息就写到对方的channel中,就保证了消息的互相转发。

流程

client proxy remote server
协商版本 –>
SocksInitRequestDecoder
SocksServerHandler 处理
<– SocksInitResponse
SOCKS5请求 –>
SocksCmdRequestDecoder
SocksServerHandler
SocksServerConnectHandler
connect to reomte server –>
<– 连接成功消息
DirectClientHandler 处理
Promise.operationComplete
<–- 连接成功消息
add client RelayHandler
add server RelayHandler
收到返回的消息
http 请求 –>
client RelayHandler
http 请求 –>
<– http响应
server RelayHandler
<– http响应
收到http响应

总结

看代码是最直观理解机制的方法
解释了上篇中为什么没有第3条数包的打印。
因为在SocksServerHandlerINIT分支中,把ctx.pipeline().addFirst(new SocksCmdRequestDecoder());这样LoggingHandler就在SocksCmdRequestDecoder之后,所以在SocksCmdRequestDecoder处理完客户端来的消息之后,就没有经过LoggingHandler处理。

之前对PromiseFuture的概念不是很理解,看了这个代码的处理这后,就和之前看的PromiseFuture的说法互想验证了,了解应该应用在什么场景下。