Netty系列文章之服务端启动分析

Catalogue
  1. 1. 前言
  2. 2. 服务端代码
  3. 3. Channel的创建和初始化过程
    1. 3.1. NioServerSocketChannel的创建
    2. 3.2. NioServerSocketChanel的实例化过程
      1. 3.2.1. 关于 unsafe
  4. 4. ChannelPipeline的初始化
  5. 5. NioEventLoopGroup
    1. 5.1. NioEventLoopGroup
      1. 5.1.1. 创建ThreadPerTaskExecutor
      2. 5.1.2. 调用 newChild 方法初始化 children 数组
      3. 5.1.3. 创建线程选择器——chooser
    2. 5.2. NioEventLoop的初始化
  6. 6. Channel的注册过程
  7. 7. 添加 ChannelHandler
  8. 8. 小结
  9. 9. 参考资料 & 鸣谢

前言

本文主要分析 Netty服务端的启动,以便对Netty框架有一个基本的认识,我用的Netty版本是 netty-4.1.29,之前的文章Netty 系列文章之基本组件概览 对Netty的基本组件做了一个简单的介绍,算是对本文分析Netty服务端的启动做一个基础铺垫

服务端代码

该源码出自 netty官方提供的 服务端demo,详细地址: https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/echo/EchoServer.java

我做了一点小改动,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public final class EchoServer {

static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

服务端启动流程概览:

  • 创建 ServerBootStrap启动类
  • 设置并绑定 NioEventLoopGroup线程池
  • 创建服务端 Channel
  • 添加并设置 ChannelHandler
  • 绑定并启动监听端口

在之前的文章我就提到过,ServerBootstrap是Netty服务端的启动辅助类,帮助Netty服务端完成初始化,下面我将深入代码,仔细分析Netty服务端启动过程中各组件的创建与初始化

Channel的创建和初始化过程

Channel是Netty的网络操作抽象类,对应于JDK底层的 Socket,Netty服务端的Channel类型是 NioServerSocketChannel。下面来分析NioServerSocketChannel的创建和初始化

NioServerSocketChannel的创建

NioServerSocketChannel的创建实际上是从 ServerBootStrapbind()方法开始的,进入bind()源码分析(AbstractBootstrap的bind()方法):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
......
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
......
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
......
}

在源码里注意到一个 initAndRegister()方法,这个方法就负责 NioServerSocketChannel的初始化和注册操作,走进initAndRegister()方法,如下图所示:

channel_init_and_create

从上图可以看出,源码里是调用 channelFactory.newChannel()来创建 channel , 走进ChannelFactory发现该接口被 @Deprecated注解标注了,说明是一个过时的接口:

1
2
3
4
5
6
7
@Deprecated
public interface ChannelFactory<T extends Channel> {
/**
* Creates a new channel.
*/
T newChannel();
}

我用的Netty版本是 Netty-4.1.29,其Netty API 文档 中介绍 io.netty.bootstrap.ChannelFactory提到用 io.netty.channel.ChannelFactory代替。

这里 ChannelFactory只是一个工厂接口,真正创建 Channel的是ReflectiveChannelFactory类,它是ChannelFactory的一个重要实现类,该类通过反射方式创建 Channel,源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

private final Class<? extends T> clazz;

public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}

@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}

@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}

其中 newChannel()方法通过 clazz.getConstructor().newInstance()来创建 Channel,即通过反射方式来创建 Channel,而这个 clazz就是 通过ServerBootStrapchannel方法传入的,最开始的服务端代码传入的NioServerSocketChannel,所以对应通过反射创建了NioServerSocketChannel,并且 ChannelFactory的初始化也是在该方法中进行的,代码如下:

1
2
3
4
5
6
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

到此,NioServerSocketChannel的创建过程大体结束,再次总结一下:

  • ServerBootstrap中的ChannelFactory的实现是 ReflectiveChannelFactory
  • 生成的 Channel的具体类型是 NioServerSocketChannel
  • Channel的实例化过程其实就是调用 ChannelFactory.newChannel方法,实际上是通过反射方式进行创建的

NioServerSocketChanel的实例化过程

在前面的分析中,NioServerSocketChannel是通过反射创建的,它的构造方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

方法newSocket利用 provider.openServerSocketChannel()生成Nio中的 ServerSocketChannel对象,然后调用重载的构造器:

1
2
3
4
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

该构造器中,调用了父类的构造器,传入的参数是 SelectionKey.OP_ACCEPT,这个参数对于有Java NIO编程经验的人来说应该非常熟悉,在Java NIO中服务端要监听客户端的连接请求,就向多路复用器 Selector注册 SelectionKey.OP_ACCEPT 客户端连接事件,而Netty又是基于 Java NIO开发的,这里可见一斑。接着进入父类构造器:

