| | |
| | | package com.java110.event.center; |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.java110.common.constant.CommonConstant; |
| | | import com.java110.common.constant.ResponseConstant; |
| | | import com.java110.common.exception.BusinessException; |
| | | import com.java110.common.factory.ApplicationContextFactory; |
| | | import com.java110.common.log.LoggerEngine; |
| | | import com.java110.common.util.Assert; |
| | | import com.java110.entity.center.DataFlow; |
| | | import com.java110.core.context.IOrderDataFlowContext; |
| | | import com.java110.entity.order.Business; |
| | | import com.java110.event.center.event.*; |
| | | import com.java110.event.center.listener.DataFlowListener; |
| | | |
| | | import java.lang.reflect.Constructor; |
| | | import java.lang.reflect.ParameterizedType; |
| | | import java.lang.reflect.Type; |
| | | import java.util.ArrayList; |
| | |
| | | /** |
| | | * 保存侦听实例信息,一般启动时加载 |
| | | */ |
| | | private final static List<DataFlowListener<?>> listeners = new ArrayList<DataFlowListener<?>>(); |
| | | private final static List<String> listeners = new ArrayList<String>(); |
| | | |
| | | /** |
| | | * 保存事件实例信息,一般启动时加载 |
| | | */ |
| | | private final static Map<String,Class<DataFlowEvent>> events = new HashMap<String,Class<DataFlowEvent>>(); |
| | | /* private final static Map<String,Class<DataFlowEvent>> events = new HashMap<String,Class<DataFlowEvent>>();*/ |
| | | |
| | | /** |
| | | * 根据 事件类型查询侦听 |
| | |
| | | * 添加 侦听,这个只有启动时,单线程 处理,所以是线程安全的 |
| | | * @param listener |
| | | */ |
| | | public static void addListenner(DataFlowListener<?> listener){ |
| | | /*public static void addListener(DataFlowListener<?> listener){ |
| | | listeners.add(listener); |
| | | }*/ |
| | | |
| | | /** |
| | | * 注解注册侦听 |
| | | * @param listenerBeanName |
| | | */ |
| | | public static void addListener(String listenerBeanName){ |
| | | //将 listener 放入 AppEventPublishing 中方便后期操作 |
| | | //注册侦听 |
| | | listeners.add(listenerBeanName); |
| | | } |
| | | |
| | | /** |
| | | * 获取侦听(全部侦听) |
| | | * @return |
| | | */ |
| | | public static List<DataFlowListener<?>> getListeners(){ |
| | | private static List<String> getListeners(){ |
| | | return listeners; |
| | | } |
| | | |
| | |
| | | * @since 1.8 |
| | | * @return |
| | | */ |
| | | public static List<DataFlowListener<?>> getListeners(String interfaceClassName){ |
| | | private static List<DataFlowListener<?>> getListeners(String interfaceClassName){ |
| | | |
| | | Assert.isNull(interfaceClassName,"获取需要发布的事件处理侦听时,传递事件为空,请检查"); |
| | | Assert.hasLength(interfaceClassName,"获取需要发布的事件处理侦听时,传递事件为空,请检查"); |
| | | |
| | | //先从缓存中获取,为了提升效率 |
| | | if(cacheListenersMap.containsKey(interfaceClassName)){ |
| | |
| | | } |
| | | |
| | | List<DataFlowListener<?>> dataFlowListeners = new ArrayList<DataFlowListener<?>>(); |
| | | for(DataFlowListener<?> listener : getListeners()){ |
| | | |
| | | for(String listenerBeanName : getListeners()){ |
| | | DataFlowListener<?> listener = ApplicationContextFactory.getBean(listenerBeanName,DataFlowListener.class); |
| | | Type[] types = listener.getClass().getGenericInterfaces(); |
| | | for (Type type : types) { |
| | | if (type instanceof ParameterizedType) { |
| | |
| | | /** |
| | | * 注册事件 |
| | | */ |
| | | public static void addEvent(String serviceCode ,Class<DataFlowEvent> event) { |
| | | /*public static void addEvent(String serviceCode ,Class<DataFlowEvent> event) { |
| | | events.put(serviceCode,event); |
| | | } |
| | | }*/ |
| | | |
| | | /** |
| | | * 获取事件 |
| | |
| | | * @return |
| | | * @throws Exception |
| | | */ |
| | | public static Class<DataFlowEvent> getEvent(String serviceCode) throws BusinessException{ |
| | | /*public static Class<DataFlowEvent> getEvent(String serviceCode) throws BusinessException{ |
| | | Class<DataFlowEvent> targetEvent = events.get(serviceCode); |
| | | //Assert.notNull(targetEvent,"改服务未注册该事件[serviceCode = "+serviceCode+"],系统目前不支持!"); |
| | | return targetEvent; |
| | | } |
| | | }*/ |
| | | |
| | | |
| | | /** |
| | | * 发布事件 |
| | | * @param serviceCode |
| | | * @param dataFlow |
| | | * @param event 事件 |
| | | */ |
| | | public static void multicastEvent(String serviceCode,DataFlow dataFlow) throws BusinessException{ |
| | | /* public static void multicastEvent(String serviceCode,DataFlow dataFlow) throws BusinessException{ |
| | | multicastEvent(serviceCode,dataFlow,null); |
| | | }*/ |
| | | |
| | | private static void multicastEvent(final DataFlowEvent event) throws BusinessException{ |
| | | multicastEvent(event,"S"); |
| | | } |
| | | |
| | | /** |
| | | * 发布事件 |
| | | * @param serviceCode |
| | | * @param dataFlow 这个订单信息,以便于 侦听那边需要用 |
| | | */ |
| | | public static void multicastEvent(String serviceCode,DataFlow dataFlow,String asyn) throws BusinessException{ |
| | | try { |
| | | Class<DataFlowEvent> dataFlowEventClass = getEvent(serviceCode); |
| | | |
| | | if(dataFlowEventClass == null){ |
| | | return ; |
| | | } |
| | | |
| | | Class[] parameterTypes = {Object.class, DataFlow.class}; |
| | | |
| | | Constructor constructor = dataFlowEventClass.getClass().getConstructor(parameterTypes); |
| | | Object[] parameters = {null, dataFlow}; |
| | | DataFlowEvent targetDataFlowEvent = (DataFlowEvent) constructor.newInstance(parameters); |
| | | multicastEvent(targetDataFlowEvent, asyn); |
| | | }catch (Exception e){ |
| | | throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR,"发布侦听失败,失败原因为:"+e); |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * 发布事件 |
| | | * @param event |
| | | * @param asyn A 表示异步处理 |
| | | */ |
| | | public static void multicastEvent(final DataFlowEvent event,String asyn) { |
| | | for (final DataFlowListener<?> listener : getListeners(event.getClass().getName())) { |
| | | private static void multicastEvent(final DataFlowEvent event, String asyn) { |
| | | |
| | | for (final DataFlowListener<?> listener : getListeners(event.getClass().getName())) { |
| | | |
| | | if(CommonConstant.PROCESS_ORDER_ASYNCHRONOUS.equals(asyn)){ //异步处理 |
| | | |
| | |
| | | throw new RuntimeException("发布侦听失败,"+listener+ event + e); |
| | | } |
| | | } |
| | | |
| | | |
| | | /***********************************************发布侦听 开始***************************************************************/ |
| | | /** |
| | | * 发布接受请求事件 |
| | | * @param requestData |
| | | * @param headers |
| | | */ |
| | | public static void receiveRequest(String requestData,Map<String,String> headers){ |
| | | multicastEvent(new ReceiveRequestEvent("",null,requestData,headers)); |
| | | } |
| | | |
| | | /** |
| | | * 发布预校验侦听 |
| | | * @param requestData |
| | | * @param headers |
| | | */ |
| | | public static void preValidateData(String requestData,Map<String,String> headers){ |
| | | multicastEvent(new DataPreValidateEvent("",null,requestData,headers)); |
| | | } |
| | | |
| | | /** |
| | | * 初始化 DataFlow 对象完成 |
| | | * @param dataFlow 数据流对象 |
| | | */ |
| | | public static void initDataFlowComplete(IOrderDataFlowContext dataFlow){ |
| | | multicastEvent(new DataFlowInitCompleteEvent("",dataFlow)); |
| | | } |
| | | |
| | | /** |
| | | * 规则校验完成事件 |
| | | * @param dataFlow 数据流对象 |
| | | */ |
| | | public static void ruleValidateComplete(IOrderDataFlowContext dataFlow){ |
| | | multicastEvent(new RuleValidateCompleteEvent("",dataFlow)); |
| | | } |
| | | |
| | | /** |
| | | * 加载配置文件完成 |
| | | * @param dataFlow 数据流对象 |
| | | */ |
| | | public static void loadConfigDataComplete(IOrderDataFlowContext dataFlow){ |
| | | multicastEvent(new LoadConfigDataCompleteEvent("",dataFlow)); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 调用业务系统事件 |
| | | * @param dataFlow 数据流 |
| | | */ |
| | | public static void invokeBusinessSystem(IOrderDataFlowContext dataFlow){ |
| | | multicastEvent(new InvokeBusinessSystemEvent("",dataFlow)); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 调用业务系统成功后事件 |
| | | * @param dataFlow |
| | | * @param business 成功的事件业务数据封装对象 |
| | | */ |
| | | public static void invokeBusinessBSuccess(IOrderDataFlowContext dataFlow, Business business, JSONObject businessResponseData){ |
| | | multicastEvent(new InvokeBusinessBSuccessEvent("",dataFlow,business,businessResponseData)); |
| | | } |
| | | |
| | | /** |
| | | * 调用业务系统成功后事件 |
| | | * @param dataFlow |
| | | * @param business 成功的事件业务数据封装对象 |
| | | */ |
| | | public static void invokeBusinessBSuccess(IOrderDataFlowContext dataFlow, Business business){ |
| | | multicastEvent(new InvokeBusinessBSuccessEvent("",dataFlow,business)); |
| | | } |
| | | |
| | | /** |
| | | * 调用业务系统成功后事件 |
| | | * @param dataFlow |
| | | * @param business 成功的事件业务数据封装对象 |
| | | */ |
| | | public static void invokeBusinessISuccess(IOrderDataFlowContext dataFlow, Business business){ |
| | | multicastEvent(new InvokeBusinessISuccessEvent("",dataFlow,business)); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 数据返回事件 |
| | | * @param dataFlow 数据流 |
| | | */ |
| | | public static void dataResponse(IOrderDataFlowContext dataFlow,String responseData,Map<String,String> headers){ |
| | | multicastEvent(new DataResponseEvent("",dataFlow,responseData,headers)); |
| | | } |
| | | |
| | | /***********************************************发布侦听 结束***************************************************************/ |
| | | } |