java110-event/src/main/java/com/java110/event/service/api/ServiceDataFlowEventPublishing.java
@@ -1,16 +1,17 @@
package com.java110.event.service.api;
import com.java110.common.constant.CommonConstant;
import com.java110.common.constant.ResponseConstant;
import com.java110.common.exception.BusinessException;
import com.java110.common.factory.ApplicationContextFactory;
import com.java110.common.log.LoggerEngine;
import com.java110.common.util.Assert;
import com.java110.utils.constant.CommonConstant;
import com.java110.utils.constant.ResponseConstant;
import com.java110.utils.constant.ServiceCodeConstant;
import com.java110.utils.exception.BusinessException;
import com.java110.utils.exception.ListenerExecuteException;
import com.java110.utils.factory.ApplicationContextFactory;
import com.java110.utils.log.LoggerEngine;
import com.java110.utils.util.Assert;
import com.java110.core.context.DataFlowContext;
import com.java110.entity.center.AppService;
import com.java110.event.center.DataFlowListenerOrderComparator;
import com.java110.event.service.BusinessServiceDataFlowEvent;
import com.java110.event.service.BusinessServiceDataFlowListener;
import org.springframework.http.HttpMethod;
import java.util.ArrayList;
import java.util.HashMap;
@@ -28,53 +29,63 @@
    private static Executor taskExecutor;
    //默认 线程数 100
    private final static int DEFAULT_THREAD_NUM = 100;
    private static final int DEFAULT_THREAD_NUM = 100;
    /**
     * 保存侦听实例信息,一般启动时加载
     */
    private final static List<String> listeners = new ArrayList<String>();
    private static final List<String> listeners = new ArrayList<String>();
    /**
     * 根据 事件类型查询侦听
     */
    private final static Map<String,List<ServiceDataFlowListener>> cacheListenersMap = new HashMap<String, List<ServiceDataFlowListener>>();
    private static final Map<String, List<ServiceDataFlowListener>> cacheListenersMap = new HashMap<String, List<ServiceDataFlowListener>>();
    /**
     * 添加 侦听,这个只有启动时,单线程 处理,所以是线程安全的
     *
     * @param listener
     */
    public static void addListener(String listener){
    public static void addListener(String listener) {
        listeners.add(listener);
    }
    /**
     * 获取侦听(全部侦听)
     *
     * @return
     */
    public static List<String> getListeners(){
    public static List<String> getListeners() {
        return listeners;
    }
    /**
     * Returns the listeners registered for a service code, as selected by the checks below.
     * <p>
     * NOTE(review): this span appears to hold both the pre-change and the post-change
     * form of several lines (for example the two signatures) plus a raw hunk marker,
     * so it reads like unresolved patch text - confirm against the real file.
     *
     * @param serviceCode service code to look up; asserted to have length
     * @param httpMethod  HTTP method name; appended to the cache key and passed to {@code HttpMethod.valueOf}
     * @return the matching listeners, after sorting
     * @since 1.8
     */
    public static List<ServiceDataFlowListener> getListeners(String serviceCode){
    public static List<ServiceDataFlowListener> getListeners(String serviceCode, String httpMethod) {
        Assert.hasLength(serviceCode,"获取需要发布的事件处理侦听时,传递事件为空,请检查");
        Assert.hasLength(serviceCode, "获取需要发布的事件处理侦听时,传递事件为空,请检查");
        // Cache key used by the two-argument form: service code followed by the HTTP method.
        String needCachedServiceCode = serviceCode + httpMethod;
        // Try the cache first, for efficiency.
        if(cacheListenersMap.containsKey(serviceCode)){
            return cacheListenersMap.get(serviceCode);
        if (cacheListenersMap.containsKey(needCachedServiceCode)) {
            return cacheListenersMap.get(needCachedServiceCode);
        }
        List<ServiceDataFlowListener> dataFlowListeners = new ArrayList<ServiceDataFlowListener>();
        for(String listenerBeanName : getListeners()){
            ServiceDataFlowListener listener = ApplicationContextFactory.getBean(listenerBeanName,ServiceDataFlowListener.class);
            if(serviceCode.equals(listener.getServiceCode())){
        // Resolve every registered bean name and keep the listeners that match.
        for (String listenerBeanName : getListeners()) {
            ServiceDataFlowListener listener = ApplicationContextFactory.getBean(listenerBeanName, ServiceDataFlowListener.class);
            // A listener matches when both its service code and its HTTP method agree with the request.
            if (serviceCode.equals(listener.getServiceCode())
                    && listener.getHttpMethod() == HttpMethod.valueOf(httpMethod)) {
                dataFlowListeners.add(listener);
            }
            // Special case: pass-through (transfer) interfaces are matched on service code alone.
            if (ServiceCodeConstant.SERVICE_CODE_DO_SERVICE_TRANSFER.equals(listener.getServiceCode())
                    && ServiceCodeConstant.SERVICE_CODE_DO_SERVICE_TRANSFER.equals(serviceCode)) {
                dataFlowListeners.add(listener);
            }
        }
@@ -82,43 +93,50 @@
        // Sort the listeners here.
        DataFlowListenerOrderComparator.sort(dataFlowListeners);
        // Put the result into the cache.
        cacheListenersMap.put(serviceCode,dataFlowListeners);
        // Under the combined key, only a non-empty result is stored.
        if (dataFlowListeners.size() > 0) {
            cacheListenersMap.put(needCachedServiceCode, dataFlowListeners);
        }
        return dataFlowListeners;
    }
    /**
     * 发布事件
     *
     * @param dataFlowContext
     */
    public static void multicastEvent(DataFlowContext dataFlowContext,AppService appService) throws BusinessException{
        Assert.notNull(dataFlowContext.getServiceCode(),"当前没有可处理的业务信息!");
        multicastEvent(dataFlowContext.getServiceCode(),dataFlowContext,appService,null);
    public static void multicastEvent(DataFlowContext dataFlowContext, AppService appService) throws BusinessException {
        Assert.notNull(dataFlowContext.getServiceCode(), "当前没有可处理的业务信息!");
        multicastEvent(dataFlowContext.getServiceCode(), dataFlowContext, appService, null);
    }
    /**
     * 发布事件
     *
     * @param serviceCode
     * @param dataFlowContext
     */
    public static void multicastEvent(String serviceCode,DataFlowContext dataFlowContext,AppService appService) throws BusinessException{
        multicastEvent(serviceCode,dataFlowContext,appService,null);
    public static void multicastEvent(String serviceCode, DataFlowContext dataFlowContext, AppService appService) throws BusinessException {
        multicastEvent(serviceCode, dataFlowContext, appService, null);
    }
    /**
     * 发布事件
     *
     * @param serviceCode
     * @param dataFlowContext 这个订单信息,以便于 侦听那边需要用
     */
    public static void multicastEvent(String serviceCode, DataFlowContext dataFlowContext, AppService appService, String asyn) throws  BusinessException{
    public static void multicastEvent(String serviceCode, DataFlowContext dataFlowContext, AppService appService, String asyn) throws BusinessException {
        try {
            ServiceDataFlowEvent targetDataFlowEvent = new ServiceDataFlowEvent(serviceCode,dataFlowContext,appService);
            ServiceDataFlowEvent targetDataFlowEvent = new ServiceDataFlowEvent(serviceCode, dataFlowContext, appService);
            multicastEvent(serviceCode,targetDataFlowEvent, asyn);
        }catch (Exception e){
            throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR,"发布侦听失败,失败原因为:"+e);
            multicastEvent(serviceCode, targetDataFlowEvent, asyn);
        } catch (Exception e) {
            logger.error("发布侦听失败,失败原因为:", e);
            throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR, "发布侦听失败,失败原因为:" + e.getMessage());
        }
    }
@@ -126,13 +144,21 @@
    /**
     * Publishes an event to a listener registered for the service code.
     * <p>
     * NOTE(review): this span appears to hold both the pre-change and the post-change
     * form of several lines plus a raw hunk marker that hides part of the anonymous
     * Runnable, so it reads like unresolved patch text - confirm against the real file.
     *
     * @param serviceCode service code used to look up the listeners
     * @param event       event handed to the listener
     * @param asyn        dispatch flag; per the original comment, "A" means asynchronous processing
     */
    public static void multicastEvent(String serviceCode,final ServiceDataFlowEvent event, String asyn) {
        for (final ServiceDataFlowListener listener : getListeners(serviceCode)) {
    public static void multicastEvent(String serviceCode, final ServiceDataFlowEvent event, String asyn) {
        // The HTTP method is read from the current request headers of the event's context.
        String httpMethod = event.getDataFlowContext().getRequestCurrentHeaders().get(CommonConstant.HTTP_METHOD);
        List<ServiceDataFlowListener> listeners = getListeners(serviceCode, httpMethod);
        // Check for listeners on serviceCode + httpMethod; if none is registered, fail immediately.
        if (listeners == null || listeners.size() == 0) {
            throw new ListenerExecuteException(ResponseConstant.RESULT_CODE_ERROR,
                    "服务【" + serviceCode + "】调用方式【" + httpMethod + "】当前不支持");
        }
        for (final ServiceDataFlowListener listener : listeners) {
            if(CommonConstant.PROCESS_ORDER_ASYNCHRONOUS.equals(asyn)){ // asynchronous processing
            if (CommonConstant.PROCESS_ORDER_ASYNCHRONOUS.equals(asyn)) { // asynchronous processing
                // Hand the work to the shared task executor.
                Executor executor = getTaskExecutor();
                executor.execute(new Runnable() {
@@ -142,19 +168,19 @@
                    }
                });
                // Leave the loop after the first listener.
                break;
            }
            else {
            } else {
                // Synchronous path: invoke on the calling thread, then leave the loop.
                invokeListener(listener, event);
                break;
            }
        }
    }
    /**
     * Returns the shared task executor, creating it on first use.
     * <p>
     * When no executor has been set yet, a fixed thread pool of
     * {@code DEFAULT_THREAD_NUM} threads is created and stored. The method is
     * declared {@code synchronized}.
     *
     * @return the task executor held in the static field
     */
    protected static synchronized Executor getTaskExecutor() {
        if(taskExecutor == null) {
        if (taskExecutor == null) {
            taskExecutor = Executors.newFixedThreadPool(DEFAULT_THREAD_NUM);
        }
        return taskExecutor;
@@ -162,17 +188,18 @@
    /**
     * Invoke the given listener with the given event.
     *
     * @param listener the ApplicationListener to invoke
     * @param event the current event to propagate
     * @param event    the current event to propagate
     * @since 4.1
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected static void invokeListener(ServiceDataFlowListener listener, ServiceDataFlowEvent event) {
        try {
            listener.soService(event);
        }catch (Exception e){
            LoggerEngine.error("发布侦听失败",e);
            throw new RuntimeException("发布侦听失败,"+listener+ event + e);
        } catch (Exception e) {
            LoggerEngine.error("发布侦听失败", e);
            throw new RuntimeException("发布侦听失败," + listener + event + e);
        }
    }
}