old mode 100644
new mode 100755
| | |
| | | import com.java110.utils.exception.InitConfigDataException; |
| | | import com.java110.utils.exception.InitDataFlowContextException; |
| | | import com.java110.utils.kafka.KafkaFactory; |
| | | import com.java110.utils.util.Assert; |
| | | import org.apache.kafka.clients.consumer.ConsumerRecord; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | |
| | | //预校验 |
| | | preValiateOrderInfo(orderInfo); |
| | | businessServiceDataFlow = this.writeDataToDataFlowContext(orderInfo, headers); |
| | | responseJson = jobServiceSMOImpl.service(businessServiceDataFlow); |
| | | //responseJson = jobServiceSMOImpl.service(businessServiceDataFlow); |
| | | } catch (InitDataFlowContextException e) { |
| | | logger.error("请求报文错误,初始化 BusinessServiceDataFlow失败" + orderInfo, e); |
| | | responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo, ResponseConstant.RESULT_PARAM_ERROR, e.getMessage(), null); |
| | |
| | | } |
| | | } |
| | | |
| | | @KafkaListener(topics = {"${kafka.hcGovTopic}"}) |
| | | public void hcGovListen(ConsumerRecord<?, ?> record) { |
| | | logger.info("kafka的key: " + record.key()); |
| | | logger.info("kafka的value: " + record.value().toString()); |
| | | String orderInfo = record.value().toString(); |
| | | |
| | | try { |
| | | logger.debug("hcGovkafka 接收到数据", orderInfo); |
| | | //responseJson = jobServiceSMOImpl.service(businessServiceDataFlow); |
| | | } catch (Exception e) { |
| | | logger.error("请求订单异常", e); |
| | | } finally { |
| | | |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 这里预校验,请求报文中不能有 dataFlowId |
| | |
| | | * @param orderInfo |
| | | */ |
| | | private void preValiateOrderInfo(String orderInfo) { |
| | | /* if(JSONObject.parseObject(orderInfo).getJSONObject("orders").containsKey("dataFlowId")){ |
| | | throw new BusinessException(ResponseConstant.RESULT_CODE_ERROR,"报文中不能存在dataFlowId节点"); |
| | | }*/ |
| | | JSONObject reqJson = JSONObject.parseObject(orderInfo); |
| | | Assert.hasKeyAndValue(reqJson, "header", "请求报文中未包含header"); |
| | | Assert.hasKeyAndValue(reqJson, "body", "请求报文中未包含body"); |
| | | JSONObject header = reqJson.getJSONObject("header"); |
| | | Assert.hasKeyAndValue(header, "serviceCode", "请求报文中未包含serviceCode"); |
| | | Assert.hasKeyAndValue(header, "sign", "请求报文中未包含sign"); |
| | | Assert.hasKeyAndValue(header, "resTime", "请求报文中未包含reqTime"); |
| | | Assert.hasKeyAndValue(header, "code", "请求报文中未包含reqTime"); |
| | | Assert.hasKeyAndValue(header, "msg", "请求报文中未包含reqTime"); |
| | | } |
| | | |
| | | } |