java110
2021-09-01 bdc870fab7c09b420079257fb05e3e08a5436ff9
service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java
old mode 100644 new mode 100755
@@ -11,6 +11,7 @@
import com.java110.utils.exception.InitConfigDataException;
import com.java110.utils.exception.InitDataFlowContextException;
import com.java110.utils.kafka.KafkaFactory;
import com.java110.utils.util.Assert;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +44,7 @@
            //预校验
            preValiateOrderInfo(orderInfo);
            businessServiceDataFlow = this.writeDataToDataFlowContext(orderInfo, headers);
            responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
            //responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
        } catch (InitDataFlowContextException e) {
            logger.error("请求报文错误,初始化 BusinessServiceDataFlow失败" + orderInfo, e);
            responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo, ResponseConstant.RESULT_PARAM_ERROR, e.getMessage(), null);
@@ -77,9 +78,14 @@
     * @param orderInfo
     */
    private void preValiateOrderInfo(String orderInfo) {
       /* if(JSONObject.parseObject(orderInfo).getJSONObject("orders").containsKey("dataFlowId")){
            throw new BusinessException(ResponseConstant.RESULT_CODE_ERROR,"报文中不能存在dataFlowId节点");
        }*/
        JSONObject reqJson = JSONObject.parseObject(orderInfo);
        Assert.hasKeyAndValue(reqJson, "header", "请求报文中未包含header");
        Assert.hasKeyAndValue(reqJson, "body", "请求报文中未包含body");
        JSONObject header = reqJson.getJSONObject("header");
        Assert.hasKeyAndValue(header, "serviceCode", "请求报文中未包含serviceCode");
        Assert.hasKeyAndValue(header, "sign", "请求报文中未包含sign");
        Assert.hasKeyAndValue(header, "resTime", "请求报文中未包含reqTime");
        Assert.hasKeyAndValue(header, "code", "请求报文中未包含reqTime");
        Assert.hasKeyAndValue(header, "msg", "请求报文中未包含reqTime");
    }
}