在上篇简单的分析了socks协议。 这篇中看下netty是如何实现socks的服务端的。
socks的实现主要实现在io.netty.handler.codec.socks
包下。 在example中给了一个示例。代码在这里
下面就对这个实现过程进行解析
主要目地:
结合实现了解socks细节
实现socks的过程,更进步了解netty的使用
入口 入口在SocksServer的main方法中。 创建ServerBootstrap
对象,初始化参数,与常规的netty 一样。 这里关键在SocksServerInitializer
类,做了相关初始化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup (1 ); EventLoopGroup workerGroup = new NioEventLoopGroup (); try { ServerBootstrap b = new ServerBootstrap (); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler (LogLevel.INFO)) .childHandler(new SocksServerInitializer ()); b.bind(PORT).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
SocksServerInitializer 在初始化的时候,增加三个handler:
io.netty.handler.codec.socks.SocksInitRequestDecoder
io.netty.handler.codec.socks.SocksMessageEncoder
io.netty.example.socksproxy.SocksServerHandler
1 2 3 4 5 6 7 8 9 10 private final SocksMessageEncoder socksMessageEncoder = new SocksMessageEncoder ();private final SocksServerHandler socksServerHandler = new SocksServerHandler ();@Override public void initChannel (SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast(new SocksInitRequestDecoder ()); p.addLast(socksMessageEncoder); p.addLast(socksServerHandler); }
SocksInitRequestDecoder 看继承关系:public class SocksInitRequestDecoder extends ReplayingDecoder<State>
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder
是ByteToMessageDecoder
解码的一种特殊的抽象基类, ByteToMessageDecoder
解码读取缓冲区的数据之前需要检查缓冲区是否有足够的字节,使用ReplayingDecoder就无需自己检查。若ByteBuf中有足够的字节,则会正常读取;若没有足够的字节则会停止解码。也正因为这样的包装使得ReplayingDecoder带有一定的局限性。
不是所有的操作都被ByteBuf支持,如果调用一个不支持的操作会抛出DecoderException。
ByteBuf.readableBytes()大部分时间不会返回期望值
SocksInitRequestDecoder 重写了 ReplayingDecoder的decode方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override protected void decode (ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksProtocolVersion.valueOf(byteBuf.readByte()); if (version != SocksProtocolVersion.SOCKS5) { break ; } checkpoint(State.READ_AUTH_SCHEMES); } case READ_AUTH_SCHEMES: { authSchemes.clear(); authSchemeNum = byteBuf.readByte(); for (int i = 0 ; i < authSchemeNum; i++) { authSchemes.add(SocksAuthScheme.valueOf(byteBuf.readByte())); } msg = new SocksInitRequest (authSchemes); break ; } } ctx.pipeline().remove(this ); out.add(msg); }
在这个decode方法中主要做了两件事
读版本号
读认证方式列表 是对socks客户端发来的第一段消息的处理。
VER
NMETHODS
METHODS
1
1
1-255
把读取好的authSchemes
传入SocksInitRequest
中,并做为msg交由上层处理。 并把自己从处理器链中移除。
SocksInitRequest
是SocksRequest
的子类。
1 2 3 4 5 6 7 8 9 10 11 public final class SocksInitRequest extends SocksRequest { private final List<SocksAuthScheme> authSchemes; public SocksInitRequest (List<SocksAuthScheme> authSchemes) { super (SocksRequestType.INIT); if (authSchemes == null ) { throw new NullPointerException ("authSchemes" ); } this .authSchemes = authSchemes; }
SocksInitRequest
在初始化的时候,把SocksRequestType
初始化为INIT
。
下一个处理SocksInitRequest
的handler是哪个呢?
在最开始的时候增加三个处理器,SocksInitRequestDecoder
,SocksMessageEncoder
,SocksServerHandler
。
SocksInitRequestDecoder
已经在处理完成移除了SocksMessageEncoder
继承于MessageToByteEncoder<SocksMessage>
是ChannelOutboundHandler
类的处理器,所以不是。SocksServerHandler
继承于SimpleChannelInboundHandler<SocksRequest>
是ChannelInboundHandler
的处理器,并SocksInitRequest
是SocksRequest
的子类。所是由于SocksServerHandler
处理。
下面来看SocksServerHandler
的处理方式
SocksServerHandler SocksServerHandler
实现了channelRead0
方法。channelRead0
方法是当指定对象解析好时调用的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override public void channelRead0 (ChannelHandlerContext ctx, SocksRequest socksRequest) throws Exception { switch (socksRequest.requestType()) { case INIT: { ctx.pipeline().addFirst(new SocksCmdRequestDecoder ()); ctx.write(new SocksInitResponse (SocksAuthScheme.NO_AUTH)); break ; } case AUTH: ctx.pipeline().addFirst(new SocksCmdRequestDecoder ()); ctx.write(new SocksAuthResponse (SocksAuthStatus.SUCCESS)); break ; case CMD: SocksCmdRequest req = (SocksCmdRequest) socksRequest; if (req.cmdType() == SocksCmdType.CONNECT) { ctx.pipeline().addLast(new SocksServerConnectHandler ()); ctx.pipeline().remove(this ); ctx.fireChannelRead(socksRequest); } else { ctx.close(); } break ; case UNKNOWN: ctx.close(); break ; } }
由上面的分析可知最开始会进入INIT
分支。
增加了一个SocksCmdRequestDecoder
处理器
向客户端发送了一个SocksInitResponse
对象 这里服务器是按不要验证来实现的。 如果需要,这里也可以拿到socksRequest的几种认证方式进行认证。
SocksInitResponse
是SocksResponse
的子类,是SocksMessage
的子类。于是由SocksMessageEncoder
进行编码。
SocksInitResponse
中encodeAsByteBuf
就是编码方法
1 2 3 4 5 @Override public void encodeAsByteBuf(ByteBuf byteBuf ) { byteBuf.writeByte(protocolVersion () .byteValue() ); byteBuf.writeByte(authScheme .byteValue () ); }
于是由客户端发0x05 0x00
的消息,是socks的服务器回应。
这个时候,再看处理器链上有哪些处理器呢? 有
SocksCmdRequestDecoder
SocksMessageEncoder
SocksServerHandler
在0x05 0x00
消息客户端收到之后,认证过程结果,客户端会向服务器端发送SOCKS5请求 接下来就看是如何处理的呢? 在SocksInitRequest
移除后,增加SocksCmdRequestDecoder
处理器。 下面来看SocksCmdRequestDecoder
SocksCmdRequestDecoder 1 2 public class SocksCmdRequestDecoder extends ReplayingDecoder <State> {
同样是ReplayingDecoder的子类。
看decode
方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Override protected void decode (ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception { switch (state()) { case CHECK_PROTOCOL_VERSION: { version = SocksProtocolVersion.valueOf(byteBuf.readByte()); if (version != SocksProtocolVersion.SOCKS5) { break ; } checkpoint(State.READ_CMD_HEADER); } case READ_CMD_HEADER: { cmdType = SocksCmdType.valueOf(byteBuf.readByte()); reserved = byteBuf.readByte(); addressType = SocksAddressType.valueOf(byteBuf.readByte()); checkpoint(State.READ_CMD_ADDRESS); } case READ_CMD_ADDRESS: { switch (addressType) { case IPv4: { host = SocksCommonUtils.intToIp(byteBuf.readInt()); port = byteBuf.readUnsignedShort(); msg = new SocksCmdRequest (cmdType, addressType, host, port); break ; } case DOMAIN: { fieldLength = byteBuf.readByte(); host = SocksCommonUtils.readUsAscii(byteBuf, fieldLength); port = byteBuf.readUnsignedShort(); msg = new SocksCmdRequest (cmdType, addressType, host, port); break ; } case IPv6: { byte [] bytes = new byte [16 ]; byteBuf.readBytes(bytes); host = SocksCommonUtils.ipv6toStr(bytes); port = byteBuf.readUnsignedShort(); msg = new SocksCmdRequest (cmdType, addressType, host, port); break ; } case UNKNOWN: break ; } } } ctx.pipeline().remove(this ); out.add(msg); }
在这里把命令类型,地址类型进行解析,并封装成SocksCmdRequest
类,交由上层(SocksServerHandler)处理。 关把自己从处理器链中移除。
SocksCmdRequest
是SocksRequest
的子类
1 public final class SocksCmdRequest extends SocksRequest {
SocksServerHandler的CMD命令处理 1 2 3 4 5 6 7 8 9 10 case CMD: SocksCmdRequest req = (SocksCmdRequest) socksRequest; if (req.cmdType() == SocksCmdType.CONNECT) { ctx.pipeline().addLast(new SocksServerConnectHandler ()); ctx.pipeline().remove(this ); ctx.fireChannelRead(socksRequest); } else { ctx.close(); } break ;
判断如果是CONNECT
命令,就增加SocksServerConnectHandler
处理器进行处理。 然后把自己从处理器链中移除。(应该在这个阶段完成的,已经完成。) 并调用ctx.fireChannelRead(socksRequest);
,目地是让SocksServerConnectHandler
类能再次处理这个socksRequest
,进行连接。
注意: CMD还有好几种,这个示例中只处理了CONNECT
命令
1 2 3 4 5 public enum SocksCmdType { CONNECT((byte ) 0x01 ), BIND((byte ) 0x02 ), UDP((byte ) 0x03 ), UNKNOWN((byte ) 0xff )
在现处理器链上的处理器有:
SocksServerConnectHandler
SocksMessageEncoder
SocksServerConnectHandler 经过上面分析,已经有SocksCmdRequest中的相关信息。并走了SocksServerConnectHandler的channelRead0
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public final class SocksServerConnectHandler extends SimpleChannelInboundHandler <SocksCmdRequest> { private final Bootstrap b = new Bootstrap (); @Override public void channelRead0 (final ChannelHandlerContext ctx, final SocksCmdRequest request) throws Exception { Promise<Channel> promise = ctx.executor().newPromise(); promise.addListener( new GenericFutureListener <Future<Channel>>() { @Override public void operationComplete (final Future<Channel> future) throws Exception { final Channel outboundChannel = future.getNow(); if (future.isSuccess()) { ctx.channel().writeAndFlush(new SocksCmdResponse (SocksCmdStatus.SUCCESS, request.addressType())) .addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture channelFuture) { ctx.pipeline().remove(SocksServerConnectHandler.this ); outboundChannel.pipeline().addLast(new RelayHandler (ctx.channel())); ctx.pipeline().addLast(new RelayHandler (outboundChannel)); } }); } else { ctx.channel().writeAndFlush(new SocksCmdResponse (SocksCmdStatus.FAILURE, request.addressType())); SocksServerUtils.closeOnFlush(ctx.channel()); } } }); final Channel inboundChannel = ctx.channel(); b.group(inboundChannel.eventLoop()) .channel(NioSocketChannel.class) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000 ) .option(ChannelOption.SO_KEEPALIVE, true ) .handler(new DirectClientHandler (promise)); b.connect(request.host(), request.port()).addListener(new ChannelFutureListener () { @Override public void operationComplete (ChannelFuture future) throws Exception { if (future.isSuccess()) { } else { ctx.channel().writeAndFlush( new SocksCmdResponse (SocksCmdStatus.FAILURE, request.addressType())); SocksServerUtils.closeOnFlush(ctx.channel()); } } }); }
这里是处理代理的核心代码:
看起来很复杂,一步一步分析,并不复杂
7-28行,定义了一个promise
。
14行 在promise完成之后发送SocksCmdResponse
命令
18-20行,在发送完后移除SocksServerConnectHandler
,并增加RelayHandler
处理器
30-35行 定义了一个新的netty客户端
37行 进行对服务器的地址与端口进行连接
nio的代码有时不好理解是因为它是异步的,不一定按代码书写顺序执行。
进一步解析下执行顺序:
连接远程服务器(connect)
连接成功后,会在DirectClientHandler中处理
DirectClientHandler中把setSuccess(promise.setSuccess(ctx.channel());
)
setSuccess相当于这个事件成功了,就会通知到11行的代码开始执行
向客户端发送new SocksCmdResponse(SocksCmdStatus.SUCCESS)
消息
这个消息发送成功后,就会执行17行开始的代码。
在outboundChannel上增加一个RelayHandler
outboundChannel是怎么来的呢?是future拿到的,是promise的返回结果
这个返回结果是在DirectClientHandler中设置的
就是与远程服务器连接的channel
在ctx上也增加一个RelayHandler,这个channel是客户端与自己的channel
于是,通过ctx与outboundChannel,做到了“你中有我,我中有你”
客户端与自己的连接处理器中有自己与远程服务器的channel的引用
自己与远程服务器的连接处理器中有客户端与自己的channel的引用
RelayHandler是有两个实例的,就在接收到消息就写到对方的channel中,就保证了消息的互相转发。
流程
client
proxy
remote server
协商版本 –>
SocksInitRequestDecoder
SocksServerHandler 处理
<– SocksInitResponse
SOCKS5请求 –>
SocksCmdRequestDecoder
SocksServerHandler
SocksServerConnectHandler
connect to reomte server –>
<– 连接成功消息
DirectClientHandler 处理
Promise.operationComplete
<–- 连接成功消息
add client RelayHandler
add server RelayHandler
收到返回的消息
http 请求 –>
client RelayHandler
http 请求 –>
<– http响应
server RelayHandler
<– http响应
收到http响应
总结 看代码是最直观理解机制的方法 解释了上篇中为什么没有第3条数包的打印。 因为在SocksServerHandler
的INIT
分支中,把ctx.pipeline().addFirst(new SocksCmdRequestDecoder());
这样LoggingHandler
就在SocksCmdRequestDecoder
之后,所以在SocksCmdRequestDecoder
处理完客户端来的消息之后,就没有经过LoggingHandler
处理。
之前对Promise
和Future
的概念不是很理解,看了这个代码的处理这后,就和之前看的Promise
和Future
的说法互想验证了,了解应该应用在什么场景下。