package com.java110.order.smo.impl; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.java110.order.dao.ICenterServiceDAO; import com.java110.order.smo.ICenterServiceSMO; import com.java110.utils.cache.AppRouteCache; import com.java110.utils.cache.MappingCache; import com.java110.utils.constant.*; import com.java110.utils.exception.*; import com.java110.utils.factory.ApplicationContextFactory; import com.java110.utils.kafka.KafkaFactory; import com.java110.utils.log.LoggerEngine; import com.java110.utils.util.*; import com.java110.core.client.RestTemplate; import com.java110.core.context.DataFlow; import com.java110.core.factory.AuthenticationFactory; import com.java110.core.factory.DataFlowFactory; import com.java110.core.factory.DataTransactionFactory; import com.java110.core.factory.GenerateCodeFactory; import com.java110.entity.center.AppRoute; import com.java110.entity.center.AppService; import com.java110.entity.center.Business; import com.java110.entity.center.DataFlowLinksCost; import com.java110.core.event.center.DataFlowEventPublishing; import com.java110.core.log.LogAgent; import com.java110.service.init.ServiceInfoListener; import com.java110.service.smo.IQueryServiceSMO; import org.apache.commons.lang3.math.NumberUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.*; import org.springframework.stereotype.Service; import java.util.*; /** * 中心服务处理类 * Created by wuxw on 2018/4/13. */ @Service("centerServiceSMOImpl") //@Transactional public class CenterServiceSMOImpl extends LoggerEngine implements ICenterServiceSMO { @Autowired ICenterServiceDAO centerServiceDaoImpl; @Autowired private RestTemplate restTemplate; @Autowired private RestTemplate outRestTemplate; @Autowired private IQueryServiceSMO queryServiceSMOImpl; @Override @Deprecated public String service(String reqJson, Map headers) throws SMOException{ DataFlow dataFlow = null; JSONObject responseJson = null; String resJson = ""; try { reqJson = decrypt(reqJson,headers); DataFlowEventPublishing.preValidateData(reqJson,headers); //1.0 创建数据流 dataFlow = DataFlowFactory.newInstance(DataFlow.class).builder(reqJson, headers); //DataFlowEventPublishing.initDataFlowComplete(dataFlow); //2.0 加载配置信息 initConfigData(dataFlow); //DataFlowEventPublishing.loadConfigDataComplete(dataFlow); //3.0 校验 APPID是否有权限操作serviceCode judgeAuthority(dataFlow); //4.0 调用规则校验 ruleValidate(dataFlow); //DataFlowEventPublishing.ruleValidateComplete(dataFlow); //5.0 保存订单和业务项 c_orders c_order_attrs c_business c_business_attrs saveOrdersAndBusiness(dataFlow); //6.0 调用下游系统 //DataFlowEventPublishing.invokeBusinessSystem(dataFlow); invokeBusinessSystem(dataFlow); responseJson = DataTransactionFactory.createCommonResponseJson(dataFlow); } catch (DecryptException e){ //解密异常 responseJson = DataTransactionFactory.createOrderResponseJson(ResponseConstant.NO_TRANSACTION_ID, e.getResult().getCode(), e.getMessage()); }catch (BusinessException e) { try { //7.0 作废订单和业务项 invalidOrderAndBusiness(dataFlow); //8.0 广播作废业务系统订单信息 //想法:这里可以直接不广播,只有在业务返回时才广播, // 疑问:在这里部分消息发出去了,如果在receiveBusinessSystemNotifyMessage这个方法中依然没有收到,我们认为是下游系统也是失败了不用处理, //目前看逻辑也是对的 //invalidBusinessSystem(dataFlow); } catch (Exception e1) { LoggerEngine.error("作废订单失败", e); //9.0 将订单状态改为失败,人工处理。 updateOrderAndBusinessError(dataFlow); } finally { responseJson = DataTransactionFactory.createOrderResponseJson(dataFlow.getTransactionId(), e.getResult().getCode(), e.getMessage()); } } catch (OrdersException e) { responseJson = DataTransactionFactory.createOrderResponseJson(dataFlow.getTransactionId(), e.getResult().getCode(), e.getMessage()); } catch (RuleException e) { responseJson = DataTransactionFactory.createOrderResponseJson(dataFlow.getTransactionId(), e.getResult().getCode(), e.getMessage()); } catch (NoAuthorityException e) { responseJson = DataTransactionFactory.createOrderResponseJson(dataFlow.getTransactionId(), e.getResult().getCode(), e.getMessage()); } catch (InitConfigDataException e){ responseJson = DataTransactionFactory.createOrderResponseJson(dataFlow.getTransactionId(), e.getResult().getCode(), e.getMessage()); }catch (Exception e) { logger.error("内部异常了:",e); responseJson = DataTransactionFactory.createOrderResponseJson(dataFlow == null ? ResponseConstant.NO_TRANSACTION_ID : dataFlow.getTransactionId(), ResponseConstant.RESULT_CODE_INNER_ERROR, "内部异常了:" + e.getMessage() + e.getLocalizedMessage()); } finally { if(dataFlow != null) { //这里记录日志 Date endDate = DateUtil.getCurrentDate(); dataFlow.setEndDate(endDate); dataFlow.setResJson(responseJson); //添加耗时 //DataFlowFactory.addCostTime(dataFlow, "service", "业务处理总耗时", dataFlow.getStartDate(), dataFlow.getEndDate()); //保存耗时 //saveCostTimeLogMessage(dataFlow); //处理返回报文鉴权 AuthenticationFactory.putSign(dataFlow, responseJson); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestHeaders(),dataFlow.getReqJson().toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseHeaders(),dataFlow.getResJson().toJSONString()),endDate.getTime()-dataFlow.getStartDate().getTime()); // DataFlowEventPublishing.dataResponse(dataFlow,reqJson,headers); } resJson = encrypt(responseJson.toJSONString(),headers); //这里保存耗时,以及日志 } return resJson; } /** * 业务统一处理服务方法 * @param reqJson 请求报文json * @return */ public ResponseEntity serviceApi(String reqJson, Map headers) throws SMOException{ DataFlow dataFlow = null; JSONObject responseJson = null; ResponseEntity responseEntity = null; try { DataFlowEventPublishing.preValidateData(reqJson,headers); //1.0 创建数据流 dataFlow = DataFlowFactory.newInstance(DataFlow.class).builder(reqJson, headers); //DataFlowEventPublishing.initDataFlowComplete(dataFlow); //2.0 加载配置信息 initConfigData(dataFlow); //DataFlowEventPublishing.loadConfigDataComplete(dataFlow); //3.0 校验 APPID是否有权限操作serviceCode judgeAuthority(dataFlow); //4.0 调用规则校验 ruleValidate(dataFlow); //DataFlowEventPublishing.ruleValidateComplete(dataFlow); //5.0 保存订单和业务项 c_orders c_order_attrs c_business c_business_attrs saveOrdersAndBusiness(dataFlow); //6.0 调用下游系统 //DataFlowEventPublishing.invokeBusinessSystem(dataFlow); invokeBusinessSystem(dataFlow); responseJson = DataTransactionFactory.createCommonResponseJson(dataFlow); } catch (DecryptException e){ //解密异常 responseEntity = new ResponseEntity(e.getMessage() , HttpStatus.PROXY_AUTHENTICATION_REQUIRED); }catch (BusinessException e) { try { //7.0 作废订单和业务项 invalidOrderAndBusiness(dataFlow); //8.0 广播作废业务系统订单信息 //想法:这里可以直接不广播,只有在业务返回时才广播, // 疑问:在这里部分消息发出去了,如果在receiveBusinessSystemNotifyMessage这个方法中依然没有收到,我们认为是下游系统也是失败了不用处理, //目前看逻辑也是对的 //invalidBusinessSystem(dataFlow); } catch (Exception e1) { LoggerEngine.error("作废订单失败", e); //9.0 将订单状态改为失败,人工处理。 updateOrderAndBusinessError(dataFlow); } finally { responseEntity = new ResponseEntity(e.getMessage() , HttpStatus.INTERNAL_SERVER_ERROR); } } catch (OrdersException e) { responseEntity = new ResponseEntity(e.getMessage() , HttpStatus.INTERNAL_SERVER_ERROR); } catch (RuleException e) { responseEntity = new ResponseEntity(e.getMessage(), HttpStatus.NETWORK_AUTHENTICATION_REQUIRED); } catch (NoAuthorityException e) { responseEntity = new ResponseEntity(e.getMessage(), HttpStatus.UNAUTHORIZED); } catch (InitConfigDataException e){ responseEntity = new ResponseEntity(e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR); }catch (Exception e) { logger.error("内部异常了:",e); responseEntity = new ResponseEntity("内部异常了:"+e.getMessage() + e.getLocalizedMessage(), HttpStatus.INTERNAL_SERVER_ERROR); } finally { if(responseEntity == null){ responseEntity = new ResponseEntity(responseJson.toJSONString(),HttpStatus.OK); } if(dataFlow != null) { //这里记录日志 Date endDate = DateUtil.getCurrentDate(); dataFlow.setEndDate(endDate); if(responseJson != null ) { dataFlow.setResJson(responseJson); } //添加耗时 //DataFlowFactory.addCostTime(dataFlow, "service", "业务处理总耗时", dataFlow.getStartDate(), dataFlow.getEndDate()); //保存耗时 //saveCostTimeLogMessage(dataFlow); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestHeaders(),dataFlow.getReqJson().toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseHeaders(),responseEntity.getBody()),endDate.getTime()-dataFlow.getStartDate().getTime()); //DataFlowEventPublishing.dataResponse(dataFlow,reqJson,headers); } //这里保存耗时,以及日志 } return responseEntity ; } /** * 透传处理 * @param reqJson * @param headers * @return * @throws SMOException */ @Override public String serviceTransfer(String reqJson, Map headers) throws SMOException { DataFlow dataFlow = null; String responseData = null; String resJson = ""; try { reqJson = decrypt(reqJson,headers); //1.0 创建数据流 dataFlow = DataFlowFactory.newInstance(DataFlow.class).builderTransfer(reqJson, headers); //2.0 加载配置信息 initConfigData(dataFlow); //3.0 校验 APPID是否有权限操作serviceCode judgeAuthority(dataFlow); //4.0 调用规则校验 ruleValidate(dataFlow); //5.0 保存订单和业务项 c_orders c_order_attrs c_business c_business_attrs //saveOrdersAndBusiness(dataFlow); //6.0 调用下游系统 transferInvokeBusinessSystem(dataFlow); responseData = DataTransactionFactory.createCommonResData(dataFlow); } catch (DecryptException e){ //解密异常 responseData = DataTransactionFactory.createOrderResponseJson(ResponseConstant.NO_TRANSACTION_ID, e.getResult().getCode(), e.getMessage()).toJSONString(); } catch (RuleException e) { responseData = DataTransactionFactory.createOrderResponseJson(dataFlow.getTransactionId(), e.getResult().getCode(), e.getMessage()).toJSONString(); } catch (NoAuthorityException e) { responseData = DataTransactionFactory.createOrderResponseJson(dataFlow.getTransactionId(), e.getResult().getCode(), e.getMessage()).toJSONString(); } catch (InitConfigDataException e){ responseData = DataTransactionFactory.createOrderResponseJson(dataFlow.getTransactionId(), e.getResult().getCode(), e.getMessage()).toJSONString(); }catch (Exception e) { logger.error("内部异常了:",e); responseData = DataTransactionFactory.createOrderResponseJson(dataFlow == null ? ResponseConstant.NO_TRANSACTION_ID : dataFlow.getTransactionId(), ResponseConstant.RESULT_CODE_INNER_ERROR, "内部异常了:" + e.getMessage() + e.getLocalizedMessage()).toJSONString(); } finally { if(dataFlow != null) { //这里记录日志 Date endDate = DateUtil.getCurrentDate(); dataFlow.setEndDate(endDate); dataFlow.setResData(responseData); //添加耗时 DataFlowFactory.addCostTime(dataFlow, "service", "业务处理总耗时", dataFlow.getStartDate(), dataFlow.getEndDate()); //这里保存耗时,以及日志 saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestHeaders(),dataFlow.getReqJson().toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseHeaders(),dataFlow.getResJson().toJSONString()),endDate.getTime()-dataFlow.getStartDate().getTime()); //保存耗时 saveCostTimeLogMessage(dataFlow); //组装返回头信息 putResponseHeader(dataFlow,headers); //处理返回报文鉴权 AuthenticationFactory.putSign(dataFlow, headers); } resJson = encrypt(responseData,headers); } return resJson; } /** * 抒写返回头信息 * @param dataFlow */ private void putResponseHeader(DataFlow dataFlow,Map headers) { headers.put("responseTime", DateUtil.getDefaultFormateTimeString(new Date())); headers.put("transactionId",dataFlow.getTransactionId()); } /** * 解密 * @param reqJson * @return */ private String decrypt(String reqJson,Map headers) throws DecryptException{ try { if (MappingConstant.VALUE_ON.equals(headers.get(CommonConstant.ENCRYPT))) { logger.debug("解密前字符:" + reqJson); reqJson = new String(AuthenticationFactory.decrypt(reqJson.getBytes("UTF-8"), AuthenticationFactory.loadPrivateKey(MappingConstant.KEY_PRIVATE_STRING) , NumberUtils.isNumber(headers.get(CommonConstant.ENCRYPT_KEY_SIZE)) ? Integer.parseInt(headers.get(CommonConstant.ENCRYPT_KEY_SIZE)) : Integer.parseInt(MappingCache.getValue(MappingConstant.KEY_DEFAULT_DECRYPT_KEY_SIZE))),"UTF-8"); logger.debug("解密后字符:" + reqJson); } }catch (Exception e){ throw new DecryptException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR,"解密失败"); } return reqJson; } /** * 加密 * @param resJson * @param headers * @return */ private String encrypt(String resJson,Map headers){ try { if (MappingConstant.VALUE_ON.equals(headers.get(CommonConstant.ENCRYPT))) { logger.debug("加密前字符:" + resJson); resJson = new String(AuthenticationFactory.encrypt(resJson.getBytes("UTF-8"), AuthenticationFactory.loadPubKey(MappingConstant.KEY_PUBLIC_STRING) , NumberUtils.isNumber(headers.get(CommonConstant.ENCRYPT_KEY_SIZE)) ? Integer.parseInt(headers.get(CommonConstant.ENCRYPT_KEY_SIZE)) : Integer.parseInt(MappingCache.getValue(MappingConstant.KEY_DEFAULT_DECRYPT_KEY_SIZE))),"UTF-8"); logger.debug("加密后字符:" + resJson); } }catch (Exception e){ logger.error("加密失败:",e); } return resJson; } /** * 2.0初始化配置信息 * * @param dataFlow */ private void initConfigData(DataFlow dataFlow) { Date startDate = DateUtil.getCurrentDate(); //查询配置信息,并将配置信息封装到 dataFlow 对象中 List appRoutes = AppRouteCache.getAppRoute(dataFlow.getAppId()); if (appRoutes == null) { //添加耗时 DataFlowFactory.addCostTime(dataFlow, "initConfigData", "加载配置耗时", startDate); throw new InitConfigDataException(ResponseConstant.RESULT_CODE_INNER_ERROR,"当前没有获取到AppId对应的信息,appId = "+dataFlow.getAppId()); } for(AppRoute appRoute: appRoutes) { dataFlow.addAppRoutes(appRoute); } // if("-1".equals(dataFlow.getDataFlowId()) || StringUtil.isNullOrNone(dataFlow.getDataFlowId())){ dataFlow.setDataFlowId(GenerateCodeFactory.getDataFlowId()); } //添加耗时 DataFlowFactory.addCostTime(dataFlow, "initConfigData", "加载配置耗时", startDate); } /** * 3.0判断 AppId 是否 有serviceCode权限 * * @param dataFlow * @throws RuntimeException */ private void judgeAuthority(DataFlow dataFlow) throws NoAuthorityException { Date startDate = DateUtil.getCurrentDate(); if (StringUtil.isNullOrNone(dataFlow.getAppId()) || dataFlow.getAppRoutes().size() == 0 ) { //添加耗时 DataFlowFactory.addCostTime(dataFlow, "judgeAuthority", "鉴权耗时", startDate); throw new NoAuthorityException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR, "appId 为空或不正确"); } if (StringUtil.isNullOrNone(dataFlow.getTransactionId())) { //添加耗时 DataFlowFactory.addCostTime(dataFlow, "judgeAuthority", "鉴权耗时", startDate); throw new NoAuthorityException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR, "transactionId 不能为空"); } if(!StringUtil.isNullOrNone(dataFlow.getAppRoutes().get(0).getSecurityCode())){ String sign = AuthenticationFactory.dataFlowMd5(dataFlow); if(!sign.equals(dataFlow.getReqSign().toLowerCase())){ throw new NoAuthorityException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR, "签名失败"); } } if (StringUtil.isNullOrNone(dataFlow.getUserId())) { //添加耗时 DataFlowFactory.addCostTime(dataFlow, "judgeAuthority", "鉴权耗时", startDate); throw new NoAuthorityException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR, "userId 不能为空"); } if (StringUtil.isNullOrNone(dataFlow.getRequestTime()) || !DateUtil.judgeDate(dataFlow.getRequestTime(), DateUtil.DATE_FORMATE_STRING_DEFAULT)) { //添加耗时 DataFlowFactory.addCostTime(dataFlow, "judgeAuthority", "鉴权耗时", startDate); throw new NoAuthorityException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR, "requestTime 格式不对,遵循yyyyMMddHHmmss格式"); } if (StringUtil.isNullOrNone(dataFlow.getOrderTypeCd())) { //添加耗时 DataFlowFactory.addCostTime(dataFlow, "judgeAuthority", "鉴权耗时", startDate); throw new NoAuthorityException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR, "orderTypeCd 不能为空"); } //判断 AppId 是否有权限操作相应的服务 if (dataFlow.getBusinesses() != null && dataFlow.getBusinesses().size() > 0) { for (Business business : dataFlow.getBusinesses()) { AppService appService = DataFlowFactory.getService(dataFlow, business.getServiceCode()); //这里调用缓存 查询缓存信息 if (appService == null) { //添加耗时 DataFlowFactory.addCostTime(dataFlow, "judgeAuthority", "鉴权耗时", startDate); throw new NoAuthorityException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR, "AppId 没有权限访问 serviceCod = " + business.getServiceCode()); } if(CommonConstant.HTTP_SERVICE_API.equals(appService.getBusinessTypeCd())){ DataFlowFactory.addCostTime(dataFlow, "judgeAuthority", "鉴权耗时", startDate); throw new NoAuthorityException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR, "当前服务配置为API 该AppId 没有权限访问 serviceCod = " + business.getServiceCode()); } } } //检验白名单 List whileListIp = dataFlow.getAppRoutes().get(0).getWhileListIp(); if (whileListIp != null && whileListIp.size() > 0 && !whileListIp.contains(dataFlow.getIp())) { //添加耗时 DataFlowFactory.addCostTime(dataFlow, "judgeAuthority", "鉴权耗时", startDate); throw new NoAuthorityException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR, "当前IP被限制不能访问服务"); } //检查黑名单 List backListIp = dataFlow.getAppRoutes().get(0).getBackListIp(); if (backListIp != null && backListIp.size() > 0&& backListIp.contains(dataFlow.getIp())) { //添加耗时 DataFlowFactory.addCostTime(dataFlow, "judgeAuthority", "鉴权耗时", startDate); throw new NoAuthorityException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR, "当前IP被限制不能访问服务"); } //添加耗时 DataFlowFactory.addCostTime(dataFlow, "judgeAuthority", "鉴权耗时", startDate); } /** * 4.0规则校验 * * @param dataFlow * @throws RuleException */ private void ruleValidate(DataFlow dataFlow) throws RuleException { Date startDate = DateUtil.getCurrentDate(); try { if (MappingConstant.VALUE_OFF.equals(MappingCache.getValue(MappingConstant.KEY_RULE_ON_OFF)) || (MappingCache.getValue(MappingConstant.KEY_NO_NEED_RULE_VALDATE_ORDER) != null && MappingCache.getValue(MappingConstant.KEY_NO_NEED_RULE_VALDATE_ORDER).contains(dataFlow.getOrderTypeCd()))) { //不做校验 //添加耗时 DataFlowFactory.addCostTime(dataFlow, "ruleValidate", "规则校验耗时", startDate); return ; } //调用规则 } catch (Exception e) { //添加耗时 DataFlowFactory.addCostTime(dataFlow, "ruleValidate", "规则校验耗时", startDate); throw new RuleException(ResponseConstant.RESULT_CODE_RULE_ERROR, "规则校验异常失败:" + e.getMessage()); } DataFlowFactory.addCostTime(dataFlow, "ruleValidate", "规则校验耗时", startDate); } /** * 5.0 保存订单和业务项 c_orders c_order_attrs c_business c_business_attrs * * @param dataFlow * @throws OrdersException */ private void saveOrdersAndBusiness(DataFlow dataFlow) throws OrdersException { Date startDate = DateUtil.getCurrentDate(); if(MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER) != null &&MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER).contains(dataFlow.getOrderTypeCd())){ //不保存订单信息 DataFlowFactory.addCostTime(dataFlow, "saveOrdersAndBusiness", "保存订单和业务项耗时", startDate); return ; } //1.0 保存 orders信息 centerServiceDaoImpl.saveOrder(DataFlowFactory.getOrder(dataFlow)); centerServiceDaoImpl.saveOrderAttrs(DataFlowFactory.getOrderAttrs(dataFlow)); //2.0 保存 business信息 centerServiceDaoImpl.saveBusiness(DataFlowFactory.getBusiness(dataFlow)); centerServiceDaoImpl.saveBusinessAttrs(DataFlowFactory.getBusinessAttrs(dataFlow)); DataFlowFactory.addCostTime(dataFlow, "saveOrdersAndBusiness", "保存订单和业务项耗时", startDate); } /** * 6.0 调用下游系统 * * @param dataFlow * @throws BusinessException */ private void invokeBusinessSystem(DataFlow dataFlow) throws BusinessException { Date startDate = DateUtil.getCurrentDate(); if(MappingCache.getValue(MappingConstant.KEY_NO_INVOKE_BUSINESS_SYSTEM) != null &&MappingCache.getValue(MappingConstant.KEY_NO_INVOKE_BUSINESS_SYSTEM).contains(dataFlow.getOrderTypeCd())){ //不用调用 下游系统的配置(一般不存在这种情况,这里主要是在没有下游系统的情况下测试中心服务用) DataFlowFactory.addCostTime(dataFlow, "invokeBusinessSystem", "调用下游系统耗时", startDate); dataFlow.setResponseBusinessJson(DataTransactionFactory.createCommonResponseJson(dataFlow.getTransactionId(), ResponseConstant.RESULT_CODE_SUCCESS, "成功",null)); return ; } //6.1 先处理同步方式的服务,每一同步后发布事件广播 doSynchronousBusinesses(dataFlow); //6.2 处理异步服务 doAsynchronousBusinesses(dataFlow); DataFlowFactory.addCostTime(dataFlow, "invokeBusinessSystem", "调用下游系统耗时", startDate); } /** * 6.0 调用下游系统 * * @param dataFlow * @throws BusinessException */ private void transferInvokeBusinessSystem(DataFlow dataFlow) throws BusinessException { Date startDate = DateUtil.getCurrentDate(); /* if(MappingCache.getValue(MappingConstant.KEY_NO_INVOKE_BUSINESS_SYSTEM) != null &&MappingCache.getValue(MappingConstant.KEY_NO_INVOKE_BUSINESS_SYSTEM).contains(dataFlow.getOrderTypeCd())){ //不用调用 下游系统的配置(一般不存在这种情况,这里主要是在没有下游系统的情况下测试中心服务用) DataFlowFactory.addCostTime(dataFlow, "invokeBusinessSystem", "调用下游系统耗时", startDate); dataFlow.setResponseBusinessJson(DataTransactionFactory.createCommonResponseJson(dataFlow.getTransactionId(), ResponseConstant.RESULT_CODE_SUCCESS, "成功",null)); return ; }*/ //6.1 先处理同步方式的服务,每一同步后发布事件广播 AppService service = DataFlowFactory.getService(dataFlow,dataFlow.getCurrentBusiness().getServiceCode()); String responseJson = doTransferRequestBusinessSystem(dataFlow, service, dataFlow.getCurrentBusiness().getTransferData()); dataFlow.setResData(responseJson); DataFlowFactory.addCostTime(dataFlow,dataFlow.getCurrentBusiness().getServiceCode(), "调用"+dataFlow.getCurrentBusiness().getServiceCode()+"耗时", startDate); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),dataFlow.getCurrentBusiness().getTransferData()), LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),dataFlow.getResData()), DateUtil.getCurrentDate().getTime()-startDate.getTime()); DataFlowFactory.addCostTime(dataFlow, "invokeBusinessSystem", "调用下游系统耗时", startDate); } /** * 7.0 作废订单和业务项 插入撤单记录 等待撤单 * * @param dataFlow */ private void invalidOrderAndBusiness(DataFlow dataFlow) { Date startDate = DateUtil.getCurrentDate(); if(MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER) != null &&MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER).contains(dataFlow.getOrderTypeCd())){ //不用作废订单信息 DataFlowFactory.addCostTime(dataFlow, "invalidOrderAndBusiness", "作废订单和业务项耗时", startDate); return ; } //如果已经作废 不存在 或失败,则不做处理 Map order = centerServiceDaoImpl.getOrderInfoByBId(dataFlow.getCurrentBusiness().getbId()); if(order == null || !order.containsKey("status_cd") || StatusConstant.STATUS_CD_DELETE.equals(order.get("status_cd")) || StatusConstant.STATUS_CD_ERROR.equals(order.get("status_cd"))){ return ; } //作废 订单 centerServiceDaoImpl.updateOrder(DataFlowFactory.getNeedInvalidOrder(dataFlow)); //作废订单项 centerServiceDaoImpl.updateBusiness(DataFlowFactory.getNeedInvalidOrder(dataFlow)); //将当前订单项改为 撤单状态 centerServiceDaoImpl.updateBusinessByBId(DataFlowFactory.getNeedDeleteBusiness(dataFlow)); //插入撤单记录 doAddDeleteOrderBusinessData(dataFlow); DataFlowFactory.addCostTime(dataFlow, "invalidOrderAndBusiness", "作废订单和业务项耗时", startDate); } /** * 8.0 广播作废已经完成业务系统订单信息 * * @param dataFlow */ private void invalidCompletedBusinessSystem(DataFlow dataFlow) throws Exception{ if(!StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) { return ; } //判断 订单instance 是否都变成了撤单状态 if(centerServiceDaoImpl.judgeAllBusinessDeleteOrder(dataFlow.getoId(),StatusConstant.STATUS_CD_DELETE_ORDER) < 1){ return ; } // 根据 c_business 表中的字段business_type_cd 找到对应的消息队列名称 Map paramIn = new HashMap(); paramIn.put("oId",dataFlow.getoId()); paramIn.put("statusCd",StatusConstant.STATUS_CD_DELETE_ORDER); List completedBusinesses = centerServiceDaoImpl.getBusinessByOId(paramIn); for(AppRoute appRoute :dataFlow.getAppRoutes()){ for(Map completedBusiness : completedBusinesses){ if(completedBusiness.get("business_type_cd").equals(appRoute.getAppService().getBusinessTypeCd())){ long startTime = DateUtil.getCurrentDate().getTime(); //发起撤单 KafkaFactory.sendKafkaMessage(appRoute.getAppService().getMessageQueueName(),"", DataFlowFactory.getDeleteInstanceTableJson(dataFlow,completedBusiness,appRoute.getAppService()).toJSONString()); //saveLogMessage(DataFlowFactory.getDeleteInstanceTableJson(dataFlow,completedBusiness,appRoute.getAppService()),null); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),DataFlowFactory.getDeleteInstanceTableJson(dataFlow,completedBusiness,appRoute.getAppService()).toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_SUCCESS), DateUtil.getCurrentDate().getTime() - startTime); } } } } /** * 9.0 将订单状态改为失败,人工处理。 * * @param dataFlow */ private void updateOrderAndBusinessError(DataFlow dataFlow) { Date startDate = DateUtil.getCurrentDate(); //作废 订单 centerServiceDaoImpl.updateOrder(DataFlowFactory.getNeedErrorOrder(dataFlow)); //作废订单项 centerServiceDaoImpl.updateBusiness(DataFlowFactory.getNeedErrorOrder(dataFlow)); DataFlowFactory.addCostTime(dataFlow, "updateOrderAndBusinessError", "订单状态改为失败耗时", startDate); } /** * 将订单状态改为作废状态。 * * @param dataFlow */ private void updateOrderAndBusinessDelete(DataFlow dataFlow) { Date startDate = DateUtil.getCurrentDate(); //作废 订单 centerServiceDaoImpl.updateOrder(DataFlowFactory.getNeedInvalidOrder(dataFlow)); //作废订单项 centerServiceDaoImpl.updateBusiness(DataFlowFactory.getNeedDeleteBusiness(dataFlow)); //加入撤单记录 //doAddDeleteOrderBusinessData(dataFlow); DataFlowFactory.addCostTime(dataFlow, "updateOrderAndBusinessError", "订单状态改为失败耗时", startDate); } /** * 加入撤单记录 * @param dataFlow */ private void doAddDeleteOrderBusinessData(DataFlow dataFlow){ /* Map business = new HashMap(); business.put("bId",SequenceUtil.getBId()); business.put("oId",dataFlow.getoId()); business.put("businessTypeCd",StatusConstant.REQUEST_BUSINESS_TYPE_DELETE); business.put("remark","发起撤单"); business.put("statusCd",StatusConstant.STATUS_CD_DELETE_ORDER);*/ centerServiceDaoImpl.saveBusiness(DataFlowFactory.getDeleteOrderBusiness(dataFlow,"订单失败,加入撤单")); } /** * 接受业务系统通知消息 * @param receiveJson 接受报文 * @throws SMOException */ @Override public void receiveBusinessSystemNotifyMessage(String receiveJson) throws SMOException{ Date startDate = DateUtil.getCurrentDate(); DataFlow dataFlow = null; try { //1.0 创建数据流 dataFlow = DataFlowFactory.newInstance(DataFlow.class).builderByBusiness(receiveJson); //如果订单都没有保存,则再不要处理 if(MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER) != null &&MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER).contains(dataFlow.getOrderTypeCd())){ //不保存订单信息 return ; } //如果不是 business 和instance 过程 则直接跳出 judgeBusinessOrInstance(dataFlow); //2.0加载数据,没有找到appId 及配置信息 则抛出InitConfigDataException reloadOrderInfoAndConfigData(dataFlow); //3.0 判断是否成功,失败会抛出BusinessStatusException异常 judgeBusinessStatusAndCompleteBusiness(dataFlow); //4.0 修改业务为成功,如果发现业务项已经是作废或失败状态(D或E)则抛出BusinessException异常 //completeBusiness(dataFlow); //5.0 判断 发起 Instance 条件是否满足,如果满足 发起 Instance过程 judgeSendToInstance(dataFlow); //7.0 判断撤单条件是否满足,如果满足发起撤单 invalidCompletedBusinessSystem(dataFlow); }catch (BusinessStatusException e){ logger.error("订单失败:" ,e); //8.0 将订单状态改为失败,人工处理。 updateOrderAndBusinessError(dataFlow); }catch (BusinessException e) { //9.0说明这个订单已经失败了,再不需要 //想法,这里广播当前失败业务 logger.error("修改业务数据失败",e); }catch (InitConfigDataException e){ //这种一般不会出现,除非人工改了数据 LoggerEngine.error("加载配置数据出错", e); try { //6.0 作废订单和所有业务项 invalidOrderAndBusiness(dataFlow); //7.0 广播作废业务系统订单信息,这里只有 Instance 失败后才发起 撤单 if(StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) { invalidCompletedBusinessSystem(dataFlow); } } catch (Exception e1) { LoggerEngine.error("作废订单失败", e1); //8.0 将订单状态改为失败,人工处理。 updateOrderAndBusinessError(dataFlow); } }catch (NoSupportException e){ LoggerEngine.error("当前业务不支持", e); }catch (Exception e){ LoggerEngine.error("作废订单失败", e); //10.0 成功的情况下通知下游系统失败将状态改为NE,人工处理。 updateBusinessNotifyError(dataFlow); }finally{ DataFlowFactory.addCostTime(dataFlow, "receiveBusinessSystemNotifyMessage", "接受业务系统通知消息耗时", startDate); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),dataFlow.getReqJson().toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_SUCCESS), DateUtil.getCurrentDate().getTime() - dataFlow.getStartDate().getTime()); } } /** * Instance过程 * @param dataFlow */ private void doSendInstance(DataFlow dataFlow) { if(dataFlow == null || !StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(dataFlow.getBusinessType())){ return ; } try { KafkaFactory.sendKafkaMessage(DataFlowFactory.getService(dataFlow, dataFlow.getCurrentBusiness().getServiceCode()).getMessageQueueName(), "", DataFlowFactory.getBusinessTableDataInfoToInstanceTableJson(dataFlow, dataFlow.getCurrentBusiness()).toJSONString()); }catch (Exception e){ } } /** * 判断是否是 business 或者 instance过程 * @param dataFlow * @throws NoSupportException */ private void judgeBusinessOrInstance(DataFlow dataFlow) throws NoSupportException{ if(dataFlow == null || StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(dataFlow.getBusinessType()) || StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())){ return ; } throw new NoSupportException(ResponseConstant.RESULT_PARAM_ERROR,"当前只支持 Business 和 Instance过程"); } /** * 2.0重新加载订单信息到dataFlow 中 * * @param dataFlow */ private void reloadOrderInfoAndConfigData(DataFlow dataFlow) { Map order = centerServiceDaoImpl.getOrderInfoByBId(dataFlow.getBusinesses().get(0).getbId()); dataFlow.setoId(order.get("o_id").toString()); dataFlow.setAppId(order.get("app_id").toString()); if("-1".equals(dataFlow.getDataFlowId()) || StringUtil.isNullOrNone(dataFlow.getDataFlowId())){ throw new InitConfigDataException(ResponseConstant.RESULT_CODE_ERROR,"请求报文中没有包含 dataFlowId 节点"); } //重新刷端口信息 ServiceInfoListener serviceInfoListener = ApplicationContextFactory.getBean("serviceInfoListener",ServiceInfoListener.class); if(serviceInfoListener != null){ dataFlow.setPort(serviceInfoListener.getServerPort()+""); } //重新加载配置 initConfigData(dataFlow); } /** * 9.0 成功的情况下通知下游系统失败将状态改为NE,人工处理。 * * @param dataFlow */ private void updateBusinessNotifyError(DataFlow dataFlow) { Date startDate = DateUtil.getCurrentDate(); //完成订单项 centerServiceDaoImpl.updateBusinessByBId(DataFlowFactory.getNeedNotifyErrorBusiness(dataFlow)); DataFlowFactory.addCostTime(dataFlow, "updateBusinessNotifyError", "订单状态改为失败耗时", startDate); } /** * 判断是否都成功了 * @param dataFlow */ private void judgeBusinessStatusAndCompleteBusiness(DataFlow dataFlow) throws BusinessStatusException{ //List businesses = dataFlow.getBusinesses(); //1.0 判断是否存在撤单,如果是撤单则将当前 bId 标记为撤单状态 if(StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) { Map businessMap = centerServiceDaoImpl.getDeleteOrderBusinessByOId(dataFlow.getoId()); if(businessMap != null && !businessMap.isEmpty()){ centerServiceDaoImpl.updateBusinessByBId(DataFlowFactory.getNeedDeleteBusiness(dataFlow)); return ; } } Business business = dataFlow.getCurrentBusiness(); if(!ResponseConstant.RESULT_CODE_SUCCESS.equals(business.getCode())){ //throw new BusinessStatusException(business.getCode(),"业务bId= "+business.getbId() + " 处理失败,需要作废订单"); //作废订单和业务项 插入撤单记录 等待撤单 invalidOrderAndBusiness(dataFlow); }else{ completeBusiness(dataFlow); } } /** * 3.0 修改业务为成功,如果发现业务项已经是作废或失败状态(D或E)则抛出BusinessException异常 * * @param dataFlow */ private void completeBusiness(DataFlow dataFlow) throws BusinessException{ try { if(StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) { //完成订单项 centerServiceDaoImpl.updateBusinessByBId(DataFlowFactory.getNeedCompleteBusiness(dataFlow)); //如果业务都完成,则将 订单改为完成状态 centerServiceDaoImpl.completeOrderByBId(dataFlow.getCurrentBusiness().getbId()); }else if(StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(dataFlow.getBusinessType())) { centerServiceDaoImpl.updateBusinessByBId(DataFlowFactory.getNeedBusinessComplete(dataFlow)); }else{ //这里到不了,前面做了校验 throw new BusinessException(ResponseConstant.RESULT_PARAM_ERROR,"当前不支持 业务类型为 businessType" +dataFlow.getBusinessType()); } }catch (DAOException e){ throw new BusinessException(e.getResult(),e); } } /** * //4.0当所有业务动作是否都是C,将订单信息改为 C 并且发布竣工消息,这里在广播之前确认 * @param dataFlow */ private void judgeSendToInstance(DataFlow dataFlow) throws Exception{ try { if(centerServiceDaoImpl.judgeAllBusinessCompleted(dataFlow.getoId(),StatusConstant.STATUS_CD_BUSINESS_COMPLETE) > 0) { //通知成功消息 doSendInstance(dataFlow); } }catch (DAOException e){ //这里什么都不做,说明订单没有完成 } } /** * 通知 订单已经完成,后端需要完成数据 * @param dataFlow */ private void notifyBusinessSystemSuccessMessage(DataFlow dataFlow) throws Exception{ long startTime = DateUtil.getCurrentDate().getTime(); //拼装报文通知业务系统 KafkaFactory.sendKafkaMessage( DataFlowFactory.getService(dataFlow,dataFlow.getBusinesses().get(0).getServiceCode()).getMessageQueueName(),"",DataFlowFactory.getNotifyBusinessSuccessJson(dataFlow).toJSONString()); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),DataFlowFactory.getNotifyBusinessSuccessJson(dataFlow).toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_SUCCESS), DateUtil.getCurrentDate().getTime() - startTime); } /** * 8.0 广播作废业务系统订单信息 * * @param dataFlow */ private void notifyBusinessSystemErrorMessage(DataFlow dataFlow) throws Exception{ long startTime = DateUtil.getCurrentDate().getTime(); //拼装报文通知业务系统 KafkaFactory.sendKafkaMessage( DataFlowFactory.getService(dataFlow,dataFlow.getBusinesses().get(0).getServiceCode()).getMessageQueueName(),"", DataFlowFactory.getNotifyBusinessErrorJson(dataFlow).toJSONString()); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),DataFlowFactory.getNotifyBusinessErrorJson(dataFlow).toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_ERROR), DateUtil.getCurrentDate().getTime() - startTime); } /** * 处理同步业务 * @param dataFlow */ private void doSynchronousBusinesses(DataFlow dataFlow) throws BusinessException{ Date startDate = DateUtil.getCurrentDate(); List synchronousBusinesses = DataFlowFactory.getSynchronousBusinesses(dataFlow); List deleteBusinesses = new ArrayList(); if(synchronousBusinesses == null || synchronousBusinesses.size() == 0){ return ; } JSONArray responseBusinesses = new JSONArray(); //6.1处理同步服务 发起Business doSaveDataInfoToBusinessTable(dataFlow, synchronousBusinesses, responseBusinesses); try { //6.2发起Instance doBusinessTableDataInfoToInstanceTable(dataFlow, synchronousBusinesses,deleteBusinesses); }catch (Exception e){ try { //这里发起撤单逻辑 doDeleteOrderAndInstanceData(dataFlow, deleteBusinesses); }catch (Exception e1){ logger.error("撤单失败",e1); //这里记录撤单失败的信息 } throw new BusinessException(ResponseConstant.RESULT_PARAM_ERROR,e.getMessage()); } //6.3 c_business 数据修改为完成 /*List asynchronousBusinesses = DataFlowFactory.getAsynchronousBusinesses(dataFlow); if(asynchronousBusinesses == null || asynchronousBusinesses.size() == 0){ doComplateOrderAndBusiness(dataFlow,synchronousBusinesses); }*/ dataFlow.setRequestBusinessJson(dataFlow.getReqJson()); dataFlow.setResponseBusinessJson(DataTransactionFactory.createCommonResponseJson(dataFlow.getTransactionId(), ResponseConstant.RESULT_CODE_SUCCESS, "成功",responseBusinesses)); DataFlowFactory.addCostTime(dataFlow, "doSynchronousBusinesses", "同步调用业务系统总耗时", startDate); } /** * 发起撤单业务 * @param dataFlow * @param deleteBusinesses */ private void doDeleteOrderAndInstanceData(DataFlow dataFlow, List deleteBusinesses) { if(deleteBusinesses == null || deleteBusinesses.size() == 0){ return ; } //1.0 在c_business 表中加入 撤单记录 centerServiceDaoImpl.saveBusiness(DataFlowFactory.getDeleteOrderBusiness(dataFlow,"业务系统实例失败,发起撤单")); //2.0 作废 c_orders 和 c_business 数据 updateOrderAndBusinessDelete(dataFlow); //3.0 发起 撤单业务 doDeleteBusinessSystemInstanceData(dataFlow,deleteBusinesses); } /** * 完成订单状态 * @param synchronousBusinesses */ private void doComplateOrderAndBusiness(DataFlow dataFlow,List synchronousBusinesses) { //Complete Order and business Map order = new HashMap(); order.put("oId",dataFlow.getoId()); order.put("statusCd", StatusConstant.STATUS_CD_COMPLETE); order.put("finishTime",DateUtil.getCurrentDate()); centerServiceDaoImpl.updateOrder(order); centerServiceDaoImpl.updateBusiness(order); Date businessStartDate; AppService service; JSONObject requestBusinessJson; for(Business business : synchronousBusinesses){ businessStartDate = DateUtil.getCurrentDate(); service = DataFlowFactory.getService(dataFlow,business.getServiceCode()); if(!CommonConstant.INSTANCE_Y.equals(service.getIsInstance())){ continue; } requestBusinessJson = DataFlowFactory.getCompleteInstanceDataJson(dataFlow,business); JSONObject responseJson = doRequestBusinessSystem(dataFlow, service, requestBusinessJson); DataFlowFactory.addCostTime(dataFlow, business.getServiceCode(), "调用"+business.getServiceName()+"-doComplete耗时", businessStartDate); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),requestBusinessJson.toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),responseJson.toJSONString()), DateUtil.getCurrentDate().getTime() - businessStartDate.getTime()); } } /** * 将BusinessTable 中的数据保存到 InstanceTable * @param dataFlow * @param synchronousBusinesses */ private void doBusinessTableDataInfoToInstanceTable(DataFlow dataFlow, List synchronousBusinesses,List deleteBusinesses) { Date businessStartDate; AppService service; JSONObject requestBusinessJson; for(Business business : synchronousBusinesses){ businessStartDate = DateUtil.getCurrentDate(); service = DataFlowFactory.getService(dataFlow,business.getServiceCode()); if(!CommonConstant.INSTANCE_Y.equals(service.getIsInstance())){ continue; } dataFlow.setCurrentBusiness(business); //添加需要撤单的业务信息 deleteBusinesses.add(business); requestBusinessJson = DataFlowFactory.getBusinessTableDataInfoToInstanceTableJson(dataFlow,business); JSONObject responseJson = doRequestBusinessSystem(dataFlow, service, requestBusinessJson); updateBusinessStatusCdByBId(business.getbId(),StatusConstant.STATUS_CD_COMPLETE); DataFlowFactory.addCostTime(dataFlow, business.getServiceCode(), "调用"+business.getServiceName()+"耗时", businessStartDate); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),requestBusinessJson.toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),responseJson.toJSONString()), DateUtil.getCurrentDate().getTime() - businessStartDate.getTime()); } if(dataFlow.getCurrentBusiness() == null){ return ; } service = DataFlowFactory.getService(dataFlow,dataFlow.getCurrentBusiness().getServiceCode()); if(CommonConstant.INSTANCE_Y.equals(service.getIsInstance())){ //判断业务动作是否都竣工,主要考虑 请求报文中 有异步也有同步的情况 //如果业务都完成,则将 订单改为完成状态 centerServiceDaoImpl.completeOrderByBId(dataFlow.getCurrentBusiness().getbId()); } } /** * 业务系统撤单 * @param dataFlow * @param deleteBusinesses */ private void doDeleteBusinessSystemInstanceData(DataFlow dataFlow, List deleteBusinesses) { Date businessStartDate; AppService service; JSONObject requestBusinessJson; for(Business business : deleteBusinesses){ businessStartDate = DateUtil.getCurrentDate(); service = DataFlowFactory.getService(dataFlow,business.getServiceCode()); requestBusinessJson = DataFlowFactory.getDeleteInstanceTableJson(dataFlow,business); JSONObject responseJson = doRequestBusinessSystem(dataFlow, service, requestBusinessJson); DataFlowFactory.addCostTime(dataFlow, business.getServiceCode(), "调用"+business.getServiceName()+"-撤单 耗时", businessStartDate); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),requestBusinessJson.toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),responseJson.toJSONString()), DateUtil.getCurrentDate().getTime() - businessStartDate.getTime()); } } private JSONObject doRequestBusinessSystem(DataFlow dataFlow, AppService service, JSONObject requestBusinessJson) { String responseMessage; if(!StringUtil.isNullOrNone(service.getMethod()) && !"POST,PUT,GET,DELETE,PATCH,HEAD,OPTIONS,TRACE".contains(service.getMethod())) {//webservice方式 responseMessage = (String) WebServiceAxisClient.callWebService(service.getUrl(),service.getMethod(), new Object[]{dataFlow.getRequestBusinessJson().toJSONString()}, service.getTimeOut()); }else{//post方式 //http://user-service/test/sayHello responseMessage = restTemplate.postForObject(service.getUrl(),requestBusinessJson.toJSONString(),String.class); } if(StringUtil.isNullOrNone(responseMessage) || !Assert.isJsonObject(responseMessage)){ throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR,"下游系统返回格式不正确,请按协议规范处理"); } JSONObject responseJson = JSONObject.parseObject(responseMessage); Assert.jsonObjectHaveKey(responseJson,"response","下游返回报文格式错误,没有包含responseJson节点【"+service.getUrl()+"】"); JSONObject responseInfo = responseJson.getJSONObject("response"); Assert.jsonObjectHaveKey(responseInfo,"code","下游返回报文格式错误,response 节点中没有包含code节点【"+service.getUrl()+"】"); if(!ResponseConstant.RESULT_CODE_SUCCESS.equals(responseInfo.getString("code"))){ throw new BusinessException(ResponseConstant.RESULT_PARAM_ERROR,"业务系统处理失败,"+responseInfo.getString("message")); } return responseJson; } private String doTransferRequestBusinessSystem(DataFlow dataFlow, AppService service, String reqData) { String responseMessage; if(service.getMethod() == null || "".equals(service.getMethod())) {//post方式 //http://user-service/test/sayHello HttpHeaders header = new HttpHeaders(); for(String key : dataFlow.getRequestCurrentHeaders().keySet()){ header.add(key,dataFlow.getRequestCurrentHeaders().get(key)); } HttpEntity httpEntity = new HttpEntity(reqData, header); responseMessage = outRestTemplate.postForObject(service.getUrl(),httpEntity,String.class); }else{//webservice方式 responseMessage = (String) WebServiceAxisClient.callWebService(service.getUrl(),service.getMethod(), new Object[]{dataFlow.getRequestBusinessJson().toJSONString()}, service.getTimeOut()); } return responseMessage; } /** * 数据保存到BusinessTable 中 * @param dataFlow * @param synchronousBusinesses * @param responseBusinesses */ private void doSaveDataInfoToBusinessTable(DataFlow dataFlow, List synchronousBusinesses, JSONArray responseBusinesses) { Date businessStartDate; AppService service; JSONObject requestBusinessJson; for(Business business : synchronousBusinesses) { businessStartDate = DateUtil.getCurrentDate(); service = DataFlowFactory.getService(dataFlow,business.getServiceCode()); if(CommonConstant.INSTANCE_Y.equals(service.getIsInstance())){ //发起Business过程 updateBusinessStatusCdByBId(business.getbId(),StatusConstant.STATUS_CD_BUSINESS); } requestBusinessJson = DataFlowFactory.getRequestBusinessJson(dataFlow,business); dataFlow.setRequestBusinessJson(requestBusinessJson); JSONObject responseJson = doRequestBusinessSystem(dataFlow, service, dataFlow.getRequestBusinessJson()); dataFlow.setResponseBusinessJson(responseJson); //发布事件 //DataFlowEventPublishing.multicastEvent(service.getServiceCode(),dataFlow); responseBusinesses.add(dataFlow.getResponseBusinessJson()); DataFlowFactory.addCostTime(dataFlow, business.getServiceCode(), "调用"+business.getServiceName()+"耗时", businessStartDate); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),dataFlow.getRequestBusinessJson().toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),dataFlow.getResponseBusinessJson().toJSONString()), DateUtil.getCurrentDate().getTime()-businessStartDate.getTime()); } } /** * 处理异步业务 * @param */ private void doAsynchronousBusinesses(DataFlow dataFlow) throws BusinessException{ Date startDate = DateUtil.getCurrentDate(); //6.3 处理异步,按消息队里处理 List asynchronousBusinesses = DataFlowFactory.getAsynchronousBusinesses(dataFlow); if(asynchronousBusinesses == null || asynchronousBusinesses.size() == 0){ return ; } try { for (Business business : asynchronousBusinesses) { dataFlow.setCurrentBusiness(business); KafkaFactory.sendKafkaMessage(DataFlowFactory.getService(dataFlow, business.getServiceCode()).getMessageQueueName(), "", DataFlowFactory.getRequestBusinessJson(dataFlow,business).toJSONString()); } }catch (Exception e){ throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR,e.getMessage()); } dataFlow.setRequestBusinessJson(dataFlow.getReqJson()); dataFlow.setResponseBusinessJson(DataTransactionFactory.createOrderResponseJson(dataFlow.getTransactionId(), ResponseConstant.RESULT_CODE_SUCCESS, "成功")); DataFlowFactory.addCostTime(dataFlow, "doSynchronousBusinesses", "异步调用业务系统总耗时", startDate); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),dataFlow.getRequestBusinessJson().toJSONString()), LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),dataFlow.getResponseBusinessJson().toJSONString()), DateUtil.getCurrentDate().getTime()-startDate.getTime()); } /** * 保存日志信息 * @param dataFlow 数据流对象 封装用户请求的信息 * * @param requestJson 请求报文 格式为 * {"headers":"", * "body":"" * } * @param responseJson 请求报文 格式为 * {"headers":"", * "body":"" * } */ private void saveLogMessage(DataFlow dataFlow,JSONObject requestJson,JSONObject responseJson,long costTime){ LogAgent.sendLog(dataFlow,requestJson,responseJson,costTime); } /** * 保存日志信息 * @param requestJson */ private void saveLogMessage(String requestJson,String responseJson){ try{ if(MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.DOMAIN_SYSTEM_SWITCH,MappingConstant.KEY_LOG_ON_OFF))){ JSONObject log = new JSONObject(); log.put("request",requestJson); log.put("response",responseJson); KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_LOG_NAME,"",log.toJSONString()); } }catch (Exception e){ logger.error("报错日志出错了,",e); } } /** * 保存耗时信息 * @param dataFlow */ private void saveCostTimeLogMessage(DataFlow dataFlow){ try{ if(MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_COST_TIME_ON_OFF))){ List dataFlowLinksCosts = dataFlow.getLinksCostDates(); JSONObject costDate = new JSONObject(); JSONArray costDates = new JSONArray(); JSONObject newObj = null; for(DataFlowLinksCost dataFlowLinksCost : dataFlowLinksCosts){ newObj = JSONObject.parseObject(JSONObject.toJSONString(dataFlowLinksCost)); newObj.put("dataFlowId",dataFlow.getDataFlowId()); newObj.put("transactionId",dataFlow.getTransactionId()); costDates.add(newObj); } costDate.put("costDates",costDates); KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_COST_TIME_LOG_NAME,"",costDate.toJSONString()); } }catch (Exception e){ logger.error("报错日志出错了,",e); } } /** * 修改c_business状态 * @param bId * @param statusCd */ private void updateBusinessStatusCdByBId(String bId,String statusCd){ Map business = new HashMap(); business.put("bId",bId); business.put("statusCd",statusCd); business.put("finishTime",DateUtil.getCurrentDate()); centerServiceDaoImpl.updateBusinessByBId(business); } public ICenterServiceDAO getCenterServiceDaoImpl() { return centerServiceDaoImpl; } public void setCenterServiceDaoImpl(ICenterServiceDAO centerServiceDaoImpl) { this.centerServiceDaoImpl = centerServiceDaoImpl; } public RestTemplate getRestTemplate() { return restTemplate; } public void setRestTemplate(RestTemplate restTemplate) { this.restTemplate = restTemplate; } public IQueryServiceSMO getQueryServiceSMOImpl() { return queryServiceSMOImpl; } public void setQueryServiceSMOImpl(IQueryServiceSMO queryServiceSMOImpl) { this.queryServiceSMOImpl = queryServiceSMOImpl; } }