package com.java110.order.smo.impl; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.java110.core.client.RestTemplate; import com.java110.core.context.*; import com.java110.core.event.center.DataFlowEventPublishing; import com.java110.core.factory.AuthenticationFactory; import com.java110.core.factory.OrderDataFlowContextFactory; import com.java110.core.log.LogAgent; import com.java110.entity.center.AppService; import com.java110.entity.center.DataFlowLinksCost; import com.java110.entity.order.Business; import com.java110.entity.order.ServiceBusiness; import com.java110.order.dao.ICenterServiceDAO; import com.java110.order.smo.IOrderServiceSMO; import com.java110.service.init.ServiceInfoListener; import com.java110.service.smo.IQueryServiceSMO; import com.java110.utils.cache.MappingCache; import com.java110.utils.constant.*; import com.java110.utils.exception.*; import com.java110.utils.factory.ApplicationContextFactory; import com.java110.utils.kafka.KafkaFactory; import com.java110.utils.log.LoggerEngine; import com.java110.utils.util.DateUtil; import com.java110.utils.util.ServiceBusinessUtil; import com.java110.utils.util.StringUtil; import com.java110.utils.util.WebServiceAxisClient; import org.apache.commons.lang3.math.NumberUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 订单服务处理类 * Created by wuxw on 2018/4/13. */ @Service("orderServiceSMOImpl") //@Transactional public class OrderServiceSMOImpl extends AbstractOrderServiceSMOImpl implements IOrderServiceSMO { private static Logger logger = LoggerFactory.getLogger(OrderServiceSMOImpl.class); @Autowired private IQueryServiceSMO queryServiceSMOImpl; /** * 业务统一处理服务方法 * * @param reqJson 请求报文json * @return */ public ResponseEntity service(String reqJson, Map headers) throws SMOException { IOrderDataFlowContext dataFlow = null; JSONObject responseJson = null; ResponseEntity responseEntity = null; try { DataFlowEventPublishing.preValidateData(reqJson, headers); //1.0 创建数据流 dataFlow = OrderDataFlowContextFactory.newInstance(OrderDataFlow.class).builder(reqJson, headers); DataFlowEventPublishing.initDataFlowComplete(dataFlow); //2.0 调用规则校验 ruleValidate(dataFlow); DataFlowEventPublishing.ruleValidateComplete(dataFlow); //3.0 保存订单和业务项 c_orders c_order_attrs c_business c_business_attrs saveOrdersAndBusiness(dataFlow); //6.0 调用下游系统 DataFlowEventPublishing.invokeBusinessSystem(dataFlow); invokeBusinessSystem(dataFlow); // 业务调用完成 DataFlowEventPublishing.invokeFinishBusinessSystem(dataFlow); //能够执行到这一步 认为是都成功了 refreshOrderDataFlowResJson(dataFlow); } catch (BusinessException e) { responseEntity = new ResponseEntity(e.getMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.INTERNAL_SERVER_ERROR); } catch (OrdersException e) { responseEntity = new ResponseEntity(e.getMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.INTERNAL_SERVER_ERROR); } catch (RuleException e) { responseEntity = new ResponseEntity(e.getMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.NETWORK_AUTHENTICATION_REQUIRED); } catch (NoAuthorityException e) { responseEntity = new ResponseEntity(e.getMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.UNAUTHORIZED); } catch (InitConfigDataException e) { responseEntity = new ResponseEntity(e.getMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.INTERNAL_SERVER_ERROR); } catch (Exception e) { logger.error("内部异常了:", e); responseEntity = new ResponseEntity("内部异常了:" + e.getMessage() + e.getLocalizedMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.INTERNAL_SERVER_ERROR); } finally { if (responseEntity == null) { responseEntity = new ResponseEntity(dataFlow.getResJson().getJSONArray("msg").toJSONString(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.OK); } if (dataFlow != null) { //添加耗时 //OrderDataFlowContextFactory.addCostTime(dataFlow, "service", "业务处理总耗时", dataFlow.getStartDate(), dataFlow.getEndDate()); //保存耗时 //saveCostTimeLogMessage(dataFlow); // saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestHeaders(),dataFlow.getReqJson().toJSONString()), // LogAgent.createLogMessage(dataFlow.getResponseHeaders(),responseEntity.getBody()),endDate.getTime()-dataFlow.getStartDate().getTime()); // DataFlowEventPublishing.dataResponse(dataFlow,reqJson,headers); } //这里保存耗时,以及日志 } return responseEntity; } /** * 抒写返回头信息 * * @param dataFlow */ private void putResponseHeader(DataFlow dataFlow, Map headers) { headers.put("responseTime", DateUtil.getDefaultFormateTimeString(new Date())); headers.put("transactionId", dataFlow.getTransactionId()); } /** * 解密 * * @param reqJson * @return */ private String decrypt(String reqJson, Map headers) throws DecryptException { try { if (MappingConstant.VALUE_ON.equals(headers.get(CommonConstant.ENCRYPT))) { logger.debug("解密前字符:" + reqJson); reqJson = new String(AuthenticationFactory.decrypt(reqJson.getBytes("UTF-8"), AuthenticationFactory.loadPrivateKey(MappingConstant.KEY_PRIVATE_STRING) , NumberUtils.isNumber(headers.get(CommonConstant.ENCRYPT_KEY_SIZE)) ? Integer.parseInt(headers.get(CommonConstant.ENCRYPT_KEY_SIZE)) : Integer.parseInt(MappingCache.getValue(MappingConstant.KEY_DEFAULT_DECRYPT_KEY_SIZE))), "UTF-8"); logger.debug("解密后字符:" + reqJson); } } catch (Exception e) { throw new DecryptException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR, "解密失败"); } return reqJson; } /** * 加密 * * @param resJson * @param headers * @return */ private String encrypt(String resJson, Map headers) { try { if (MappingConstant.VALUE_ON.equals(headers.get(CommonConstant.ENCRYPT))) { logger.debug("加密前字符:" + resJson); resJson = new String(AuthenticationFactory.encrypt(resJson.getBytes("UTF-8"), AuthenticationFactory.loadPubKey(MappingConstant.KEY_PUBLIC_STRING) , NumberUtils.isNumber(headers.get(CommonConstant.ENCRYPT_KEY_SIZE)) ? Integer.parseInt(headers.get(CommonConstant.ENCRYPT_KEY_SIZE)) : Integer.parseInt(MappingCache.getValue(MappingConstant.KEY_DEFAULT_DECRYPT_KEY_SIZE))), "UTF-8"); logger.debug("加密后字符:" + resJson); } } catch (Exception e) { logger.error("加密失败:", e); } return resJson; } /** * 2.0初始化配置信息 * * @param dataFlow */ private void initConfigData(IOrderDataFlowContext dataFlow) { } /** * 7.0 作废订单和业务项 插入撤单记录 等待撤单 * * @param dataFlow */ private void invalidOrderAndBusiness(IOrderNotifyDataFlowContext dataFlow) { Date startDate = DateUtil.getCurrentDate(); if (MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER) != null && MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER).contains(dataFlow.getOrderTypeCd())) { //不用作废订单信息 // OrderDataFlowContextFactory.addCostTime(dataFlow, "invalidOrderAndBusiness", "作废订单和业务项耗时", startDate); return; } //如果已经作废 不存在 或失败,则不做处理 Map order = centerServiceDaoImpl.getOrderInfoByBId(dataFlow.getbId()); if (order == null || !order.containsKey("status_cd") || StatusConstant.STATUS_CD_DELETE.equals(order.get("status_cd")) || StatusConstant.STATUS_CD_ERROR.equals(order.get("status_cd"))) { return; } //作废 订单 centerServiceDaoImpl.updateOrder(OrderDataFlowContextFactory.getNeedInvalidOrder(dataFlow)); //作废订单项 centerServiceDaoImpl.updateBusiness(OrderDataFlowContextFactory.getNeedInvalidOrder(dataFlow)); //将当前订单项改为 撤单状态 centerServiceDaoImpl.updateBusinessByBId(OrderDataFlowContextFactory.getNeedDeleteBusiness(dataFlow)); //插入撤单记录 doAddDeleteOrderBusinessData(dataFlow); //OrderDataFlowContextFactory.addCostTime(dataFlow, "invalidOrderAndBusiness", "作废订单和业务项耗时", startDate); } /** * 8.0 广播作废已经完成业务系统订单信息 * * @param dataFlow */ private void invalidCompletedBusinessSystem(IOrderNotifyDataFlowContext dataFlow) throws Exception { if (!StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) { return; } //判断 订单instance 是否都变成了撤单状态 if (centerServiceDaoImpl.judgeAllBusinessDeleteOrder(dataFlow.getoId(), StatusConstant.STATUS_CD_DELETE_ORDER) < 1) { return; } // 根据 c_business 表中的字段business_type_cd 找到对应的消息队列名称 Map paramIn = new HashMap(); paramIn.put("oId", dataFlow.getoId()); paramIn.put("statusCd", StatusConstant.STATUS_CD_DELETE_ORDER); List completedBusinesses = centerServiceDaoImpl.getBusinessByOId(paramIn); for (Map completedBusiness : completedBusinesses) { ServiceBusiness serviceBusiness = ServiceBusinessUtil.getServiceBusiness(completedBusiness.get("business_type_cd").toString()); long startTime = DateUtil.getCurrentDate().getTime(); //发起撤单 KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(), "", OrderDataFlowContextFactory.getDeleteInstanceTableJson(dataFlow, completedBusiness).toJSONString()); //saveLogMessage(OrderDataFlowContextFactory.getDeleteInstanceTableJson(dataFlow,completedBusiness,appRoute.getAppService()),null); } } /** * 9.0 将订单状态改为失败,人工处理。 * * @param dataFlow */ private void updateOrderAndBusinessError(IOrderDataFlowContext dataFlow) { Date startDate = DateUtil.getCurrentDate(); //作废 订单 centerServiceDaoImpl.updateOrder(OrderDataFlowContextFactory.getNeedErrorOrder(dataFlow)); //作废订单项 centerServiceDaoImpl.updateBusiness(OrderDataFlowContextFactory.getNeedErrorOrder(dataFlow)); OrderDataFlowContextFactory.addCostTime(dataFlow, "updateOrderAndBusinessError", "订单状态改为失败耗时", startDate); } /** * 9.0 将订单状态改为失败,人工处理。 * * @param dataFlow */ private void updateOrderAndBusinessError(IOrderNotifyDataFlowContext dataFlow) { Date startDate = DateUtil.getCurrentDate(); //作废 订单 centerServiceDaoImpl.updateOrder(OrderDataFlowContextFactory.getNeedErrorOrder(dataFlow)); //作废订单项 centerServiceDaoImpl.updateBusiness(OrderDataFlowContextFactory.getNeedErrorOrder(dataFlow)); //OrderDataFlowContextFactory.addCostTime(dataFlow, "updateOrderAndBusinessError", "订单状态改为失败耗时", startDate); } /** * 加入撤单记录 * * @param dataFlow */ private void doAddDeleteOrderBusinessData(IOrderDataFlowContext dataFlow) { /* Map business = new HashMap(); business.put("bId",SequenceUtil.getBId()); business.put("oId",dataFlow.getoId()); business.put("businessTypeCd",StatusConstant.REQUEST_BUSINESS_TYPE_DELETE); business.put("remark","发起撤单"); business.put("statusCd",StatusConstant.STATUS_CD_DELETE_ORDER);*/ centerServiceDaoImpl.saveBusiness(OrderDataFlowContextFactory.getDeleteOrderBusiness(dataFlow, "订单失败,加入撤单")); } /** * 加入撤单记录 * * @param dataFlow */ private void doAddDeleteOrderBusinessData(IOrderNotifyDataFlowContext dataFlow) { /* Map business = new HashMap(); business.put("bId",SequenceUtil.getBId()); business.put("oId",dataFlow.getoId()); business.put("businessTypeCd",StatusConstant.REQUEST_BUSINESS_TYPE_DELETE); business.put("remark","发起撤单"); business.put("statusCd",StatusConstant.STATUS_CD_DELETE_ORDER);*/ centerServiceDaoImpl.saveBusiness(OrderDataFlowContextFactory.getDeleteOrderBusiness(dataFlow, "订单失败,加入撤单")); } /** * 接受业务系统通知消息 * * @param receiveJson 接受报文 * @throws SMOException */ @Override public void receiveBusinessSystemNotifyMessage(String receiveJson) throws SMOException { Date startDate = DateUtil.getCurrentDate(); IOrderNotifyDataFlowContext dataFlow = null; try { //1.0 创建数据流 dataFlow = OrderDataFlowContextFactory.newInstance(OrderNotifyDataFlow.class).builder(receiveJson, null); //如果订单都没有保存,则再不要处理 if (MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER) != null && MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER).contains(dataFlow.getOrderTypeCd())) { //不保存订单信息 return; } //如果不是 business 和instance 过程 则直接跳出 judgeBusinessOrInstance(dataFlow); //2.0加载数据,没有找到appId 及配置信息 则抛出InitConfigDataException reloadOrderInfo(dataFlow); //3.0 判断是否成功,失败会抛出BusinessStatusException异常 judgeBusinessStatusAndCompleteBusiness(dataFlow); //4.0 修改业务为成功,如果发现业务项已经是作废或失败状态(D或E)则抛出BusinessException异常 //completeBusiness(dataFlow); //5.0 判断 发起 Instance 条件是否满足,如果满足 发起 Instance过程 judgeSendToInstance(dataFlow); //7.0 判断撤单条件是否满足,如果满足发起撤单 invalidCompletedBusinessSystem(dataFlow); } catch (BusinessStatusException e) { logger.error("订单失败:", e); //8.0 将订单状态改为失败,人工处理。 updateOrderAndBusinessError(dataFlow); } catch (BusinessException e) { //9.0说明这个订单已经失败了,再不需要 //想法,这里广播当前失败业务 logger.error("修改业务数据失败", e); }/*catch (InitConfigDataException e){ //这种一般不会出现,除非人工改了数据 LoggerEngine.error("加载配置数据出错", e); try { //6.0 作废订单和所有业务项 invalidOrderAndBusiness(dataFlow); //7.0 广播作废业务系统订单信息,这里只有 Instance 失败后才发起 撤单 if(StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) { invalidCompletedBusinessSystem(dataFlow); } } catch (Exception e1) { LoggerEngine.error("作废订单失败", e1); //8.0 将订单状态改为失败,人工处理。 updateOrderAndBusinessError(dataFlow); } }*/ catch (NoSupportException e) { LoggerEngine.error("当前业务不支持", e); } catch (Exception e) { LoggerEngine.error("作废订单失败", e); //10.0 成功的情况下通知下游系统失败将状态改为NE,人工处理。 updateBusinessNotifyError(dataFlow); } finally { /*OrderDataFlowContextFactory.addCostTime(dataFlow, "receiveBusinessSystemNotifyMessage", "接受业务系统通知消息耗时", startDate); saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),dataFlow.getReqJson().toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_SUCCESS), DateUtil.getCurrentDate().getTime() - dataFlow.getStartDate().getTime());*/ } } /** * Instance过程 * * @param dataFlow */ private void doSendInstance(IOrderNotifyDataFlowContext dataFlow) { if (dataFlow == null || !StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(dataFlow.getBusinessType())) { return; } try { ServiceBusiness serviceBusiness = ServiceBusinessUtil.getServiceBusiness(dataFlow.getBusinessTypeCd()); KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(), "", OrderDataFlowContextFactory.getBusinessTableDataInfoToInstanceTableJson(dataFlow).toJSONString()); } catch (Exception e) { } } /** * 判断是否是 business 或者 instance过程 * * @param dataFlow * @throws NoSupportException */ private void judgeBusinessOrInstance(IOrderNotifyDataFlowContext dataFlow) throws NoSupportException { if (dataFlow == null || StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(dataFlow.getBusinessType()) || StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) { return; } throw new NoSupportException(ResponseConstant.RESULT_PARAM_ERROR, "当前只支持 Business 和 Instance过程"); } /** * 2.0重新加载订单信息到dataFlow 中 * * @param dataFlow */ private void reloadOrderInfoAndConfigData(DataFlow dataFlow) { Map order = centerServiceDaoImpl.getOrderInfoByBId(dataFlow.getBusinesses().get(0).getbId()); dataFlow.setoId(order.get("o_id").toString()); dataFlow.setAppId(order.get("app_id").toString()); if ("-1".equals(dataFlow.getDataFlowId()) || StringUtil.isNullOrNone(dataFlow.getDataFlowId())) { throw new InitConfigDataException(ResponseConstant.RESULT_CODE_ERROR, "请求报文中没有包含 dataFlowId 节点"); } //重新刷端口信息 ServiceInfoListener serviceInfoListener = ApplicationContextFactory.getBean("serviceInfoListener", ServiceInfoListener.class); if (serviceInfoListener != null) { dataFlow.setPort(serviceInfoListener.getServerPort() + ""); } //重新加载配置 //initConfigData(dataFlow); } /** * 2.0重新加载订单信息到dataFlow 中 * * @param dataFlow */ private void reloadOrderInfo(IOrderNotifyDataFlowContext dataFlow) { Map order = centerServiceDaoImpl.getOrderInfoByBId(dataFlow.getbId()); dataFlow.setoId(order.get("o_id").toString()); if ("-1".equals(dataFlow.getDataFlowId()) || StringUtil.isNullOrNone(dataFlow.getDataFlowId())) { throw new InitConfigDataException(ResponseConstant.RESULT_CODE_ERROR, "请求报文中没有包含 dataFlowId 节点"); } } /** * 9.0 成功的情况下通知下游系统失败将状态改为NE,人工处理。 * * @param dataFlow */ private void updateBusinessNotifyError(IOrderNotifyDataFlowContext dataFlow) { Date startDate = DateUtil.getCurrentDate(); //完成订单项 centerServiceDaoImpl.updateBusinessByBId(OrderDataFlowContextFactory.getNeedNotifyErrorBusiness(dataFlow)); // OrderDataFlowContextFactory.addCostTime(dataFlow, "updateBusinessNotifyError", "订单状态改为失败耗时", startDate); } /** * 判断是否都成功了 * * @param dataFlow */ private void judgeBusinessStatusAndCompleteBusiness(IOrderNotifyDataFlowContext dataFlow) throws BusinessStatusException { //List businesses = dataFlow.getBusinesses(); //1.0 判断是否存在撤单,如果是撤单则将当前 bId 标记为撤单状态 if (StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) { Map businessMap = centerServiceDaoImpl.getDeleteOrderBusinessByOId(dataFlow.getoId()); if (businessMap != null && !businessMap.isEmpty()) { centerServiceDaoImpl.updateBusinessByBId(OrderDataFlowContextFactory.getNeedDeleteBusiness(dataFlow)); return; } } //Business business = dataFlow.getCurrentBusiness(); if (!ResponseConstant.RESULT_CODE_SUCCESS.equals(((IOrderResponse) dataFlow).getCode())) { //throw new BusinessStatusException(business.getCode(),"业务bId= "+business.getbId() + " 处理失败,需要作废订单"); //作废订单和业务项 插入撤单记录 等待撤单 invalidOrderAndBusiness(dataFlow); } else { completeBusiness(dataFlow); } } /** * 3.0 修改业务为成功,如果发现业务项已经是作废或失败状态(D或E)则抛出BusinessException异常 * * @param dataFlow */ private void completeBusiness(IOrderNotifyDataFlowContext dataFlow) throws BusinessException { try { if (StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) { //完成订单项 centerServiceDaoImpl.updateBusinessByBId(OrderDataFlowContextFactory.getNeedCompleteBusiness(dataFlow)); //如果业务都完成,则将 订单改为完成状态 centerServiceDaoImpl.completeOrderByBId(dataFlow.getbId()); } else if (StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(dataFlow.getBusinessType())) { centerServiceDaoImpl.updateBusinessByBId(OrderDataFlowContextFactory.getNeedBusinessComplete(dataFlow)); } else { //这里到不了,前面做了校验 throw new BusinessException(ResponseConstant.RESULT_PARAM_ERROR, "当前不支持 业务类型为 businessType" + dataFlow.getBusinessType()); } } catch (DAOException e) { throw new BusinessException(e.getResult(), e); } } /** * //4.0当所有业务动作是否都是C,将订单信息改为 C 并且发布竣工消息,这里在广播之前确认 * * @param dataFlow */ private void judgeSendToInstance(IOrderNotifyDataFlowContext dataFlow) throws Exception { try { if (centerServiceDaoImpl.judgeAllBusinessCompleted(dataFlow.getoId(), StatusConstant.STATUS_CD_BUSINESS_COMPLETE) > 0) { //通知成功消息 doSendInstance(dataFlow); } } catch (DAOException e) { //这里什么都不做,说明订单没有完成 } } /** * 通知 订单已经完成,后端需要完成数据 * * @param dataFlow */ private void notifyBusinessSystemSuccessMessage(IOrderNotifyDataFlowContext dataFlow) throws Exception { long startTime = DateUtil.getCurrentDate().getTime(); ServiceBusiness serviceBusiness = ServiceBusinessUtil.getServiceBusiness(dataFlow.getBusinessTypeCd()); //拼装报文通知业务系统 KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(), "", OrderDataFlowContextFactory.getNotifyBusinessSuccessJson(dataFlow).toJSONString()); /*saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),OrderDataFlowContextFactory.getNotifyBusinessSuccessJson(dataFlow).toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_SUCCESS), DateUtil.getCurrentDate().getTime() - startTime);*/ } /** * 8.0 广播作废业务系统订单信息 * * @param dataFlow */ private void notifyBusinessSystemErrorMessage(IOrderNotifyDataFlowContext dataFlow) throws Exception { long startTime = DateUtil.getCurrentDate().getTime(); ServiceBusiness serviceBusiness = ServiceBusinessUtil.getServiceBusiness(dataFlow.getBusinessTypeCd()); //拼装报文通知业务系统 KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(), "", OrderDataFlowContextFactory.getNotifyBusinessErrorJson(dataFlow).toJSONString()); /*saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),OrderDataFlowContextFactory.getNotifyBusinessErrorJson(dataFlow).toJSONString()), LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_ERROR), DateUtil.getCurrentDate().getTime() - startTime);*/ } private String doTransferRequestBusinessSystem(DataFlow dataFlow, AppService service, String reqData) { String responseMessage; if (service.getMethod() == null || "".equals(service.getMethod())) {//post方式 //http://user-service/test/sayHello HttpHeaders header = new HttpHeaders(); for (String key : dataFlow.getRequestCurrentHeaders().keySet()) { header.add(key, dataFlow.getRequestCurrentHeaders().get(key)); } HttpEntity httpEntity = new HttpEntity(reqData, header); responseMessage = outRestTemplate.postForObject(service.getUrl(), httpEntity, String.class); } else {//webservice方式 responseMessage = (String) WebServiceAxisClient.callWebService(service.getUrl(), service.getMethod(), new Object[]{dataFlow.getRequestBusinessJson().toJSONString()}, service.getTimeOut()); } return responseMessage; } /** * 处理异步业务 * * @param */ @Override protected void doAsynchronousBusinesses(IOrderDataFlowContext dataFlow) throws BusinessException { Date startDate = DateUtil.getCurrentDate(); //6.3 处理异步,按消息队里处理 List asynchronousBusinesses = OrderDataFlowContextFactory.getAsynchronousBusinesses(dataFlow); if (asynchronousBusinesses == null || asynchronousBusinesses.size() == 0) { return; } ServiceBusiness serviceBusiness; try { for (Business business : asynchronousBusinesses) { serviceBusiness = ServiceBusinessUtil.getServiceBusiness(business.getBusinessTypeCd()); KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(), "", OrderDataFlowContextFactory.getRequestBusinessJson(dataFlow, business).toJSONString()); } } catch (Exception e) { throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR, e.getMessage()); } OrderDataFlowContextFactory.addCostTime(dataFlow, "doSynchronousBusinesses", "异步调用业务系统总耗时", startDate); // saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),dataFlow.getRequestBusinessJson().toJSONString()), // LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),dataFlow.getResponseBusinessJson().toJSONString()), // DateUtil.getCurrentDate().getTime()-startDate.getTime()); } /** * 保存日志信息 * * @param dataFlow 数据流对象 封装用户请求的信息 * @param requestJson 请求报文 格式为 * {"headers":"", * "body":"" * } * @param responseJson 请求报文 格式为 * {"headers":"", * "body":"" * } */ private void saveLogMessage(DataFlow dataFlow, JSONObject requestJson, JSONObject responseJson, long costTime) { LogAgent.sendLog(dataFlow, requestJson, responseJson, costTime); } /** * 保存日志信息 * * @param requestJson */ private void saveLogMessage(String requestJson, String responseJson) { try { if (MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_LOG_ON_OFF))) { JSONObject log = new JSONObject(); log.put("request", requestJson); log.put("response", responseJson); KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_LOG_NAME, "", log.toJSONString()); } } catch (Exception e) { logger.error("报错日志出错了,", e); } } /** * 保存耗时信息 * * @param dataFlow */ private void saveCostTimeLogMessage(DataFlow dataFlow) { try { if (MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_COST_TIME_ON_OFF))) { List dataFlowLinksCosts = dataFlow.getLinksCostDates(); JSONObject costDate = new JSONObject(); JSONArray costDates = new JSONArray(); JSONObject newObj = null; for (DataFlowLinksCost dataFlowLinksCost : dataFlowLinksCosts) { newObj = JSONObject.parseObject(JSONObject.toJSONString(dataFlowLinksCost)); newObj.put("dataFlowId", dataFlow.getDataFlowId()); newObj.put("transactionId", dataFlow.getTransactionId()); costDates.add(newObj); } costDate.put("costDates", costDates); KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_COST_TIME_LOG_NAME, "", costDate.toJSONString()); } } catch (Exception e) { logger.error("报错日志出错了,", e); } } public ICenterServiceDAO getCenterServiceDaoImpl() { return centerServiceDaoImpl; } public void setCenterServiceDaoImpl(ICenterServiceDAO centerServiceDaoImpl) { this.centerServiceDaoImpl = centerServiceDaoImpl; } public RestTemplate getRestTemplate() { return restTemplate; } public void setRestTemplate(RestTemplate restTemplate) { this.restTemplate = restTemplate; } public IQueryServiceSMO getQueryServiceSMOImpl() { return queryServiceSMOImpl; } public void setQueryServiceSMOImpl(IQueryServiceSMO queryServiceSMOImpl) { this.queryServiceSMOImpl = queryServiceSMOImpl; } }