wuxw
2019-04-26 b1825e0e308329210229b72e556a137dd56f88ed
OrderService/src/main/java/com/java110/order/smo/impl/OrderServiceSMOImpl.java
@@ -65,10 +65,11 @@
    /**
     * 业务统一处理服务方法
     *
     * @param reqJson 请求报文json
     * @return
     */
    public ResponseEntity<String> service(String reqJson, Map<String,String> headers) throws SMOException{
    public ResponseEntity<String> service(String reqJson, Map<String, String> headers) throws SMOException {
        IOrderDataFlowContext dataFlow = null;
        JSONObject responseJson = null;
@@ -76,7 +77,7 @@
        ResponseEntity<String> responseEntity = null;
        try {
            DataFlowEventPublishing.preValidateData(reqJson,headers);
            DataFlowEventPublishing.preValidateData(reqJson, headers);
            //1.0 创建数据流
            dataFlow = OrderDataFlowContextFactory.newInstance(OrderDataFlow.class).builder(reqJson, headers);
            DataFlowEventPublishing.initDataFlowComplete(dataFlow);
@@ -95,25 +96,25 @@
            //能够执行到这一步 认为是都成功了
            refreshOrderDataFlowResJson(dataFlow);
        }catch (BusinessException e) {
            responseEntity = new ResponseEntity<String>(e.getMessage() , OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()),HttpStatus.INTERNAL_SERVER_ERROR);
        } catch (BusinessException e) {
            responseEntity = new ResponseEntity<String>(e.getMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.INTERNAL_SERVER_ERROR);
        } catch (OrdersException e) {
            responseEntity = new ResponseEntity<String>(e.getMessage() , OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()),HttpStatus.INTERNAL_SERVER_ERROR);
            responseEntity = new ResponseEntity<String>(e.getMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.INTERNAL_SERVER_ERROR);
        } catch (RuleException e) {
            responseEntity = new ResponseEntity<String>(e.getMessage(),OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.NETWORK_AUTHENTICATION_REQUIRED);
            responseEntity = new ResponseEntity<String>(e.getMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.NETWORK_AUTHENTICATION_REQUIRED);
        } catch (NoAuthorityException e) {
            responseEntity = new ResponseEntity<String>(e.getMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()),HttpStatus.UNAUTHORIZED);
        } catch (InitConfigDataException e){
            responseEntity = new ResponseEntity<String>(e.getMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()),HttpStatus.INTERNAL_SERVER_ERROR);
        }catch (Exception e) {
            logger.error("内部异常了:",e);
            responseEntity = new ResponseEntity<String>("内部异常了:"+e.getMessage() + e.getLocalizedMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()),HttpStatus.INTERNAL_SERVER_ERROR);
            responseEntity = new ResponseEntity<String>(e.getMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.UNAUTHORIZED);
        } catch (InitConfigDataException e) {
            responseEntity = new ResponseEntity<String>(e.getMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.INTERNAL_SERVER_ERROR);
        } catch (Exception e) {
            logger.error("内部异常了:", e);
            responseEntity = new ResponseEntity<String>("内部异常了:" + e.getMessage() + e.getLocalizedMessage(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.INTERNAL_SERVER_ERROR);
        } finally {
            if(responseEntity == null){
                responseEntity = new ResponseEntity<String>(dataFlow.getResJson().getString("msg"),OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()),HttpStatus.OK);
            if (responseEntity == null) {
                responseEntity = new ResponseEntity<String>(dataFlow.getResJson().getJSONArray("msg").toJSONString(), OrderDataFlowContextFactory.hashMap2MultiValueMap(dataFlow.getResHeaders()), HttpStatus.OK);
            }
            if(dataFlow != null) {
            if (dataFlow != null) {
                //添加耗时
                //OrderDataFlowContextFactory.addCostTime(dataFlow, "service", "业务处理总耗时", dataFlow.getStartDate(), dataFlow.getEndDate());
                //保存耗时
@@ -125,50 +126,53 @@
            //这里保存耗时,以及日志
            return responseEntity ;
            return responseEntity;
        }
    }
    /**
     * Refreshes the response JSON held by the data flow.
     *
     * <p>As written, when the data flow has no response JSON (null or empty) a
     * new JSON object carrying a single "msg" entry is stored on it.
     *
     * <p>NOTE(review): this span comes from a flattened diff, so the signature
     * appears in both its pre-change and post-change formatting. The same
     * default-response block is present once as live code and once commented
     * out; presumably the commit disabled it, which would leave this method a
     * no-op. Confirm against the committed file.
     *
     * @param dataFlow order data flow context whose response JSON is inspected
     */
    private void refreshOrderDataFlowResJson(IOrderDataFlowContext dataFlow){
    private void refreshOrderDataFlowResJson(IOrderDataFlowContext dataFlow) {
        // Fall back to a default message when nothing has been written to the response yet.
        if(dataFlow.getResJson() == null || dataFlow.getResJson().isEmpty()){
            JSONObject resJson = new JSONObject();
            resJson.put("msg","成功");
            dataFlow.setResJson(resJson);
        }
        // Commented-out copy of the block above (kept verbatim).
//        if(dataFlow.getResJson() == null || dataFlow.getResJson().isEmpty()){
//            JSONObject resJson = new JSONObject();
//            resJson.put("msg","成功");
//            dataFlow.setResJson(resJson);
//        }
    }
    /**
     * Writes the response header information into the supplied header map.
     *
     * <p>Two entries are put: "responseTime" (the current time formatted by
     * {@code DateUtil.getDefaultFormateTimeString}) and "transactionId" (taken
     * from the data flow).
     *
     * <p>NOTE(review): this span comes from a flattened diff; the signature and
     * the "transactionId" statement each appear in pre-change and post-change
     * formatting.
     *
     * @param dataFlow data flow that supplies the transaction id
     * @param headers  mutable header map that receives the two entries
     */
    private void putResponseHeader(DataFlow dataFlow,Map<String,String> headers) {
    private void putResponseHeader(DataFlow dataFlow, Map<String, String> headers) {
        headers.put("responseTime", DateUtil.getDefaultFormateTimeString(new Date()));
        headers.put("transactionId",dataFlow.getTransactionId());
        headers.put("transactionId", dataFlow.getTransactionId());
    }
    /**
     * 解密
     *
     * @param reqJson
     * @return
     */
    private String decrypt(String reqJson,Map<String,String> headers) throws DecryptException{
    private String decrypt(String reqJson, Map<String, String> headers) throws DecryptException {
        try {
            if (MappingConstant.VALUE_ON.equals(headers.get(CommonConstant.ENCRYPT))) {
                logger.debug("解密前字符:" + reqJson);
                reqJson = new String(AuthenticationFactory.decrypt(reqJson.getBytes("UTF-8"), AuthenticationFactory.loadPrivateKey(MappingConstant.KEY_PRIVATE_STRING)
                        , NumberUtils.isNumber(headers.get(CommonConstant.ENCRYPT_KEY_SIZE)) ? Integer.parseInt(headers.get(CommonConstant.ENCRYPT_KEY_SIZE)) :
                                Integer.parseInt(MappingCache.getValue(MappingConstant.KEY_DEFAULT_DECRYPT_KEY_SIZE))),"UTF-8");
                                Integer.parseInt(MappingCache.getValue(MappingConstant.KEY_DEFAULT_DECRYPT_KEY_SIZE))), "UTF-8");
                logger.debug("解密后字符:" + reqJson);
            }
        }catch (Exception e){
            throw new DecryptException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR,"解密失败");
        } catch (Exception e) {
            throw new DecryptException(ResponseConstant.RESULT_CODE_NO_AUTHORITY_ERROR, "解密失败");
        }
        return reqJson;
@@ -176,25 +180,25 @@
    /**
     * Encrypts the response text when encryption is switched on in the headers.
     *
     * <p>Encryption only happens when the {@code CommonConstant.ENCRYPT} header
     * equals {@code MappingConstant.VALUE_ON}. The key size is taken from the
     * {@code CommonConstant.ENCRYPT_KEY_SIZE} header when that value is numeric,
     * otherwise from the mapping cache. Any exception is logged and swallowed,
     * in which case the text is returned as it was passed in.
     *
     * <p>NOTE(review): this span comes from a flattened diff; the signature, the
     * last line of the encrypt expression and the catch clause each appear in
     * pre-change and post-change formatting.
     *
     * @param resJson response text to encrypt
     * @param headers request headers carrying the encrypt switch and optional key size
     * @return the encrypted text, or {@code resJson} unchanged when encryption
     *         is off or failed
     */
    private String encrypt(String resJson,Map<String,String> headers){
    private String encrypt(String resJson, Map<String, String> headers) {
        try {
            if (MappingConstant.VALUE_ON.equals(headers.get(CommonConstant.ENCRYPT))) {
                logger.debug("加密前字符:" + resJson);
                // Encrypt the UTF-8 bytes with the configured public key and decode the result as UTF-8.
                // NOTE(review): the fallback key size is read from KEY_DEFAULT_DECRYPT_KEY_SIZE;
                // confirm that the decrypt constant is also meant to be used for encryption.
                resJson = new String(AuthenticationFactory.encrypt(resJson.getBytes("UTF-8"), AuthenticationFactory.loadPubKey(MappingConstant.KEY_PUBLIC_STRING)
                        , NumberUtils.isNumber(headers.get(CommonConstant.ENCRYPT_KEY_SIZE)) ? Integer.parseInt(headers.get(CommonConstant.ENCRYPT_KEY_SIZE)) :
                                Integer.parseInt(MappingCache.getValue(MappingConstant.KEY_DEFAULT_DECRYPT_KEY_SIZE))),"UTF-8");
                                Integer.parseInt(MappingCache.getValue(MappingConstant.KEY_DEFAULT_DECRYPT_KEY_SIZE))), "UTF-8");
                logger.debug("加密后字符:" + resJson);
            }
        }catch (Exception e){
            logger.error("加密失败:",e);
        } catch (Exception e) {
            // Best effort: the failure is only logged, the caller still gets the original text.
            logger.error("加密失败:", e);
        }
        return resJson;
    }
    /**
@@ -222,7 +226,7 @@
                //不做校验
                //添加耗时
                OrderDataFlowContextFactory.addCostTime(dataFlow, "ruleValidate", "规则校验耗时", startDate);
                return ;
                return;
            }
            //调用规则
@@ -245,11 +249,11 @@
     */
    private void saveOrdersAndBusiness(IOrderDataFlowContext dataFlow) throws OrdersException {
        Date startDate = DateUtil.getCurrentDate();
        if(MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER) != null
                &&MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER).contains(dataFlow.getOrders().getOrderTypeCd())){
        if (MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER) != null
                && MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER).contains(dataFlow.getOrders().getOrderTypeCd())) {
            //不保存订单信息
            OrderDataFlowContextFactory.addCostTime(dataFlow, "saveOrdersAndBusiness", "保存订单和业务项耗时", startDate);
            return ;
            return;
        }
