| | |
| | | |
| | | import com.java110.common.constant.CommonConstant; |
| | | import com.java110.common.constant.ResponseConstant; |
| | | import com.java110.common.constant.ServiceCodeConstant; |
| | | import com.java110.common.exception.BusinessException; |
| | | import com.java110.common.exception.ListenerExecuteException; |
| | | import com.java110.common.factory.ApplicationContextFactory; |
| | | import com.java110.common.log.LoggerEngine; |
| | | import com.java110.common.util.Assert; |
| | |
| | | import com.java110.event.center.DataFlowListenerOrderComparator; |
| | | import com.java110.event.service.BusinessServiceDataFlowEvent; |
| | | import com.java110.event.service.BusinessServiceDataFlowListener; |
| | | import org.springframework.http.HttpMethod; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.HashMap; |
| | |
| | | * @since 1.8 |
| | | * @return |
| | | */ |
| | | public static List<ServiceDataFlowListener> getListeners(String serviceCode){ |
| | | public static List<ServiceDataFlowListener> getListeners(String serviceCode,String httpMethod){ |
| | | |
| | | Assert.hasLength(serviceCode,"获取需要发布的事件处理侦听时,传递事件为空,请检查"); |
| | | |
| | | String needCachedServiceCode = serviceCode+httpMethod; |
| | | //先从缓存中获取,为了提升效率 |
| | | if(cacheListenersMap.containsKey(serviceCode)){ |
| | | return cacheListenersMap.get(serviceCode); |
| | | if(cacheListenersMap.containsKey(needCachedServiceCode)){ |
| | | return cacheListenersMap.get(needCachedServiceCode); |
| | | } |
| | | |
| | | List<ServiceDataFlowListener> dataFlowListeners = new ArrayList<ServiceDataFlowListener>(); |
| | | for(String listenerBeanName : getListeners()){ |
| | | ServiceDataFlowListener listener = ApplicationContextFactory.getBean(listenerBeanName,ServiceDataFlowListener.class); |
| | | if(serviceCode.equals(listener.getServiceCode())){ |
| | | if(serviceCode.equals(listener.getServiceCode()) |
| | | && listener.getHttpMethod() == HttpMethod.valueOf(httpMethod)){ |
| | | dataFlowListeners.add(listener); |
| | | } |
| | | //特殊处理 透传类接口 |
| | | if(ServiceCodeConstant.SERVICE_CODE_DO_SERVICE_TRANSFER.equals(listener.getServiceCode()) |
| | | && ServiceCodeConstant.SERVICE_CODE_DO_SERVICE_TRANSFER.equals(serviceCode)){ |
| | | dataFlowListeners.add(listener); |
| | | } |
| | | } |
| | |
| | | DataFlowListenerOrderComparator.sort(dataFlowListeners); |
| | | |
| | | //将数据放入缓存中 |
| | | cacheListenersMap.put(serviceCode,dataFlowListeners); |
| | | cacheListenersMap.put(needCachedServiceCode,dataFlowListeners); |
| | | return dataFlowListeners; |
| | | } |
| | | |
| | |
| | | |
| | | multicastEvent(serviceCode,targetDataFlowEvent, asyn); |
| | | }catch (Exception e){ |
| | | throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR,"发布侦听失败,失败原因为:"+e); |
| | | logger.error("发布侦听失败,失败原因为:",e); |
| | | throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR,"发布侦听失败,失败原因为:"+e.getMessage()); |
| | | } |
| | | |
| | | } |
| | |
| | | * @param asyn A 表示异步处理 |
| | | */ |
| | | public static void multicastEvent(String serviceCode,final ServiceDataFlowEvent event, String asyn) { |
| | | for (final ServiceDataFlowListener listener : getListeners(serviceCode)) { |
| | | String httpMethod = event.getDataFlowContext().getRequestCurrentHeaders().get(CommonConstant.HTTP_METHOD); |
| | | List<ServiceDataFlowListener> listeners = getListeners(serviceCode,httpMethod); |
| | | //这里判断 serviceCode + httpMethod 的侦听,如果没有注册直接报错。 |
| | | if(listeners == null || listeners.size() == 0){ |
| | | throw new ListenerExecuteException(ResponseConstant.RESULT_CODE_ERROR, |
| | | "服务【" + serviceCode + "】调用方式【"+httpMethod+"】当前不支持"); |
| | | } |
| | | for (final ServiceDataFlowListener listener : listeners) { |
| | | |
| | | if(CommonConstant.PROCESS_ORDER_ASYNCHRONOUS.equals(asyn)){ //异步处理 |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Return the current task executor for this multicaster. |
| | | */ |