java110
2021-09-01 85dceb0dd3e455082aed9c343dd76e675ff24d6e
service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java
old mode 100644 new mode 100755
@@ -11,6 +11,7 @@
import com.java110.utils.exception.InitConfigDataException;
import com.java110.utils.exception.InitDataFlowContextException;
import com.java110.utils.kafka.KafkaFactory;
import com.java110.utils.util.Assert;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +44,7 @@
            //预校验
            preValiateOrderInfo(orderInfo);
            businessServiceDataFlow = this.writeDataToDataFlowContext(orderInfo, headers);
            responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
            //responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
        } catch (InitDataFlowContextException e) {
            logger.error("请求报文错误,初始化 BusinessServiceDataFlow失败" + orderInfo, e);
            responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo, ResponseConstant.RESULT_PARAM_ERROR, e.getMessage(), null);
@@ -70,6 +71,22 @@
        }
    }
    @KafkaListener(topics = {"${kafka.hcGovTopic}"})
    public void hcGovListen(ConsumerRecord<?, ?> record) {
        logger.info("kafka的key: " + record.key());
        logger.info("kafka的value: " + record.value().toString());
        String orderInfo = record.value().toString();
        try {
            logger.debug("hcGovkafka 接收到数据", orderInfo);
            //responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
        } catch (Exception e) {
            logger.error("请求订单异常", e);
        } finally {
        }
    }
    /**
     * 这里预校验,请求报文中不能有 dataFlowId
@@ -77,9 +94,14 @@
     * @param orderInfo
     */
    private void preValiateOrderInfo(String orderInfo) {
       /* if(JSONObject.parseObject(orderInfo).getJSONObject("orders").containsKey("dataFlowId")){
            throw new BusinessException(ResponseConstant.RESULT_CODE_ERROR,"报文中不能存在dataFlowId节点");
        }*/
        JSONObject reqJson = JSONObject.parseObject(orderInfo);
        Assert.hasKeyAndValue(reqJson, "header", "请求报文中未包含header");
        Assert.hasKeyAndValue(reqJson, "body", "请求报文中未包含body");
        JSONObject header = reqJson.getJSONObject("header");
        Assert.hasKeyAndValue(header, "serviceCode", "请求报文中未包含serviceCode");
        Assert.hasKeyAndValue(header, "sign", "请求报文中未包含sign");
        Assert.hasKeyAndValue(header, "resTime", "请求报文中未包含reqTime");
        Assert.hasKeyAndValue(header, "code", "请求报文中未包含reqTime");
        Assert.hasKeyAndValue(header, "msg", "请求报文中未包含reqTime");
    }
}