@@ -295,20 +299,20 @@
     */
    private void invalidOrderAndBusiness(IOrderNotifyDataFlowContext dataFlow) {
        Date startDate = DateUtil.getCurrentDate();
        if(MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER) != null
                &&MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER).contains(dataFlow.getOrderTypeCd())){
        if (MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER) != null
                && MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER).contains(dataFlow.getOrderTypeCd())) {
            //不用作废订单信息
           // OrderDataFlowContextFactory.addCostTime(dataFlow, "invalidOrderAndBusiness", "作废订单和业务项耗时", startDate);
            return ;
            // OrderDataFlowContextFactory.addCostTime(dataFlow, "invalidOrderAndBusiness", "作废订单和业务项耗时", startDate);
            return;
        }
        //如果已经作废 不存在 或失败,则不做处理
        Map order = centerServiceDaoImpl.getOrderInfoByBId(dataFlow.getbId());
        if(order == null || !order.containsKey("status_cd") || StatusConstant.STATUS_CD_DELETE.equals(order.get("status_cd"))
                || StatusConstant.STATUS_CD_ERROR.equals(order.get("status_cd"))){
            return ;
        if (order == null || !order.containsKey("status_cd") || StatusConstant.STATUS_CD_DELETE.equals(order.get("status_cd"))
                || StatusConstant.STATUS_CD_ERROR.equals(order.get("status_cd"))) {
            return;
        }
        //作废 订单
@@ -331,30 +335,30 @@
     *
     * @param dataFlow
     */
    // Sends a delete-instance Kafka message for every business of the order that is in the
    // delete-order status, but only for INSTANCE-type notifications.
    // NOTE(review): this span comes from a flattened diff; the signature, both guard clauses,
    // the paramIn puts and the whole for-loop appear in pre-change and post-change formatting.
    private void invalidCompletedBusinessSystem(IOrderNotifyDataFlowContext dataFlow) throws Exception{
    private void invalidCompletedBusinessSystem(IOrderNotifyDataFlowContext dataFlow) throws Exception {
        // Only the INSTANCE business type is handled here.
        if(!StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) {
            return ;
        if (!StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) {
            return;
        }
        // Check whether all instance businesses of the order have reached the delete-order status
        if(centerServiceDaoImpl.judgeAllBusinessDeleteOrder(dataFlow.getoId(),StatusConstant.STATUS_CD_DELETE_ORDER) < 1){
            return ;
        if (centerServiceDaoImpl.judgeAllBusinessDeleteOrder(dataFlow.getoId(), StatusConstant.STATUS_CD_DELETE_ORDER) < 1) {
            return;
        }
        // Use the business_type_cd field of the c_business table to find the matching message queue name
        Map paramIn = new HashMap();
        paramIn.put("oId",dataFlow.getoId());
        paramIn.put("statusCd",StatusConstant.STATUS_CD_DELETE_ORDER);
        paramIn.put("oId", dataFlow.getoId());
        paramIn.put("statusCd", StatusConstant.STATUS_CD_DELETE_ORDER);
        List<Map> completedBusinesses = centerServiceDaoImpl.getBusinessByOId(paramIn);
            for(Map completedBusiness : completedBusinesses){
                ServiceBusiness serviceBusiness = ServiceBusinessUtil.getServiceBusiness(completedBusiness.get("business_type_cd").toString());
                    long startTime = DateUtil.getCurrentDate().getTime();
                    // Initiate the order cancellation
                    KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(),"",
                            OrderDataFlowContextFactory.getDeleteInstanceTableJson(dataFlow,completedBusiness).toJSONString());
                    //saveLogMessage(OrderDataFlowContextFactory.getDeleteInstanceTableJson(dataFlow,completedBusiness,appRoute.getAppService()),null);
                }
        for (Map completedBusiness : completedBusinesses) {
            ServiceBusiness serviceBusiness = ServiceBusinessUtil.getServiceBusiness(completedBusiness.get("business_type_cd").toString());
            // startTime is not read anywhere in the visible code.
            long startTime = DateUtil.getCurrentDate().getTime();
            // Initiate the order cancellation
            KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(), "",
                    OrderDataFlowContextFactory.getDeleteInstanceTableJson(dataFlow, completedBusiness).toJSONString());
            //saveLogMessage(OrderDataFlowContextFactory.getDeleteInstanceTableJson(dataFlow,completedBusiness,appRoute.getAppService()),null);
        }
    }
    /**
@@ -397,6 +401,7 @@
        //OrderDataFlowContextFactory.addCostTime(dataFlow, "updateOrderAndBusinessError", "订单状态改为失败耗时", startDate);
    }
    /**
     * 将订单状态改为作废状态。
     *
@@ -416,58 +421,59 @@
        //doAddDeleteOrderBusinessData(dataFlow);
        OrderDataFlowContextFactory.addCostTime(dataFlow, "updateOrderAndBusinessError", "订单状态改为失败耗时", startDate);
    }
    /**
     * Adds a delete-order (order cancellation) record.
     *
     * <p>The record is built by
     * {@code OrderDataFlowContextFactory.getDeleteOrderBusiness} and persisted
     * through {@code centerServiceDaoImpl.saveBusiness}.
     *
     * <p>NOTE(review): this span comes from a flattened diff; the signature and
     * the save call each appear in pre-change and post-change formatting.
     *
     * @param dataFlow order data flow context the record is derived from
     */
    private void doAddDeleteOrderBusinessData(IOrderDataFlowContext dataFlow){
    private void doAddDeleteOrderBusinessData(IOrderDataFlowContext dataFlow) {
        // The block comment below is an earlier hand-built version of the record (kept verbatim).
       /* Map business = new HashMap();
        business.put("bId",SequenceUtil.getBId());
        business.put("oId",dataFlow.getoId());
        business.put("businessTypeCd",StatusConstant.REQUEST_BUSINESS_TYPE_DELETE);
        business.put("remark","发起撤单");
        business.put("statusCd",StatusConstant.STATUS_CD_DELETE_ORDER);*/
        centerServiceDaoImpl.saveBusiness(OrderDataFlowContextFactory.getDeleteOrderBusiness(dataFlow,"订单失败,加入撤单"));
        centerServiceDaoImpl.saveBusiness(OrderDataFlowContextFactory.getDeleteOrderBusiness(dataFlow, "订单失败,加入撤单"));
    }
    /**
     * Adds a delete-order (order cancellation) record for a notify data flow.
     *
     * <p>Overload taking {@code IOrderNotifyDataFlowContext}; the record is
     * built by {@code OrderDataFlowContextFactory.getDeleteOrderBusiness} and
     * persisted through {@code centerServiceDaoImpl.saveBusiness}.
     *
     * <p>NOTE(review): this span comes from a flattened diff; the signature and
     * the save call each appear in pre-change and post-change formatting.
     *
     * @param dataFlow notify data flow context the record is derived from
     */
    private void doAddDeleteOrderBusinessData(IOrderNotifyDataFlowContext dataFlow){
    private void doAddDeleteOrderBusinessData(IOrderNotifyDataFlowContext dataFlow) {
        // The block comment below is an earlier hand-built version of the record (kept verbatim).
       /* Map business = new HashMap();
        business.put("bId",SequenceUtil.getBId());
        business.put("oId",dataFlow.getoId());
        business.put("businessTypeCd",StatusConstant.REQUEST_BUSINESS_TYPE_DELETE);
        business.put("remark","发起撤单");
        business.put("statusCd",StatusConstant.STATUS_CD_DELETE_ORDER);*/
        centerServiceDaoImpl.saveBusiness(OrderDataFlowContextFactory.getDeleteOrderBusiness(dataFlow,"订单失败,加入撤单"));
        centerServiceDaoImpl.saveBusiness(OrderDataFlowContextFactory.getDeleteOrderBusiness(dataFlow, "订单失败,加入撤单"));
    }
    /**
     * 接受业务系统通知消息
     *
     * @param receiveJson 接受报文
     * @throws SMOException
     */
    @Override
    public void receiveBusinessSystemNotifyMessage(String receiveJson) throws SMOException{
    public void receiveBusinessSystemNotifyMessage(String receiveJson) throws SMOException {
        Date startDate = DateUtil.getCurrentDate();
        IOrderNotifyDataFlowContext dataFlow = null;
        try {
            //1.0 创建数据流
            dataFlow = OrderDataFlowContextFactory.newInstance(OrderNotifyDataFlow.class).builder(receiveJson,null);
            dataFlow = OrderDataFlowContextFactory.newInstance(OrderNotifyDataFlow.class).builder(receiveJson, null);
            //如果订单都没有保存,则再不要处理
            if(MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER) != null
                    &&MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER).contains(dataFlow.getOrderTypeCd())){
            if (MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER) != null
                    && MappingCache.getValue(MappingConstant.KEY_NO_SAVE_ORDER).contains(dataFlow.getOrderTypeCd())) {
                //不保存订单信息
                return ;
                return;
            }
            //如果不是 business 和instance 过程 则直接跳出
@@ -488,16 +494,16 @@
            //7.0 判断撤单条件是否满足,如果满足发起撤单
            invalidCompletedBusinessSystem(dataFlow);
        }catch (BusinessStatusException e){
        } catch (BusinessStatusException e) {
            logger.error("订单失败:" ,e);
            logger.error("订单失败:", e);
            //8.0 将订单状态改为失败,人工处理。
            updateOrderAndBusinessError(dataFlow);
        }catch (BusinessException e) {
        } catch (BusinessException e) {
            //9.0说明这个订单已经失败了,再不需要
            //想法,这里广播当前失败业务
            logger.error("修改业务数据失败",e);
            logger.error("修改业务数据失败", e);
        }/*catch (InitConfigDataException e){ //这种一般不会出现,除非人工改了数据
            LoggerEngine.error("加载配置数据出错", e);
            try {
@@ -513,13 +519,13 @@
                updateOrderAndBusinessError(dataFlow);
            }
        }*/catch (NoSupportException e){
        }*/ catch (NoSupportException e) {
            LoggerEngine.error("当前业务不支持", e);
        }catch (Exception e){
        } catch (Exception e) {
            LoggerEngine.error("作废订单失败", e);
            //10.0 成功的情况下通知下游系统失败将状态改为NE,人工处理。
            updateBusinessNotifyError(dataFlow);
        }finally{
        } finally {
            /*OrderDataFlowContextFactory.addCostTime(dataFlow, "receiveBusinessSystemNotifyMessage", "接受业务系统通知消息耗时", startDate);
            saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),dataFlow.getReqJson().toJSONString()),
                    LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_SUCCESS),
@@ -529,17 +535,18 @@
    /**
     * Instance过程
     *
     * @param dataFlow
     */
    private void doSendInstance(IOrderNotifyDataFlowContext dataFlow) {
        if(dataFlow == null || !StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(dataFlow.getBusinessType())){
            return ;
        if (dataFlow == null || !StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(dataFlow.getBusinessType())) {
            return;
        }
        try {
            ServiceBusiness serviceBusiness = ServiceBusinessUtil.getServiceBusiness(dataFlow.getBusinessTypeCd());
            KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(), "",
                    OrderDataFlowContextFactory.getBusinessTableDataInfoToInstanceTableJson(dataFlow).toJSONString());
        }catch (Exception e){
        } catch (Exception e) {
        }
@@ -547,17 +554,18 @@
    /**
     * Checks that the notification belongs to the BUSINESS or INSTANCE phase.
     *
     * <p>Returns normally when {@code dataFlow} is null or its business type
     * equals {@code REQUEST_BUSINESS_TYPE_BUSINESS} or
     * {@code REQUEST_BUSINESS_TYPE_INSTANCE}; any other business type is
     * rejected.
     *
     * <p>NOTE(review): this span comes from a flattened diff; the signature, the
     * guard clause and the throw statement each appear in pre-change and
     * post-change formatting.
     *
     * @param dataFlow notify data flow context to check; null is accepted
     * @throws NoSupportException when the business type is neither BUSINESS nor INSTANCE
     */
    private void judgeBusinessOrInstance(IOrderNotifyDataFlowContext dataFlow) throws  NoSupportException{
    private void judgeBusinessOrInstance(IOrderNotifyDataFlowContext dataFlow) throws NoSupportException {
        if(dataFlow == null || StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(dataFlow.getBusinessType()) ||
                StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())){
            return ;
        // A null data flow passes this check rather than failing it.
        if (dataFlow == null || StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(dataFlow.getBusinessType()) ||
                StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) {
            return;
        }
        throw new NoSupportException(ResponseConstant.RESULT_PARAM_ERROR,"当前只支持 Business 和 Instance过程");
        throw new NoSupportException(ResponseConstant.RESULT_PARAM_ERROR, "当前只支持 Business 和 Instance过程");
    }
@@ -571,13 +579,13 @@
        Map order = centerServiceDaoImpl.getOrderInfoByBId(dataFlow.getBusinesses().get(0).getbId());
        dataFlow.setoId(order.get("o_id").toString());
        dataFlow.setAppId(order.get("app_id").toString());
        if("-1".equals(dataFlow.getDataFlowId()) || StringUtil.isNullOrNone(dataFlow.getDataFlowId())){
            throw new InitConfigDataException(ResponseConstant.RESULT_CODE_ERROR,"请求报文中没有包含 dataFlowId 节点");
        if ("-1".equals(dataFlow.getDataFlowId()) || StringUtil.isNullOrNone(dataFlow.getDataFlowId())) {
            throw new InitConfigDataException(ResponseConstant.RESULT_CODE_ERROR, "请求报文中没有包含 dataFlowId 节点");
        }
        //重新刷端口信息
        ServiceInfoListener serviceInfoListener =  ApplicationContextFactory.getBean("serviceInfoListener",ServiceInfoListener.class);
        if(serviceInfoListener != null){
            dataFlow.setPort(serviceInfoListener.getServerPort()+"");
        ServiceInfoListener serviceInfoListener = ApplicationContextFactory.getBean("serviceInfoListener", ServiceInfoListener.class);
        if (serviceInfoListener != null) {
            dataFlow.setPort(serviceInfoListener.getServerPort() + "");
        }
        //重新加载配置
        //initConfigData(dataFlow);
@@ -593,8 +601,8 @@
        Map order = centerServiceDaoImpl.getOrderInfoByBId(dataFlow.getbId());
        dataFlow.setoId(order.get("o_id").toString());
        if("-1".equals(dataFlow.getDataFlowId()) || StringUtil.isNullOrNone(dataFlow.getDataFlowId())){
            throw new InitConfigDataException(ResponseConstant.RESULT_CODE_ERROR,"请求报文中没有包含 dataFlowId 节点");
        if ("-1".equals(dataFlow.getDataFlowId()) || StringUtil.isNullOrNone(dataFlow.getDataFlowId())) {
            throw new InitConfigDataException(ResponseConstant.RESULT_CODE_ERROR, "请求报文中没有包含 dataFlowId 节点");
        }
    }
@@ -606,36 +614,37 @@
    // Updates the business row by bId with the payload produced by
    // OrderDataFlowContextFactory.getNeedNotifyErrorBusiness(dataFlow).
    // NOTE(review): this span comes from a flattened diff; the two comment lines inside the
    // body each appear in pre-change and post-change indentation.
    private void updateBusinessNotifyError(IOrderNotifyDataFlowContext dataFlow) {
        // startDate is only referenced by the commented-out cost-time call below.
        Date startDate = DateUtil.getCurrentDate();
            // Complete the order item
        // Complete the order item (original author's wording; the payload is the "need notify error" business)
        centerServiceDaoImpl.updateBusinessByBId(OrderDataFlowContextFactory.getNeedNotifyErrorBusiness(dataFlow));
       // OrderDataFlowContextFactory.addCostTime(dataFlow, "updateBusinessNotifyError", "订单状态改为失败耗时", startDate);
        // OrderDataFlowContextFactory.addCostTime(dataFlow, "updateBusinessNotifyError", "订单状态改为失败耗时", startDate);
    }
    /**
     * 判断是否都成功了
     *
     * @param dataFlow
     */
    private void judgeBusinessStatusAndCompleteBusiness(IOrderNotifyDataFlowContext dataFlow) throws BusinessStatusException{
    private void judgeBusinessStatusAndCompleteBusiness(IOrderNotifyDataFlowContext dataFlow) throws BusinessStatusException {
        //List<Business> businesses = dataFlow.getBusinesses();
        //1.0 判断是否存在撤单,如果是撤单则将当前 bId 标记为撤单状态
        if(StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) {
        if (StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) {
            Map businessMap = centerServiceDaoImpl.getDeleteOrderBusinessByOId(dataFlow.getoId());
            if(businessMap != null && !businessMap.isEmpty()){
            if (businessMap != null && !businessMap.isEmpty()) {
                centerServiceDaoImpl.updateBusinessByBId(OrderDataFlowContextFactory.getNeedDeleteBusiness(dataFlow));
                return ;
                return;
            }
        }
        //Business business = dataFlow.getCurrentBusiness();
        if(!ResponseConstant.RESULT_CODE_SUCCESS.equals(((IOrderResponse)dataFlow).getCode())){
        if (!ResponseConstant.RESULT_CODE_SUCCESS.equals(((IOrderResponse) dataFlow).getCode())) {
            //throw new BusinessStatusException(business.getCode(),"业务bId= "+business.getbId() + " 处理失败,需要作废订单");
            //作废订单和业务项 插入撤单记录 等待撤单
            invalidOrderAndBusiness(dataFlow);
        }else{
        } else {
            completeBusiness(dataFlow);
        }
@@ -646,36 +655,37 @@
     *
     * @param dataFlow
     */
    // Marks the current business as complete according to the notification's business type:
    // INSTANCE updates the business row and then calls completeOrderByBId; BUSINESS only
    // updates the business row. A DAOException is rethrown as BusinessException with the
    // DAO result code and the original exception as cause.
    // NOTE(review): this span comes from a flattened diff; the signature, each if/else header,
    // the throw statement and the catch clause appear in pre-change and post-change formatting.
    private void completeBusiness(IOrderNotifyDataFlowContext dataFlow) throws BusinessException{
    private void completeBusiness(IOrderNotifyDataFlowContext dataFlow) throws BusinessException {
        try {
            if(StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) {
            if (StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(dataFlow.getBusinessType())) {
                // Complete the order item
                centerServiceDaoImpl.updateBusinessByBId(OrderDataFlowContextFactory.getNeedCompleteBusiness(dataFlow));
                // If all businesses are complete, switch the order to the completed status
                centerServiceDaoImpl.completeOrderByBId(dataFlow.getbId());
            }else if(StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(dataFlow.getBusinessType())) {
            } else if (StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(dataFlow.getBusinessType())) {
                centerServiceDaoImpl.updateBusinessByBId(OrderDataFlowContextFactory.getNeedBusinessComplete(dataFlow));
            }else{ // Not reachable per the original author: the type was validated earlier
                throw new BusinessException(ResponseConstant.RESULT_PARAM_ERROR,"当前不支持 业务类型为 businessType" +dataFlow.getBusinessType());
            } else { // Not reachable per the original author: the type was validated earlier
                throw new BusinessException(ResponseConstant.RESULT_PARAM_ERROR, "当前不支持 业务类型为 businessType" + dataFlow.getBusinessType());
            }
        }catch (DAOException e){
            throw new BusinessException(e.getResult(),e);
        } catch (DAOException e) {
            throw new BusinessException(e.getResult(), e);
        }
    }
    /**
     * //4.0当所有业务动作是否都是C,将订单信息改为 C 并且发布竣工消息,这里在广播之前确认
     *
     * @param dataFlow
     */
    private void judgeSendToInstance(IOrderNotifyDataFlowContext dataFlow) throws Exception{
    private void judgeSendToInstance(IOrderNotifyDataFlowContext dataFlow) throws Exception {
        try {
            if(centerServiceDaoImpl.judgeAllBusinessCompleted(dataFlow.getoId(),StatusConstant.STATUS_CD_BUSINESS_COMPLETE) > 0) {
            if (centerServiceDaoImpl.judgeAllBusinessCompleted(dataFlow.getoId(), StatusConstant.STATUS_CD_BUSINESS_COMPLETE) > 0) {
                //通知成功消息
                doSendInstance(dataFlow);
            }
        }catch (DAOException e){
        } catch (DAOException e) {
            //这里什么都不做,说明订单没有完成
        }
@@ -684,15 +694,16 @@
    /**
     * 通知 订单已经完成,后端需要完成数据
     *
     * @param dataFlow
     */
    private void notifyBusinessSystemSuccessMessage(IOrderNotifyDataFlowContext dataFlow) throws Exception{
    private void notifyBusinessSystemSuccessMessage(IOrderNotifyDataFlowContext dataFlow) throws Exception {
        long startTime = DateUtil.getCurrentDate().getTime();
        ServiceBusiness serviceBusiness = ServiceBusinessUtil.getServiceBusiness(dataFlow.getBusinessTypeCd());
        //拼装报文通知业务系统
        KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(),"",
        KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(), "",
                OrderDataFlowContextFactory.getNotifyBusinessSuccessJson(dataFlow).toJSONString());
        /*saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),OrderDataFlowContextFactory.getNotifyBusinessSuccessJson(dataFlow).toJSONString()),
@@ -705,13 +716,13 @@
     *
     * @param dataFlow
     */
    private void notifyBusinessSystemErrorMessage(IOrderNotifyDataFlowContext dataFlow) throws Exception{
    private void notifyBusinessSystemErrorMessage(IOrderNotifyDataFlowContext dataFlow) throws Exception {
        long startTime = DateUtil.getCurrentDate().getTime();
        ServiceBusiness serviceBusiness = ServiceBusinessUtil.getServiceBusiness(dataFlow.getBusinessTypeCd());
        //拼装报文通知业务系统
        KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(),"",
        KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(), "",
                OrderDataFlowContextFactory.getNotifyBusinessErrorJson(dataFlow).toJSONString());
        /*saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),OrderDataFlowContextFactory.getNotifyBusinessErrorJson(dataFlow).toJSONString()),
                LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),ResponseConstant.RESULT_CODE_ERROR),
@@ -720,16 +731,17 @@
    /**
     * 处理同步业务
     *
     * @param dataFlow
     */
    private void doSynchronousBusinesses(IOrderDataFlowContext dataFlow) throws BusinessException{
    private void doSynchronousBusinesses(IOrderDataFlowContext dataFlow) throws BusinessException {
        Date startDate = DateUtil.getCurrentDate();
        List<Business> synchronousBusinesses = OrderDataFlowContextFactory.getSynchronousBusinesses(dataFlow);
        List<Business> deleteBusinesses = new ArrayList<Business>();
        if(synchronousBusinesses == null || synchronousBusinesses.size() == 0){
            return ;
        if (synchronousBusinesses == null || synchronousBusinesses.size() == 0) {
            return;
        }
        JSONArray responseBusinesses = new JSONArray();
@@ -738,16 +750,16 @@
        try {
            //6.2发起Instance
            doBusinessTableDataInfoToInstanceTable(dataFlow, synchronousBusinesses,deleteBusinesses);
        }catch (Exception e){
            doBusinessTableDataInfoToInstanceTable(dataFlow, synchronousBusinesses, deleteBusinesses);
        } catch (Exception e) {
            try {
                //这里发起撤单逻辑
                doDeleteOrderAndInstanceData(dataFlow, deleteBusinesses);
            }catch (Exception e1){
                logger.error("撤单失败",e1);
            } catch (Exception e1) {
                logger.error("撤单失败", e1);
                //这里记录撤单失败的信息
            }
            throw new BusinessException(ResponseConstant.RESULT_PARAM_ERROR,e.getMessage());
            throw new BusinessException(ResponseConstant.RESULT_PARAM_ERROR, e.getMessage());
        }
        //6.3 c_business 数据修改为完成
        /*List<Business> asynchronousBusinesses = OrderDataFlowContextFactory.getAsynchronousBusinesses(dataFlow);
@@ -755,25 +767,26 @@
            doComplateOrderAndBusiness(dataFlow,synchronousBusinesses);
        }*/
        OrderDataFlowContextFactory.addCostTime(dataFlow, "doSynchronousBusinesses", "同步调用业务系统总耗时", startDate);
}
    }
    /**
     * Initiates the order cancellation (delete-order) flow.
     *
     * <p>Does nothing when {@code deleteBusinesses} is null or empty. Otherwise
     * it saves a delete-order record, calls
     * {@code updateOrderAndBusinessDelete} and then
     * {@code doDeleteBusinessSystemInstanceData} for the given businesses, in
     * that order.
     *
     * <p>NOTE(review): this span comes from a flattened diff; the guard clause,
     * the save call and the final call each appear in pre-change and
     * post-change formatting.
     *
     * @param dataFlow         order data flow context
     * @param deleteBusinesses businesses to cancel; null or empty means no work
     */
    private void doDeleteOrderAndInstanceData(IOrderDataFlowContext dataFlow, List<Business> deleteBusinesses) {
        if(deleteBusinesses == null || deleteBusinesses.size() == 0){
            return ;
        if (deleteBusinesses == null || deleteBusinesses.size() == 0) {
            return;
        }
        // 1.0 Add a delete-order record to the c_business table
        centerServiceDaoImpl.saveBusiness(OrderDataFlowContextFactory.getDeleteOrderBusiness(dataFlow,"业务系统实例失败,发起撤单"));
        centerServiceDaoImpl.saveBusiness(OrderDataFlowContextFactory.getDeleteOrderBusiness(dataFlow, "业务系统实例失败,发起撤单"));
        // 2.0 Invalidate the c_orders and c_business data
        updateOrderAndBusinessDelete(dataFlow);
        // 3.0 Initiate the cancellation in the business systems
        doDeleteBusinessSystemInstanceData(dataFlow,deleteBusinesses);
        doDeleteBusinessSystemInstanceData(dataFlow, deleteBusinesses);
    }
    /**
@@ -811,25 +824,26 @@
    /**
     * 将BusinessTable 中的数据保存到 InstanceTable
     *
     * @param dataFlow
     * @param synchronousBusinesses
     */
    private void doBusinessTableDataInfoToInstanceTable(IOrderDataFlowContext dataFlow, List<Business> synchronousBusinesses,List<Business> deleteBusinesses) {
    private void doBusinessTableDataInfoToInstanceTable(IOrderDataFlowContext dataFlow, List<Business> synchronousBusinesses, List<Business> deleteBusinesses) {
        Date businessStartDate;
        ServiceBusiness serviceBusiness;
        JSONObject requestBusinessJson;
        for(Business business : synchronousBusinesses){
        for (Business business : synchronousBusinesses) {
            businessStartDate = DateUtil.getCurrentDate();
            serviceBusiness = ServiceBusinessUtil.getServiceBusiness(business.getBusinessTypeCd());
            //添加需要撤单的业务信息
            deleteBusinesses.add(business);
            requestBusinessJson = OrderDataFlowContextFactory.getBusinessTableDataInfoToInstanceTableJson(dataFlow,business);
            requestBusinessJson = OrderDataFlowContextFactory.getBusinessTableDataInfoToInstanceTableJson(dataFlow, business);
            JSONObject responseJson = doRequestBusinessSystem(dataFlow, serviceBusiness, requestBusinessJson);
            //发布事件
            DataFlowEventPublishing.invokeBusinessISuccess(dataFlow,business);
            updateBusinessStatusCdByBId(business.getbId(),StatusConstant.STATUS_CD_COMPLETE);
            OrderDataFlowContextFactory.addCostTime(dataFlow, business.getBusinessTypeCd(), "调用"+business.getBusinessTypeCd()+"耗时", businessStartDate);
            DataFlowEventPublishing.invokeBusinessISuccess(dataFlow, business);
            updateBusinessStatusCdByBId(business.getbId(), StatusConstant.STATUS_CD_COMPLETE);
            OrderDataFlowContextFactory.addCostTime(dataFlow, business.getBusinessTypeCd(), "调用" + business.getBusinessTypeCd() + "耗时", businessStartDate);
          /*  saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),requestBusinessJson.toJSONString()),
                    LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),responseJson.toJSONString()),
                    DateUtil.getCurrentDate().getTime() - businessStartDate.getTime());*/
@@ -839,13 +853,14 @@
            return ;
        }*/
            //判断业务动作是否都竣工,主要考虑 请求报文中 有异步也有同步的情况
            //如果业务都完成,则将 订单改为完成状态
            centerServiceDaoImpl.completeOrderByOId(dataFlow.getOrders().getoId());
        //判断业务动作是否都竣工,主要考虑 请求报文中 有异步也有同步的情况
        //如果业务都完成,则将 订单改为完成状态
        centerServiceDaoImpl.completeOrderByOId(dataFlow.getOrders().getoId());
    }
    /**
     * 业务系统撤单
     *
     * @param dataFlow
     * @param deleteBusinesses
     */
@@ -853,12 +868,12 @@
        Date businessStartDate;
        JSONObject requestBusinessJson;
        ServiceBusiness serviceBusiness;
        for(Business business : deleteBusinesses){
        for (Business business : deleteBusinesses) {
            businessStartDate = DateUtil.getCurrentDate();
            requestBusinessJson = OrderDataFlowContextFactory.getDeleteInstanceTableJson(dataFlow,business);
            requestBusinessJson = OrderDataFlowContextFactory.getDeleteInstanceTableJson(dataFlow, business);
            serviceBusiness = ServiceBusinessUtil.getServiceBusiness(business.getBusinessTypeCd());
            JSONObject responseJson = doRequestBusinessSystem(dataFlow, serviceBusiness, requestBusinessJson);
            OrderDataFlowContextFactory.addCostTime(dataFlow, business.getBusinessTypeCd(), "调用"+business.getBusinessTypeCd()+"-撤单 耗时", businessStartDate);
            OrderDataFlowContextFactory.addCostTime(dataFlow, business.getBusinessTypeCd(), "调用" + business.getBusinessTypeCd() + "-撤单 耗时", businessStartDate);
//            saveLogMessage(dataFlow,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),requestBusinessJson.toJSONString()),
//                    LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),responseJson.toJSONString()),
//                    DateUtil.getCurrentDate().getTime() - businessStartDate.getTime());
@@ -867,6 +882,7 @@
    /**
     * 调用下游系统
     *
     * @param dataFlow
     * @param serviceBusiness
     * @param requestBusinessJson 请求报文
@@ -875,63 +891,63 @@
    private JSONObject doRequestBusinessSystem(IOrderDataFlowContext dataFlow, ServiceBusiness serviceBusiness, JSONObject requestBusinessJson) {
        String responseMessage;
        Assert.hasLength(serviceBusiness.getInvokeType(),"c_service_business表配置出错,invoke_type 不能为空"+ serviceBusiness.getBusinessTypeCd());
        if(ServiceBusinessConstant.INVOKE_TYPE_WEBSERVICE.equals(serviceBusiness.getInvokeType())) {//webservice方式
        Assert.hasLength(serviceBusiness.getInvokeType(), "c_service_business表配置出错,invoke_type 不能为空" + serviceBusiness.getBusinessTypeCd());
        String httpUrl = "";
        if (ServiceBusinessConstant.INVOKE_TYPE_WEBSERVICE.equals(serviceBusiness.getInvokeType())) {//webservice方式
            String url = serviceBusiness.getUrl();
            String[] urls = url.split("#");
            if(urls.length != 2){
                throw new ConfigDataException(ResponseConstant.RESULT_CODE_CONFIG_ERROR,"配置错误:c_service_business配置url字段错误"+serviceBusiness.getBusinessTypeCd());
            if (urls.length != 2) {
                throw new ConfigDataException(ResponseConstant.RESULT_CODE_CONFIG_ERROR, "配置错误:c_service_business配置url字段错误" + serviceBusiness.getBusinessTypeCd());
            }
            String webserviceUrl = MappingCache.getValue(urls[0]);
            httpUrl = MappingCache.getValue(urls[0]);
            String method = MappingCache.getValue(urls[1]);
            responseMessage = (String) WebServiceAxisClient.callWebService(webserviceUrl,method,
            responseMessage = (String) WebServiceAxisClient.callWebService(httpUrl, method,
                    new Object[]{requestBusinessJson.toJSONString()},
                    serviceBusiness.getTimeout());
        }else if(ServiceBusinessConstant.INVOKE_TYPE_HTTP_POST.equals(serviceBusiness.getInvokeType())){
        } else if (ServiceBusinessConstant.INVOKE_TYPE_HTTP_POST.equals(serviceBusiness.getInvokeType())) {
            //http://user-service/test/sayHello
            String httpPostUrl = MappingCache.getValue(serviceBusiness.getUrl());
            responseMessage = restTemplate.postForObject(httpPostUrl,requestBusinessJson.toJSONString(),String.class);
        }else if(ServiceBusinessConstant.INVOKE_TYPE_OUT_HTTP_POST.equals(serviceBusiness.getInvokeType())){
            String httpPostUrl = MappingCache.getValue(serviceBusiness.getUrl());
            responseMessage = restTemplateNoLoadBalanced.postForObject(httpPostUrl,requestBusinessJson.toJSONString(),String.class);
        }
        else {//post方式
            throw new ConfigDataException(ResponseConstant.RESULT_CODE_CONFIG_ERROR,"配置错误:c_service_business配置url字段错误,当前无法识别"+serviceBusiness.getBusinessTypeCd());
            httpUrl = MappingCache.getValue(serviceBusiness.getUrl());
            responseMessage = restTemplate.postForObject(httpUrl, requestBusinessJson.toJSONString(), String.class);
        } else if (ServiceBusinessConstant.INVOKE_TYPE_OUT_HTTP_POST.equals(serviceBusiness.getInvokeType())) {
            httpUrl = MappingCache.getValue(serviceBusiness.getUrl());
            responseMessage = restTemplateNoLoadBalanced.postForObject(httpUrl, requestBusinessJson.toJSONString(), String.class);
        } else {//post方式
            throw new ConfigDataException(ResponseConstant.RESULT_CODE_CONFIG_ERROR, "配置错误:c_service_business配置url字段错误,当前无法识别" + serviceBusiness.getBusinessTypeCd());
        }
        logger.debug("订单服务调用下游服务请求报文:{},返回报文:{}",requestBusinessJson,responseMessage);
        logger.debug("调用地址:{}, 订单服务调用下游服务请求报文:{},返回报文:{}", httpUrl, requestBusinessJson, responseMessage);
        if(StringUtil.isNullOrNone(responseMessage) || !Assert.isJsonObject(responseMessage)){
            throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR,"下游系统返回格式不正确,请按协议规范处理");
        if (StringUtil.isNullOrNone(responseMessage) || !Assert.isJsonObject(responseMessage)) {
            throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR, "下游系统返回格式不正确,请按协议规范处理");
        }
        JSONObject responseJson = JSONObject.parseObject(responseMessage);
        Assert.jsonObjectHaveKey(responseJson,"response","下游返回报文格式错误,没有包含responseJson节点【"+serviceBusiness.getBusinessTypeCd()+"】");
        Assert.jsonObjectHaveKey(responseJson, "response", "下游返回报文格式错误,没有包含responseJson节点【" + serviceBusiness.getBusinessTypeCd() + "】");
        JSONObject responseInfo = responseJson.getJSONObject("response");
        Assert.jsonObjectHaveKey(responseInfo,"code","下游返回报文格式错误,response 节点中没有包含code节点【"+serviceBusiness.getBusinessTypeCd()+"】");
        Assert.jsonObjectHaveKey(responseInfo, "code", "下游返回报文格式错误,response 节点中没有包含code节点【" + serviceBusiness.getBusinessTypeCd() + "】");
        if(!ResponseConstant.RESULT_CODE_SUCCESS.equals(responseInfo.getString("code"))){
            throw  new BusinessException(ResponseConstant.RESULT_PARAM_ERROR,"业务系统处理失败,"+responseInfo.getString("message"));
        if (!ResponseConstant.RESULT_CODE_SUCCESS.equals(responseInfo.getString("code"))) {
            throw new BusinessException(ResponseConstant.RESULT_PARAM_ERROR, "业务系统处理失败," + responseInfo.getString("message"));
        }
        return responseJson;
    }
    private String doTransferRequestBusinessSystem(DataFlow dataFlow, AppService service, String reqData) {
        String responseMessage;
        if(service.getMethod() == null || "".equals(service.getMethod())) {//post方式
        if (service.getMethod() == null || "".equals(service.getMethod())) {//post方式
            //http://user-service/test/sayHello
            HttpHeaders header = new HttpHeaders();
            for(String key : dataFlow.getRequestCurrentHeaders().keySet()){
                header.add(key,dataFlow.getRequestCurrentHeaders().get(key));
            for (String key : dataFlow.getRequestCurrentHeaders().keySet()) {
                header.add(key, dataFlow.getRequestCurrentHeaders().get(key));
            }
            HttpEntity<String> httpEntity = new HttpEntity<String>(reqData, header);
            responseMessage = restTemplateNoLoadBalanced.postForObject(service.getUrl(),httpEntity,String.class);
        }else{//webservice方式
            responseMessage = (String) WebServiceAxisClient.callWebService(service.getUrl(),service.getMethod(),
            responseMessage = restTemplateNoLoadBalanced.postForObject(service.getUrl(), httpEntity, String.class);
        } else {//webservice方式
            responseMessage = (String) WebServiceAxisClient.callWebService(service.getUrl(), service.getMethod(),
                    new Object[]{dataFlow.getRequestBusinessJson().toJSONString()},
                    service.getTimeOut());
        }
@@ -940,6 +956,7 @@
    /**
     * 数据保存到BusinessTable 中
     *
     * @param dataFlow
     * @param synchronousBusinesses
     * @param responseBusinesses
@@ -948,22 +965,22 @@
        Date businessStartDate;
        ServiceBusiness serviceBusiness;
        JSONObject requestBusinessJson;
        for(Business business : synchronousBusinesses) {
        for (Business business : synchronousBusinesses) {
            businessStartDate = DateUtil.getCurrentDate();
            //发起Business过程
            updateBusinessStatusCdByBId(business.getbId(),StatusConstant.STATUS_CD_BUSINESS);
            updateBusinessStatusCdByBId(business.getbId(), StatusConstant.STATUS_CD_BUSINESS);
            serviceBusiness = ServiceBusinessUtil.getServiceBusiness(business.getBusinessTypeCd());
            requestBusinessJson = OrderDataFlowContextFactory.getRequestBusinessJson(dataFlow,business);
            requestBusinessJson = OrderDataFlowContextFactory.getRequestBusinessJson(dataFlow, business);
            JSONObject responseJson = doRequestBusinessSystem(dataFlow, serviceBusiness, requestBusinessJson);
            //发布事件
            DataFlowEventPublishing.invokeBusinessBSuccess(dataFlow,business);
            DataFlowEventPublishing.invokeBusinessBSuccess(dataFlow, business, responseJson);
            responseBusinesses.add(responseJson);
            OrderDataFlowContextFactory.addCostTime(dataFlow, business.getBusinessTypeCd(), "调用"+business.getBusinessTypeCd()+"耗时", businessStartDate);
            OrderDataFlowContextFactory.addCostTime(dataFlow, business.getBusinessTypeCd(), "调用" + business.getBusinessTypeCd() + "耗时", businessStartDate);
   /*         saveLogMessage(null,LogAgent.createLogMessage(dataFlow.getRequestCurrentHeaders(),requestBusinessJson.toJSONString()),
                    LogAgent.createLogMessage(dataFlow.getResponseCurrentHeaders(),responseJson.toJSONString()),
                    DateUtil.getCurrentDate().getTime()-businessStartDate.getTime());*/
@@ -972,25 +989,26 @@
    /**
     * 处理异步业务
     *
     * @param
     */
    private void doAsynchronousBusinesses(IOrderDataFlowContext dataFlow) throws BusinessException{
    private void doAsynchronousBusinesses(IOrderDataFlowContext dataFlow) throws BusinessException {
        Date startDate = DateUtil.getCurrentDate();
        //6.3 处理异步,按消息队里处理
        List<Business> asynchronousBusinesses = OrderDataFlowContextFactory.getAsynchronousBusinesses(dataFlow);
        if(asynchronousBusinesses == null || asynchronousBusinesses.size() == 0){
            return ;
        if (asynchronousBusinesses == null || asynchronousBusinesses.size() == 0) {
            return;
        }
        ServiceBusiness serviceBusiness;
        try {
            for (Business business : asynchronousBusinesses) {
                serviceBusiness = ServiceBusinessUtil.getServiceBusiness(business.getBusinessTypeCd());
                KafkaFactory.sendKafkaMessage(serviceBusiness.getMessageTopic(), "",
                        OrderDataFlowContextFactory.getRequestBusinessJson(dataFlow,business).toJSONString());
                        OrderDataFlowContextFactory.getRequestBusinessJson(dataFlow, business).toJSONString());
            }
        }catch (Exception e){
            throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR,e.getMessage());
        } catch (Exception e) {
            throw new BusinessException(ResponseConstant.RESULT_CODE_INNER_ERROR, e.getMessage());
        }
        OrderDataFlowContextFactory.addCostTime(dataFlow, "doSynchronousBusinesses", "异步调用业务系统总耗时", startDate);
@@ -1002,75 +1020,78 @@
    /**
     * 保存日志信息
     * @param dataFlow 数据流对象 封装用户请求的信息
     *
     * @param requestJson 请求报文 格式为
     *                    {"headers":"",
     * @param dataFlow     数据流对象 封装用户请求的信息
     * @param requestJson  请求报文 格式为
     *                     {"headers":"",
     *                     "body":""
     *                     }
     * @param responseJson 请求报文 格式为
     *                    {"headers":"",
     *                     {"headers":"",
     *                     "body":""
     *                     }
     */
    private void saveLogMessage(DataFlow dataFlow,JSONObject requestJson,JSONObject responseJson,long costTime){
            LogAgent.sendLog(dataFlow,requestJson,responseJson,costTime);
    private void saveLogMessage(DataFlow dataFlow, JSONObject requestJson, JSONObject responseJson, long costTime) {
        LogAgent.sendLog(dataFlow, requestJson, responseJson, costTime);
    }
    /**
     * 保存日志信息
     *
     * @param requestJson
     */
    private void saveLogMessage(String requestJson,String responseJson){
    private void saveLogMessage(String requestJson, String responseJson) {
        try{
            if(MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_LOG_ON_OFF))){
        try {
            if (MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_LOG_ON_OFF))) {
                JSONObject log = new JSONObject();
                log.put("request",requestJson);
                log.put("response",responseJson);
                KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_LOG_NAME,"",log.toJSONString());
                log.put("request", requestJson);
                log.put("response", responseJson);
                KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_LOG_NAME, "", log.toJSONString());
            }
        }catch (Exception e){
            logger.error("报错日志出错了,",e);
        } catch (Exception e) {
            logger.error("报错日志出错了,", e);
        }
    }
    /**
     * 保存耗时信息
     *
     * @param dataFlow
     */
    private void saveCostTimeLogMessage(DataFlow dataFlow){
        try{
            if(MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_COST_TIME_ON_OFF))){
    private void saveCostTimeLogMessage(DataFlow dataFlow) {
        try {
            if (MappingConstant.VALUE_ON.equals(MappingCache.getValue(MappingConstant.KEY_COST_TIME_ON_OFF))) {
                List<DataFlowLinksCost> dataFlowLinksCosts = dataFlow.getLinksCostDates();
                JSONObject costDate = new JSONObject();
                JSONArray costDates = new JSONArray();
                JSONObject newObj = null;
                for(DataFlowLinksCost dataFlowLinksCost : dataFlowLinksCosts){
                for (DataFlowLinksCost dataFlowLinksCost : dataFlowLinksCosts) {
                    newObj = JSONObject.parseObject(JSONObject.toJSONString(dataFlowLinksCost));
                    newObj.put("dataFlowId",dataFlow.getDataFlowId());
                    newObj.put("transactionId",dataFlow.getTransactionId());
                    newObj.put("dataFlowId", dataFlow.getDataFlowId());
                    newObj.put("transactionId", dataFlow.getTransactionId());
                    costDates.add(newObj);
                }
                costDate.put("costDates",costDates);
                costDate.put("costDates", costDates);
                KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_COST_TIME_LOG_NAME,"",costDate.toJSONString());
                KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_COST_TIME_LOG_NAME, "", costDate.toJSONString());
            }
        }catch (Exception e){
            logger.error("报错日志出错了,",e);
        } catch (Exception e) {
            logger.error("报错日志出错了,", e);
        }
    }
    /**
     * 修改c_business状态
     *
     * @param bId
     * @param statusCd
     */
    private void updateBusinessStatusCdByBId(String bId,String statusCd){
    private void updateBusinessStatusCdByBId(String bId, String statusCd) {
        Map business = new HashMap();
        business.put("bId",bId);
        business.put("statusCd",statusCd);
        business.put("finishTime",DateUtil.getCurrentDate());
        business.put("bId", bId);
        business.put("statusCd", statusCd);
        business.put("finishTime", DateUtil.getCurrentDate());
        centerServiceDaoImpl.updateBusinessByBId(business);
    }