package com.java110.event.center; import com.java110.common.constant.CommonConstant; import com.java110.common.exception.BusinessException; import com.java110.common.factory.ApplicationContextFactory; import com.java110.common.log.LoggerEngine; import com.java110.common.util.Assert; import com.java110.core.context.DataFlow; import com.java110.event.center.event.*; import com.java110.event.center.listener.DataFlowListener; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Executors; /** * 数据流 事件发布 * Created by wuxw on 2018/4/17. */ public class DataFlowEventPublishing extends LoggerEngine { private static Executor taskExecutor; //默认 线程数 100 private final static int DEFAULT_THREAD_NUM = 100; /** * 保存侦听实例信息,一般启动时加载 */ private final static List listeners = new ArrayList(); /** * 保存事件实例信息,一般启动时加载 */ /* private final static Map> events = new HashMap>();*/ /** * 根据 事件类型查询侦听 */ private final static Map>> cacheListenersMap = new HashMap>>(); /** * 添加 侦听,这个只有启动时,单线程 处理,所以是线程安全的 * @param listener */ /*public static void addListener(DataFlowListener listener){ listeners.add(listener); }*/ /** * 注解注册侦听 * @param listenerBeanName */ public static void addListener(String listenerBeanName){ //将 listener 放入 AppEventPublishing 中方便后期操作 //注册侦听 listeners.add(listenerBeanName); } /** * 获取侦听(全部侦听) * @return */ private static List getListeners(){ return listeners; } /** * 根据是否实现了某个接口,返回侦听 * @param interfaceClassName * @since 1.8 * @return */ private static List> getListeners(String interfaceClassName){ Assert.hasLength(interfaceClassName,"获取需要发布的事件处理侦听时,传递事件为空,请检查"); //先从缓存中获取,为了提升效率 if(cacheListenersMap.containsKey(interfaceClassName)){ return cacheListenersMap.get(interfaceClassName); } List> dataFlowListeners = new ArrayList>(); for(String listenerBeanName : getListeners()){ DataFlowListener listener = ApplicationContextFactory.getBean(listenerBeanName,DataFlowListener.class); Type[] types = listener.getClass().getGenericInterfaces(); for (Type type : types) { if (type instanceof ParameterizedType) { Type[] typeInterfaces = ((ParameterizedType) type).getActualTypeArguments(); for (Type typeInterface : typeInterfaces){ if(interfaceClassName.equals(typeInterface.getTypeName())){ dataFlowListeners.add(listener); } } } } } //这里排序 DataFlowListenerOrderComparator.sort(dataFlowListeners); //将数据放入缓存中 cacheListenersMap.put(interfaceClassName,dataFlowListeners); return dataFlowListeners; } /** * 注册事件 */ /*public static void addEvent(String serviceCode ,Class event) { events.put(serviceCode,event); }*/ /** * 获取事件 * @param serviceCode * @return * @throws Exception */ /*public static Class getEvent(String serviceCode) throws BusinessException{ Class targetEvent = events.get(serviceCode); //Assert.notNull(targetEvent,"改服务未注册该事件[serviceCode = "+serviceCode+"],系统目前不支持!"); return targetEvent; }*/ /** * 发布事件 * @param event 事件 */ /* public static void multicastEvent(String serviceCode,DataFlow dataFlow) throws BusinessException{ multicastEvent(serviceCode,dataFlow,null); }*/ private static void multicastEvent(final DataFlowEvent event) throws BusinessException{ multicastEvent(event,"S"); } /** * 发布事件 * @param event * @param asyn A 表示异步处理 */ private static void multicastEvent(final DataFlowEvent event, String asyn) { for (final DataFlowListener listener : getListeners(event.getClass().getName())) { if(CommonConstant.PROCESS_ORDER_ASYNCHRONOUS.equals(asyn)){ //异步处理 Executor executor = getTaskExecutor(); executor.execute(new Runnable() { @Override public void run() { invokeListener(listener, event); } }); } else { invokeListener(listener, event); } } } /** * Return the current task executor for this multicaster. */ protected static synchronized Executor getTaskExecutor() { if(taskExecutor == null) { taskExecutor = Executors.newFixedThreadPool(DEFAULT_THREAD_NUM); } return taskExecutor; } /** * Invoke the given listener with the given event. * @param listener the ApplicationListener to invoke * @param event the current event to propagate * @since 4.1 */ @SuppressWarnings({"unchecked", "rawtypes"}) protected static void invokeListener(DataFlowListener listener, DataFlowEvent event) { try { listener.soService(event); }catch (Exception e){ LoggerEngine.error("发布侦听失败",e); throw new RuntimeException("发布侦听失败,"+listener+ event + e); } } /***********************************************发布侦听 开始***************************************************************/ /** * 发布接受请求事件 * @param requestData * @param headers */ public static void receiveRequest(String requestData,Map headers){ multicastEvent(new ReceiveRequestEvent("",null,requestData,headers)); } /** * 发布预校验侦听 * @param requestData * @param headers */ public static void preValidateData(String requestData,Map headers){ multicastEvent(new DataPreValidateEvent("",null,requestData,headers)); } /** * 初始化 DataFlow 对象完成 * @param dataFlow 数据流对象 */ public static void initDataFlowComplete(DataFlow dataFlow){ multicastEvent(new DataFlowInitCompleteEvent("",dataFlow)); } /** * 规则校验完成事件 * @param dataFlow 数据流对象 */ public static void ruleValidateComplete(DataFlow dataFlow){ multicastEvent(new RuleValidateCompleteEvent("",dataFlow)); } /** * 加载配置文件完成 * @param dataFlow 数据流对象 */ public static void loadConfigDataComplete(DataFlow dataFlow){ multicastEvent(new LoadConfigDataCompleteEvent("",dataFlow)); } /** * 调用业务系统事件 * @param dataFlow 数据流 */ public static void invokeBusinessSystem(DataFlow dataFlow){ multicastEvent(new InvokeBusinessSystemEvent("",dataFlow)); } /** * 数据返回事件 * @param dataFlow 数据流 */ public static void dataResponse(DataFlow dataFlow,String responseData,Map headers){ multicastEvent(new DataResponseEvent("",dataFlow,responseData,headers)); } /***********************************************发布侦听 结束***************************************************************/ }