wuxw
2019-04-25 d4e1929dcab147030d3bcae89b1801250fd6a5da
java110-event/src/main/java/com/java110/event/service/api/ServiceDataFlowEventPublishing.java
@@ -2,7 +2,9 @@
import com.java110.common.constant.CommonConstant;
import com.java110.common.constant.ResponseConstant;
import com.java110.common.constant.ServiceCodeConstant;
import com.java110.common.exception.BusinessException;
import com.java110.common.exception.ListenerExecuteException;
import com.java110.common.factory.ApplicationContextFactory;
import com.java110.common.log.LoggerEngine;
import com.java110.common.util.Assert;
@@ -11,6 +13,7 @@
import com.java110.event.center.DataFlowListenerOrderComparator;
import com.java110.event.service.BusinessServiceDataFlowEvent;
import com.java110.event.service.BusinessServiceDataFlowListener;
import org.springframework.http.HttpMethod;
import java.util.ArrayList;
import java.util.HashMap;
@@ -62,19 +65,26 @@
     * @since 1.8
     * @return
     */
    public static List<ServiceDataFlowListener> getListeners(String serviceCode){
    public static List<ServiceDataFlowListener> getListeners(String serviceCode,String httpMethod){
        Assert.hasLength(serviceCode,"获取需要发布的事件处理侦听时,传递事件为空,请检查");
        String needCachedServiceCode = serviceCode+httpMethod;
        //先从缓存中获取,为了提升效率
        if(cacheListenersMap.containsKey(serviceCode)){
            return cacheListenersMap.get(serviceCode);
        if(cacheListenersMap.containsKey(needCachedServiceCode)){
            return cacheListenersMap.get(needCachedServiceCode);
        }
        List<ServiceDataFlowListener> dataFlowListeners = new ArrayList<ServiceDataFlowListener>();
        for(String listenerBeanName : getListeners()){
            ServiceDataFlowListener listener = ApplicationContextFactory.getBean(listenerBeanName,ServiceDataFlowListener.class);
            if(serviceCode.equals(listener.getServiceCode())){
            if(serviceCode.equals(listener.getServiceCode())
                    && listener.getHttpMethod() == HttpMethod.valueOf(httpMethod)){
                dataFlowListeners.add(listener);
            }
            //特殊处理 透传类接口
            if(ServiceCodeConstant.SERVICE_CODE_DO_SERVICE_TRANSFER.equals(listener.getServiceCode())
                    && ServiceCodeConstant.SERVICE_CODE_DO_SERVICE_TRANSFER.equals(serviceCode)){
                dataFlowListeners.add(listener);
            }
        }
@@ -83,7 +93,7 @@
        DataFlowListenerOrderComparator.sort(dataFlowListeners);
        //将数据放入缓存中
        cacheListenersMap.put(serviceCode,dataFlowListeners);
        cacheListenersMap.put(needCachedServiceCode,dataFlowListeners);
        return dataFlowListeners;
    }
@@ -118,7 +128,8 @@
            multicastEvent(serviceCode,targetDataFlowEvent, asyn);
        }catch (Exception e){
            throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR,"发布侦听失败,失败原因为:"+e);
            logger.error("发布侦听失败,失败原因为:",e);
            throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR,"发布侦听失败,失败原因为:"+e.getMessage());
        }
    }
@@ -130,7 +141,14 @@
     * @param asyn A 表示异步处理
     */
    public static void multicastEvent(String serviceCode,final ServiceDataFlowEvent event, String asyn) {
        for (final ServiceDataFlowListener listener : getListeners(serviceCode)) {
        String httpMethod = event.getDataFlowContext().getRequestCurrentHeaders().get(CommonConstant.HTTP_METHOD);
        List<ServiceDataFlowListener> listeners = getListeners(serviceCode,httpMethod);
        //这里判断 serviceCode + httpMethod 的侦听,如果没有注册直接报错。
        if(listeners == null || listeners.size() == 0){
            throw new ListenerExecuteException(ResponseConstant.RESULT_CODE_ERROR,
                    "服务【" + serviceCode + "】调用方式【"+httpMethod+"】当前不支持");
        }
        for (final ServiceDataFlowListener listener : listeners) {
            if(CommonConstant.PROCESS_ORDER_ASYNCHRONOUS.equals(asyn)){ //异步处理
@@ -150,6 +168,8 @@
        }
    }
    /**
     * Return the current task executor for this multicaster.
     */