package link.jfire.socket.socketserver.bus;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import link.jfire.baseutil.simplelog.ConsoleLogFactory;
import link.jfire.baseutil.simplelog.Logger;
import link.jfire.socket.socketserver.handler.MessageAction;
import link.jfire.socket.socketserver.handler.MessageHandler;
import link.jfire.socket.socketserver.interceptor.MessageInterceptor;

@Resource
/* loaded from: input_file:link/jfire/socket/socketserver/bus/LocalHandlerCenter.class */
public class LocalHandlerCenter implements MessageHandlerCenter {
    private MessageAction messageAction;

    @Resource
    private List<MessageHandler> handlerList = new ArrayList();

    @Resource
    private List<MessageInterceptor> interceptorList = new ArrayList();
    private BlockingQueue<Message> waitList = new LinkedBlockingQueue();
    private int taskLimit = Runtime.getRuntime().availableProcessors() * 2;
    private int runningTask = 0;
    private ExecutorService threadPool = null;
    private ExecutorCompletionService<Void> executorPool = null;
    private Logger logger = ConsoleLogFactory.getLogger();

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                this.messageAction.addMessage(this.waitList.take());
                this.executorPool.submit(this.messageAction);
                this.runningTask++;
                while (this.executorPool.poll() != null) {
                    this.runningTask--;
                }
                if (this.runningTask >= this.taskLimit) {
                    this.executorPool.take();
                    this.runningTask--;
                }
            } catch (InterruptedException e) {
                this.logger.info("{}被中断", new Object[]{Thread.currentThread().getName()});
                return;
            } catch (Exception e2) {
                this.logger.error("{}异常", new Object[]{Thread.currentThread().getName(), e2});
                return;
            }
        }
    }

    @Override // link.jfire.socket.socketserver.bus.MessageHandlerCenter
    public void offerMessage(Message message) {
        this.waitList.offer(message);
    }

    @Override // link.jfire.socket.socketserver.bus.MessageHandlerCenter
    public void stop() {
        this.threadPool.shutdownNow();
        try {
            this.threadPool.awaitTermination(50L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // link.jfire.socket.socketserver.bus.MessageHandlerCenter
    public void init() {
        this.messageAction = new MessageAction((MessageInterceptor[]) this.interceptorList.toArray(new MessageInterceptor[0]), (MessageHandler[]) this.handlerList.toArray(new MessageHandler[0]));
        this.threadPool = Executors.newFixedThreadPool(this.taskLimit, new ThreadFactory() { // from class: link.jfire.socket.socketserver.bus.LocalHandlerCenter.1
            private int i = 1;

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, "业务处理线程-" + this.i);
                this.i++;
                return thread;
            }
        });
        this.executorPool = new ExecutorCompletionService<>(this.threadPool);
    }

    @Override // link.jfire.socket.socketserver.bus.MessageHandlerCenter
    public List<MessageHandler> getHandlerList() {
        return this.handlerList;
    }

    @Override // link.jfire.socket.socketserver.bus.MessageHandlerCenter
    public List<MessageInterceptor> getInterceptorList() {
        return this.interceptorList;
    }
}