1
2
3
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}

然后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}

throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}

设置当前 ServerSocketChannel为非阻塞通道,然后再次进入父类构造器 AbstractChannel(Channel parent):

1
2
3
4
5
6
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

  • parent属性设置为null
  • 初始化unsafe,用来负责底层的connect,register,read和write操作
  • 初始化pipeline,在实例化一个Channel的同时,当有事件发生的时候,pipeline负责调用相应的Handler进行处理

关于 unsafe

Netty中的unsafe不是JDK中的sun.misc.Unsafe,该unsafe实际是封装了 Java 底层 Socket的操作,因此是沟通 Netty上层和Java 底层重要的桥梁。

ChannelPipeline的初始化

每个Channel都有对应的 ChannelPipeline,当一个Channel被创建时,对应的ChannelPipeline也会自动创建,在上面分析 NioServerSocketChannel实例化过程就看到,在其父类构造器中,有初始化一个 pipeline,对应源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
.....
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
......
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
......
/**
* Returns a new {@link DefaultChannelPipeline} instance.
*/
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}

从上面代码看到,pipeline最终被初始化为一个 DefaultChannelPipelineDefaultChannelPipelineChannelPipeline的实现类,进入它的构造方法,如下:

1
2
3
4
5
6
7
8
9
10
11
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

该构造方法有一个参数 channel,这个 channel就是我们之前传入的 NioServerSocketChannel。关于该构造器其他方法以及ChannelPipeline更详细的介绍将在后续文章分析。

NioEventLoopGroup

在我们最开始的Netty服务端代码中初始化了两个 NioEventLoopGroup,即一个处理客户端连接请求的线程池——bossGroup,一个处理客户端读写操作的线程池——workerGroup

1
2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

NioEventLoopGroup的类继承结构图如下所示:

nioeventLoopGroup

从图中可以看到,NioEventLoopGroup实现了 Executor接口,Executor框架可以用来创建线程池的,也是一个线程执行器。关于 Executor框架更加详细的介绍请参阅 《Java并发编程的艺术》

NioEventLoopGroup

看了NioEventLoopGroup的类继承结构,下面来分析一下它的初始化过程,构造器源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
......
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
......
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
......
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

上面几个重载构造器其实没做啥,最终调用父类 MultithreadEventLoopGroup 的构造器,

1
2
3
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

这里需要注意的是如果我们传入的线程数 nThreads 是 0 的话,那么Netty将会为我们设置默认的线程数 DEFAULT_EVENT_LOOP_THREADS,这个默认值是处理器核心数 * 2,如下:

1
2
3
4
5
6
7
8
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}

bossGroup这个线程池我们传入的 nThread是1,实际上在 bossGroup中只会有一个线程用于处理客户端连接请求,所以这里设置为1,而不使用默认的线程数,至于为什么只用一个线程处理连接请求还需用线程池,在Stack Overflow有相关问题的讨论。

然后回来再次进入父类MultithreadEventExecutorGroup的构造器,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
......
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
......
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}

chooser = chooserFactory.newChooser(children);

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};

for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

MultithreadEventExecutorGroup管理着 eventLoop的生命周期,它有几个变量:

  • children: EventExecutor数组,保存eventloop
  • chooser: 线程选择器,从children中选取一个 eventloop的策略

MultithreadEventExecutorGroup的构造器主要分为以下几个步骤:

  • 创建线程执行器——ThreadPerTaskExecutor
  • 调用 newChild方法初始化 children数组
  • 创建线程选择器——chooser

创建ThreadPerTaskExecutor

我们一开始初始化 NioEventLoopGroup,并没有传入 Executor参数:

1
2
3
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}

所以到父类MultithreadEventExecutorGroup构造器时,executor 为null, 然后执行:

1
2
3
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}

ThreadPerTaskExecutor是一个线程执行器,它实现了 Executor接口,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}

@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}

ThreadPerTaskExecutor 实现了 execute方法,每次通过调用execute方法执行线程任务

调用 newChild 方法初始化 children 数组

1
children[i] = newChild(executor, args);

在一个for循环里,nThread线程数是总的循环次数,通过 newChild方法初始化 EventExecutor数组的每个元素,而 newChild方法如下:

1
2
3
4
5
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

每次循环通过newChild实例化一个NioEventLoop对象。

创建线程选择器——chooser

1
2
3
4
5
6
7
8
9
10
11
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
......
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}

