wuxw
2019-05-30 f39cd4d82c3952f8587ebff9a7e8ede73b46bcda
java110-event/src/main/java/com/java110/event/center/DataFlowEventPublishing.java
@@ -1,13 +1,16 @@
package com.java110.event.center;
import com.alibaba.fastjson.JSONObject;
import com.java110.common.constant.CommonConstant;
import com.java110.common.constant.ResponseConstant;
import com.java110.common.exception.BusinessException;
import com.java110.common.factory.ApplicationContextFactory;
import com.java110.common.log.LoggerEngine;
import com.java110.common.util.Assert;
import com.java110.entity.center.DataFlow;
import com.java110.core.context.IOrderDataFlowContext;
import com.java110.entity.order.Business;
import com.java110.event.center.event.*;
import com.java110.event.center.listener.DataFlowListener;
import java.lang.reflect.Constructor;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
@@ -31,12 +34,12 @@
    /**
     * 保存侦听实例信息,一般启动时加载
     */
    private final static List<DataFlowListener<?>> listeners = new ArrayList<DataFlowListener<?>>();
    private final static List<String> listeners = new ArrayList<String>();
    /**
     * 保存事件实例信息,一般启动时加载
     */
    private final static Map<String,Class<DataFlowEvent>> events = new HashMap<String,Class<DataFlowEvent>>();
   /* private final static Map<String,Class<DataFlowEvent>> events = new HashMap<String,Class<DataFlowEvent>>();*/
    /**
     * 根据 事件类型查询侦听
@@ -51,15 +54,25 @@
     * 添加 侦听,这个只有启动时,单线程 处理,所以是线程安全的
     * @param listener
     */
    public static void addListenner(DataFlowListener<?> listener){
    /*public static void addListener(DataFlowListener<?> listener){
        listeners.add(listener);
    }*/
    /**
     * 注解注册侦听
     * @param listenerBeanName
     */
    public static void addListener(String listenerBeanName){
        //将 listener 放入 AppEventPublishing 中方便后期操作
        //注册侦听
        listeners.add(listenerBeanName);
    }
    /**
     * 获取侦听(全部侦听)
     * @return
     */
    public static List<DataFlowListener<?>> getListeners(){
    private static List<String> getListeners(){
        return listeners;
    }
@@ -69,9 +82,9 @@
     * @since 1.8
     * @return
     */
    public static List<DataFlowListener<?>> getListeners(String interfaceClassName){
    private static List<DataFlowListener<?>> getListeners(String interfaceClassName){
        Assert.isNull(interfaceClassName,"获取需要发布的事件处理侦听时,传递事件为空,请检查");
        Assert.hasLength(interfaceClassName,"获取需要发布的事件处理侦听时,传递事件为空,请检查");
        //先从缓存中获取,为了提升效率
        if(cacheListenersMap.containsKey(interfaceClassName)){
@@ -79,7 +92,9 @@
        }
        List<DataFlowListener<?>> dataFlowListeners = new ArrayList<DataFlowListener<?>>();
        for(DataFlowListener<?> listener : getListeners()){
        for(String listenerBeanName : getListeners()){
            DataFlowListener<?> listener = ApplicationContextFactory.getBean(listenerBeanName,DataFlowListener.class);
            Type[] types =  listener.getClass().getGenericInterfaces();
            for (Type type : types) {
                if (type instanceof ParameterizedType) {
@@ -104,9 +119,9 @@
    /**
     * 注册事件
     */
    public static void addEvent(String serviceCode ,Class<DataFlowEvent> event) {
    /*public static void addEvent(String serviceCode ,Class<DataFlowEvent> event) {
        events.put(serviceCode,event);
    }
    }*/
    /**
     * 获取事件
@@ -114,57 +129,33 @@
     * @return
     * @throws Exception
     */
    public static Class<DataFlowEvent> getEvent(String serviceCode) throws BusinessException{
    /*public static Class<DataFlowEvent> getEvent(String serviceCode) throws BusinessException{
        Class<DataFlowEvent> targetEvent = events.get(serviceCode);
        //Assert.notNull(targetEvent,"改服务未注册该事件[serviceCode = "+serviceCode+"],系统目前不支持!");
        return targetEvent;
    }
    }*/
    /**
     * 发布事件
     * @param serviceCode
     * @param dataFlow
     * @param event 事件
     */
    public static void multicastEvent(String serviceCode,DataFlow dataFlow) throws BusinessException{
   /* public static void multicastEvent(String serviceCode,DataFlow dataFlow) throws BusinessException{
        multicastEvent(serviceCode,dataFlow,null);
    }*/
    private static void multicastEvent(final DataFlowEvent event) throws BusinessException{
        multicastEvent(event,"S");
    }
    /**
     * 发布事件
     * @param serviceCode
     * @param dataFlow 这个订单信息,以便于 侦听那边需要用
     */
    public static void multicastEvent(String serviceCode,DataFlow dataFlow,String asyn) throws  BusinessException{
        try {
            Class<DataFlowEvent> dataFlowEventClass = getEvent(serviceCode);
            if(dataFlowEventClass == null){
                return ;
            }
            Class[] parameterTypes = {Object.class, DataFlow.class};
            Constructor constructor = dataFlowEventClass.getClass().getConstructor(parameterTypes);
            Object[] parameters = {null, dataFlow};
            DataFlowEvent targetDataFlowEvent = (DataFlowEvent) constructor.newInstance(parameters);
            multicastEvent(targetDataFlowEvent, asyn);
        }catch (Exception e){
            throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR,"发布侦听失败,失败原因为:"+e);
        }
    }
    /**
     * 发布事件
     * @param event
     * @param asyn A 表示异步处理
     */
    public static void multicastEvent(final DataFlowEvent event,String asyn) {
        for (final DataFlowListener<?> listener : getListeners(event.getClass().getName())) {
    private static void multicastEvent(final DataFlowEvent event, String asyn) {
            for (final DataFlowListener<?> listener :  getListeners(event.getClass().getName())) {
            if(CommonConstant.PROCESS_ORDER_ASYNCHRONOUS.equals(asyn)){ //异步处理
@@ -208,4 +199,96 @@
            throw new RuntimeException("发布侦听失败,"+listener+ event + e);
        }
    }
    /***********************************************发布侦听 开始***************************************************************/
    /**
     * 发布接受请求事件
     * @param requestData
     * @param headers
     */
    public static void receiveRequest(String requestData,Map<String,String> headers){
        multicastEvent(new ReceiveRequestEvent("",null,requestData,headers));
    }
    /**
     * 发布预校验侦听
     * @param requestData
     * @param headers
     */
    public static void preValidateData(String requestData,Map<String,String> headers){
        multicastEvent(new DataPreValidateEvent("",null,requestData,headers));
    }
    /**
     * 初始化 DataFlow 对象完成
     * @param dataFlow 数据流对象
     */
    public static void initDataFlowComplete(IOrderDataFlowContext dataFlow){
        multicastEvent(new DataFlowInitCompleteEvent("",dataFlow));
    }
    /**
     * 规则校验完成事件
     * @param dataFlow 数据流对象
     */
    public static void ruleValidateComplete(IOrderDataFlowContext dataFlow){
        multicastEvent(new RuleValidateCompleteEvent("",dataFlow));
    }
    /**
     * 加载配置文件完成
     * @param dataFlow 数据流对象
     */
    public static void loadConfigDataComplete(IOrderDataFlowContext dataFlow){
        multicastEvent(new LoadConfigDataCompleteEvent("",dataFlow));
    }
    /**
     * 调用业务系统事件
     * @param dataFlow 数据流
     */
    public static void invokeBusinessSystem(IOrderDataFlowContext dataFlow){
        multicastEvent(new InvokeBusinessSystemEvent("",dataFlow));
    }
    /**
     * 调用业务系统成功后事件
     * @param dataFlow
     * @param business 成功的事件业务数据封装对象
     */
    public static void invokeBusinessBSuccess(IOrderDataFlowContext dataFlow, Business business, JSONObject businessResponseData){
        multicastEvent(new InvokeBusinessBSuccessEvent("",dataFlow,business,businessResponseData));
    }
    /**
     * 调用业务系统成功后事件
     * @param dataFlow
     * @param business 成功的事件业务数据封装对象
     */
    public static void invokeBusinessBSuccess(IOrderDataFlowContext dataFlow, Business business){
        multicastEvent(new InvokeBusinessBSuccessEvent("",dataFlow,business));
    }
    /**
     * 调用业务系统成功后事件
     * @param dataFlow
     * @param business 成功的事件业务数据封装对象
     */
    public static void invokeBusinessISuccess(IOrderDataFlowContext dataFlow, Business business){
        multicastEvent(new InvokeBusinessISuccessEvent("",dataFlow,business));
    }
    /**
     * 数据返回事件
     * @param dataFlow 数据流
     */
    public static void dataResponse(IOrderDataFlowContext dataFlow,String responseData,Map<String,String> headers){
        multicastEvent(new DataResponseEvent("",dataFlow,responseData,headers));
    }
    /***********************************************发布侦听 结束***************************************************************/
}