前言
本文主要分析 Netty服务端的启动,以便对Netty框架有一个基本的认识,我用的Netty版本是 netty-4.1.29
,之前的文章Netty 系列文章之基本组件概览
对Netty的基本组件做了一个简单的介绍,算是对本文分析Netty服务端的启动做一个基础铺垫
服务端代码
该源码出自 netty官方提供的 服务端demo,详细地址: https://github.com/netty/netty/blob/4.1/example/src/main/java/io/netty/example/echo/EchoServer.java
我做了一点小改动,代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端启动流程概览:
- 创建
ServerBootStrap
启动类 - 设置并绑定
NioEventLoopGroup
线程池 - 创建服务端
Channel
- 添加并设置
ChannelHandler
- 绑定并启动监听端口
在之前的文章我就提到过,ServerBootstrap是Netty服务端的启动辅助类,帮助Netty服务端完成初始化,下面我将深入代码,仔细分析Netty服务端启动过程中各组件的创建与初始化
Channel的创建和初始化过程
Channel是Netty的网络操作抽象类,对应于JDK底层的 Socket,Netty服务端的Channel类型是 NioServerSocketChannel。下面来分析NioServerSocketChannel的创建和初始化
NioServerSocketChannel的创建
NioServerSocketChannel
的创建实际上是从 ServerBootStrap
的bind()
方法开始的,进入bind()
源码分析(AbstractBootstrap
的bind()方法):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31......
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
......
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
......
}
在源码里注意到一个 initAndRegister()
方法,这个方法就负责 NioServerSocketChannel
的初始化和注册操作,走进initAndRegister()
方法,如下图所示:
从上图可以看出,源码里是调用 channelFactory.newChannel()
来创建 channel , 走进ChannelFactory
发现该接口被 @Deprecated
注解标注了,说明是一个过时的接口:1
2
3
4
5
6
7
public interface ChannelFactory<T extends Channel> {
/**
* Creates a new channel.
*/
T newChannel();
}
我用的Netty版本是 Netty-4.1.29
,其Netty API 文档 中介绍 io.netty.bootstrap.ChannelFactory
提到用 io.netty.channel.ChannelFactory
代替。
这里 ChannelFactory
只是一个工厂接口,真正创建 Channel
的是ReflectiveChannelFactory类,它是ChannelFactory
的一个重要实现类,该类通过反射方式创建 Channel
,源代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}
其中 newChannel()
方法通过 clazz.getConstructor().newInstance()
来创建 Channel,即通过反射方式来创建 Channel,而这个 clazz
就是 通过ServerBootStrap
的 channel
方法传入的,最开始的服务端代码传入的NioServerSocketChannel,所以对应通过反射创建了NioServerSocketChannel,并且 ChannelFactory
的初始化也是在该方法中进行的,代码如下:1
2
3
4
5
6public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
到此,NioServerSocketChannel
的创建过程大体结束,再次总结一下:
ServerBootstrap
中的ChannelFactory
的实现是ReflectiveChannelFactory
- 生成的 Channel的具体类型是
NioServerSocketChannel
Channel
的实例化过程其实就是调用ChannelFactory.newChannel
方法,实际上是通过反射方式进行创建的
NioServerSocketChanel的实例化过程
在前面的分析中,NioServerSocketChannel是通过反射创建的,它的构造方法如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
方法newSocket利用 provider.openServerSocketChannel()
生成Nio中的 ServerSocketChannel
对象,然后调用重载的构造器:1
2
3
4public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
该构造器中,调用了父类的构造器,传入的参数是 SelectionKey.OP_ACCEPT
,这个参数对于有Java NIO编程经验的人来说应该非常熟悉,在Java NIO中服务端要监听客户端的连接请求,就向多路复用器 Selector
注册 SelectionKey.OP_ACCEPT
客户端连接事件,而Netty又是基于 Java NIO开发的,这里可见一斑。接着进入父类构造器:1
2
3protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
然后:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
设置当前 ServerSocketChannel为非阻塞通道,然后再次进入父类构造器 AbstractChannel(Channel parent)
:1
2
3
4
5
6protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
- parent属性设置为null
- 初始化unsafe,用来负责底层的connect,register,read和write操作
- 初始化pipeline,在实例化一个Channel的同时,当有事件发生的时候,pipeline负责调用相应的Handler进行处理
关于 unsafe
Netty中的unsafe
不是JDK中的sun.misc.Unsafe
,该unsafe
实际是封装了 Java 底层 Socket的操作,因此是沟通 Netty上层和Java 底层重要的桥梁。
ChannelPipeline的初始化
每个Channel都有对应的 ChannelPipeline
,当一个Channel被创建时,对应的ChannelPipeline也会自动创建,在上面分析 NioServerSocketChannel
实例化过程就看到,在其父类构造器中,有初始化一个 pipeline
,对应源代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
.....
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
......
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
......
/**
* Returns a new {@link DefaultChannelPipeline} instance.
*/
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
从上面代码看到,pipeline最终被初始化为一个 DefaultChannelPipeline
,DefaultChannelPipeline
是ChannelPipeline
的实现类,进入它的构造方法,如下:1
2
3
4
5
6
7
8
9
10
11protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
该构造方法有一个参数 channel
,这个 channel
就是我们之前传入的 NioServerSocketChannel
。关于该构造器其他方法以及ChannelPipeline
更详细的介绍将在后续文章分析。
NioEventLoopGroup
在我们最开始的Netty服务端代码中初始化了两个 NioEventLoopGroup
,即一个处理客户端连接请求的线程池——bossGroup
,一个处理客户端读写操作的线程池——workerGroup
:1
2EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup
的类继承结构图如下所示:
从图中可以看到,NioEventLoopGroup
实现了 Executor
接口,Executor
框架可以用来创建线程池的,也是一个线程执行器。关于 Executor
框架更加详细的介绍请参阅 《Java并发编程的艺术》
NioEventLoopGroup
看了NioEventLoopGroup
的类继承结构,下面来分析一下它的初始化过程,构造器源代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17......
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
......
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
......
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
上面几个重载构造器其实没做啥,最终调用父类 MultithreadEventLoopGroup 的构造器,1
2
3protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
这里需要注意的是如果我们传入的线程数 nThreads
是 0 的话,那么Netty将会为我们设置默认的线程数 DEFAULT_EVENT_LOOP_THREADS
,这个默认值是处理器核心数 * 2,如下:1
2
3
4
5
6
7
8static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
在 bossGroup
这个线程池我们传入的 nThread
是1,实际上在 bossGroup
中只会有一个线程用于处理客户端连接请求,所以这里设置为1,而不使用默认的线程数,至于为什么只用一个线程处理连接请求还需用线程池,在Stack Overflow有相关问题的讨论。
然后回来再次进入父类MultithreadEventExecutorGroup的构造器,1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66......
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
......
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
MultithreadEventExecutorGroup
管理着 eventLoop
的生命周期,它有几个变量:
- children: EventExecutor数组,保存eventloop
- chooser: 线程选择器,从children中选取一个 eventloop的策略
MultithreadEventExecutorGroup
的构造器主要分为以下几个步骤:
- 创建线程执行器——
ThreadPerTaskExecutor
- 调用
newChild
方法初始化children
数组 - 创建线程选择器——
chooser
创建ThreadPerTaskExecutor
我们一开始初始化 NioEventLoopGroup
,并没有传入 Executor
参数:1
2
3public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
所以到父类MultithreadEventExecutorGroup
构造器时,executor 为null, 然后执行:1
2
3if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
ThreadPerTaskExecutor是一个线程执行器,它实现了 Executor
接口,1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
ThreadPerTaskExecutor 实现了 execute
方法,每次通过调用execute
方法执行线程任务
调用 newChild 方法初始化 children 数组
1 | children[i] = newChild(executor, args); |
在一个for循环里,nThread线程数是总的循环次数,通过 newChild方法初始化 EventExecutor数组的每个元素,而 newChild
方法如下:1
2
3
4
5
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
每次循环通过newChild实例化一个NioEventLoop
对象。
创建线程选择器——chooser
1 | public EventExecutorChooser newChooser(EventExecutor[] executors) { |
根据EventExecutor[]
数组的大小,采用不同策略初始化一个 Chooser,如果大小为 2的幂次方则采用 PowerOfTwoEventExecutorChooser
,否则使用 GenericEventExecutorChooser
。
无论使用哪个 chooser,它们的功能都是一样的,即从 EventExecutor[]
数组中,这里也就是 NioEventLoop
数组中,选择一个合适的 NioEventLoop
。
NioEventLoop的初始化
1 | protected EventLoop newChild(Executor executor, Object... args) throws Exception { |
从前面的分析就可以看到,通过 newChild方法初始化 NioEventLoopGroup
中的 NioEventLoop
,下面来看下NioEventLoop
的构造方法是怎样的:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
之前在 NioEventLoopGroup
的构造器中通过 SelectorProvider.provider()
创建了一个 SelectorProvider
,这里传递给了NioEventLoop中的provider,而NioEventLoop
又通过 openSelector()
方法获取一个 selector对象,实际上是通过 provider
的openSelector
方法。这不就是 对应Java NIO中的创建多路复用器 selector。(这里只是简单阐述NioEventLoop的构造方法,后续文章会对NioEventLoop做更加详细的分析)
Channel的注册过程
前面已经介绍了 Channel的创建和初始化过程,是在 initAndRegister
方法中进行的,这个方法里还会将初始化好的 channel注册到 EventLoop
线程中去1
2
3
4
5
6
7
8
9
10
11
12final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
.....
}
ChannelFuture regFuture = config().group().register(channel);
......
}
调用config().group().register
方法将 channel注册到 EventLoopGroup
中去,其目的就是为了实现NIO中把ServerSocketChannel注册到 Selector中去,这样就是可以实现client请求的监听,代码如下:1
2
3
4
5
6
7public ChannelFuture register(Channel channel) {
return next().register(channel);
}
......
public EventLoop next() {
return (EventLoop) super.next();
}
父类MultithreadEventExecutorGroup的next()方法,next方法使用 chooser策略从 EventExecutor[]
数组中选择一个 SingleThreadEventLoop
:1
2
3
4
5
6
7public EventExecutor next() {
return chooser.next();
}
.....
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
然后再执行 SingleThreadEventLoop
的 register()
注册方法:1
2
3
4
5
6
7
8
9public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
...
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
上面代码调用了 unsafe的register方法,具体是AbstractUnsafe.register
,而unsafe主要用于实现底层的 rergister,read,write等操作。该register
方法是:1
2
3
4
5
6
7
8
9
10public final void register(EventLoop eventLoop, final ChannelPromise promise) {
......
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
......
}
}
将 eventLoop 赋值给 Channel 的 eventLoop 属性,然后又调用了 register0()方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21javaprivate void register0(ChannelPromise promise) {
try {
......
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
......
}
}
上面有个关键方法就是 doRegister()
,doRegister
才是最终Nio的注册方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
通过javaChannel().register(eventLoop().unwrappedSelector(), 0, this)
将 Channel对应的Java NIO ServerSocketChannel注册到 EventLoop
中的Selector上,最终完成了channel向eventLoop的注册过程。
这里总结下 Channel注册过程中函数调用链:
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register -> AbstractUnsafe.register0 -> AbstractNioChannel.doRegister()
添加 ChannelHandler
在之前的 initAndRegister()
方法里,里面有个 init()
方法,如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
"unchecked") (
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
在init()
方法中设置服务端自定义的 ChannelOptions,ChannelAttrs
属性和为服务端Channel创建出来的新连接的Channel设置的自定义属性ChildOptions,ChildAttrs
,这里就不多叙述设置参数的问题了,重点关注 pipeline
的 addLast
方法,该方法就是添加用于处理出站和入站数据流的 ChannelHandler,而pipeline
是从 channel
中获取的,之前分析过当创建channel
时会自动创建一个对应的 channelPipeline
。
至于ChannelInitializer
又是什么,来看下它的类继承结构图就知道了:
ChannelInitializer
是一个抽象类,实现了ChannelHandler
接口,它有一个抽象方法initChannel
,上面代码实现了该方法并且添加了bootstrap
的handler,逻辑如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16.....
//1
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
......
//2
public final ChannelHandler handler() {
return bootstrap.handler();
}
......
//3
final ChannelHandler handler() {
return handler;
}
initChannel
添加的Handler
就是我们服务端代码中 serverbootstrap
设置的handler
,如下:1
2
3
4
5
6
7
8
9
10
11
12b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
示例代码设置的handler为LoggingHandler
,用于处理日志,这里不细说。上面的initChannel
方法可以添加 Handler
,这里的serverbootstrap
启动类还增加了 childHandler方法,也是用来添加 handler,只不过是向已经连接的 channel客户端的 channnelpipeline
添加handler
。
serverbootstrap.handler()
设置的handler
在初始化就会执行,而 serverbootstrap.childHandler()
设置的childHandler
在客户端连接成功才会执行
小结
由于自身知识与经验有限,对Netty的服务端启动源码分析得不是很全面,在此过程中也参考了一些大佬的Netty源码分析文章,本文如有错误之处,欢迎指出。