Netty:EventLoopGroup
http://blog.csdn.net/bdmh/article/details/49945765
Group:群组,Loop:循环,Event:事件,这几个东西联在一起,相比大家也大概明白它的用途了。
Netty内部都是通过线程在处理各种数据,EventLoopGroup就是用来管理调度他们的,注册Channel,管理他们的生命周期,下面就来看看EventLoopGroup是怎样工作的。
在,当我们启动客户端或者服务端时,都要声明一个Group对象
- EventLoopGroup group = new NioEventLoopGroup();
这里我们就以NioEventLoopGroup来说明。先看一下它的继承关系
- NioEventLoopGroup extends MultithreadEventLoopGroup extends MultithreadEventExecutorGroup
看看NioEventLoopGroup的构造函数
- public NioEventLoopGroup() {
- this(0);
- }
- //他会连续调用内部的构造函数,直到用下面的构造去执行父类的构造
- //nThreads此时为0,马上就会提到这个参数的用处
- public NioEventLoopGroup(
- int nThreads, Executor executor, final SelectorProvider selectorProvider) {
- super(nThreads, executor, selectorProvider);
- }
基类MultithreadEventLoopGroup的构造
- protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
- super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
- }
- //nThreads:内部线程数,如果为0,就取默认值,通常我们会设置为处理器个数*2
- DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
- "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
继续调用再上一级的MultithreadEventExecutorGroup的构造
- //这里会根据nThreads创建执行者数组
- private final EventExecutor[] children;
- protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
- if (nThreads <= 0) {
- throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
- }
- if (executor == null) {
- executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
- }
- //这里创建EventExecutor数组对象
- children = new EventExecutor[nThreads];
- if (isPowerOfTwo(children.length)) {
- chooser = new PowerOfTwoEventExecutorChooser();
- } else {
- chooser = new GenericEventExecutorChooser();
- }
- //此处循环children数组,来创建内部的NioEventLoop对象
- for (int i = 0; i < nThreads; i ++) {
- boolean success = false;
- try {
- //newChild是abstract方法,运行期会执行具体的实例对象的重载
- children[i] = newChild(executor, args);
- success = true;
- } catch (Exception e) {
- // TODO: Think about if this is a good exception type
- throw new IllegalStateException("failed to create a child event loop", e);
- } finally {
- //如果没有成功,做关闭处理
- if (!success) {
- for (int j = 0; j < i; j ++) {
- children[j].shutdownGracefully();
- }
- for (int j = 0; j < i; j ++) {
- EventExecutor e = children[j];
- try {
- while (!e.isTerminated()) {
- e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
- }
- } catch (InterruptedException interrupted) {
- // Let the caller handle the interruption.
- Thread.currentThread().interrupt();
- break;
- }
- }
- }
- }
- }
- ......
- }
因为我们最初创建的是NioEventLoopGroup对象,所以newChild会执行NioEventLoopGroup的newChild方法,创建NioEventLoop对象。
- @Override
- protected EventLoop newChild(Executor executor, Object... args) throws Exception {
- return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
- }
看看NioEventLoop的继承关系
- NioEventLoop extends SingleThreadEventLoop extends SingleThreadEventExecutor
NioEventLoop通过自己的构造行数,一直调用到SingleThreadEventExecutor的构造
- protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
- super(parent);
- if (executor == null) {
- throw new NullPointerException("executor");
- }
- this.addTaskWakesUp = addTaskWakesUp;
- this.executor = executor;
- taskQueue = newTaskQueue();
- }
至此,Group和内部的Loop对象以及Executor就创建完毕,那么他们是什么时候被调用的呢?就是在服务端bind和客户端connect时。
服务端的bind方法执行的是AbstractBootstrap的bind方法,bind方法中先会调用validate()方法检查是否有group
- @SuppressWarnings("unchecked")
- public B validate() {
- if (group == null) {
- throw new IllegalStateException("group not set");
- }
- if (channelFactory == null) {
- throw new IllegalStateException("channel or channelFactory not set");
- }
- return (B) this;
- }
所以,假如初始时,没有设置Bootstrap的group的话,就会报错。
最终bind调用doBind,然后调用doBind0,启动一个Runnable线程
- private static void doBind0(
- final ChannelFuture regFuture, final Channel channel,
- final SocketAddress localAddress, final ChannelPromise promise) {
- // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
- // the pipeline in its channelRegistered() implementation.
- channel.eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- if (regFuture.isSuccess()) {
- channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- } else {
- promise.setFailure(regFuture.cause());
- }
- }
- });
- }
我们这里,channel.eventLoop()得到的是NioEventLoop对象,所以执行NioEventLoop的run方法,开始线程运行。
我们在创建Bootstrap初期,会调用它的group方法,绑定一个group,这样,一个循环就开始运行了。