根据EventExecutor[]数组的大小,采用不同策略初始化一个 Chooser,如果大小为 2的幂次方则采用 PowerOfTwoEventExecutorChooser,否则使用 GenericEventExecutorChooser
无论使用哪个 chooser,它们的功能都是一样的,即从 EventExecutor[]数组中,这里也就是 NioEventLoop数组中,选择一个合适的 NioEventLoop

NioEventLoop的初始化

1
2
3
4
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

从前面的分析就可以看到,通过 newChild方法初始化 NioEventLoopGroup中的 NioEventLoop,下面来看下NioEventLoop的构造方法是怎样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}

之前在 NioEventLoopGroup的构造器中通过 SelectorProvider.provider()创建了一个 SelectorProvider,这里传递给了NioEventLoop中的provider,而NioEventLoop又通过 openSelector()方法获取一个 selector对象,实际上是通过 provideropenSelector方法。这不就是 对应Java NIO中的创建多路复用器 selector。(这里只是简单阐述NioEventLoop的构造方法,后续文章会对NioEventLoop做更加详细的分析)

Channel的注册过程

前面已经介绍了 Channel的创建和初始化过程,是在 initAndRegister方法中进行的,这个方法里还会将初始化好的 channel注册到 EventLoop线程中去

1
2
3
4
5
6
7
8
9
10
11
12
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
.....
}

ChannelFuture regFuture = config().group().register(channel);
......
}

调用config().group().register方法将 channel注册到 EventLoopGroup中去,其目的就是为了实现NIO中把ServerSocketChannel注册到 Selector中去,这样就是可以实现client请求的监听,代码如下:

1
2
3
4
5
6
7
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
......
public EventLoop next() {
return (EventLoop) super.next();
}

父类MultithreadEventExecutorGroup的next()方法,next方法使用 chooser策略从 EventExecutor[]数组中选择一个 SingleThreadEventLoop

1
2
3
4
5
6
7
public EventExecutor next() {
return chooser.next();
}
.....
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}

然后再执行 SingleThreadEventLoopregister()注册方法:

1
2
3
4
5
6
7
8
9
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
...
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

上面代码调用了 unsafe的register方法,具体是AbstractUnsafe.register,而unsafe主要用于实现底层的 rergister,read,write等操作。该register方法是:

1
2
3
4
5
6
7
8
9
10
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
......
AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
......
}
}

将 eventLoop 赋值给 Channel 的 eventLoop 属性,然后又调用了 register0()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
javaprivate void register0(ChannelPromise promise) {
try {
......
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
......
}
}

上面有个关键方法就是 doRegister(),doRegister才是最终Nio的注册方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}

通过javaChannel().register(eventLoop().unwrappedSelector(), 0, this)将 Channel对应的Java NIO ServerSocketChannel注册到 EventLoop中的Selector上,最终完成了channel向eventLoop的注册过程。

这里总结下 Channel注册过程中函数调用链:
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register -> AbstractUnsafe.register0 -> AbstractNioChannel.doRegister()

添加 ChannelHandler

在之前的 initAndRegister()方法里,里面有个 init()方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

init()方法中设置服务端自定义的 ChannelOptions,ChannelAttrs属性和为服务端Channel创建出来的新连接的Channel设置的自定义属性ChildOptions,ChildAttrs,这里就不多叙述设置参数的问题了,重点关注 pipelineaddLast方法,该方法就是添加用于处理出站和入站数据流的 ChannelHandler,而pipeline是从 channel中获取的,之前分析过当创建channel时会自动创建一个对应的 channelPipeline

至于ChannelInitializer又是什么,来看下它的类继承结构图就知道了:

channelHandler

ChannelInitializer是一个抽象类,实现了ChannelHandler接口,它有一个抽象方法initChannel,上面代码实现了该方法并且添加了bootstrap的handler,逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
.....
//1
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
......
//2
public final ChannelHandler handler() {
return bootstrap.handler();
}
......
//3
final ChannelHandler handler() {
return handler;
}

initChannel添加的Handler就是我们服务端代码中 serverbootstrap设置的handler,如下:

1
2
3
4
5
6
7
8
9
10
11
12
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

示例代码设置的handler为LoggingHandler,用于处理日志,这里不细说。上面的initChannel方法可以添加 Handler,这里的serverbootstrap启动类还增加了 childHandler方法,也是用来添加 handler,只不过是向已经连接的 channel客户端的 channnelpipeline添加handler

serverbootstrap.handler()设置的handler在初始化就会执行,而 serverbootstrap.childHandler()设置的childHandler在客户端连接成功才会执行

小结

由于自身知识与经验有限,对Netty的服务端启动源码分析得不是很全面,在此过程中也参考了一些大佬的Netty源码分析文章,本文如有错误之处,欢迎指出。

参考资料 & 鸣谢

Bagikan Komentar