wuxw
2022-07-19 05683f2b2bdbdbe21cf17ad523c21ab338bd1c54
service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java
@@ -3,21 +3,34 @@
import com.alibaba.fastjson.JSONObject;
import com.java110.core.base.controller.BaseController;
import com.java110.core.context.BusinessServiceDataFlow;
import com.java110.core.factory.AuthenticationFactory;
import com.java110.core.factory.DataTransactionFactory;
import com.java110.dto.community.CommunityAttrDto;
import com.java110.dto.community.CommunityDto;
import com.java110.dto.reportData.ReportDataDto;
import com.java110.dto.reportData.ReportDataHeaderDto;
import com.java110.intf.community.ICommunityInnerServiceSMO;
import com.java110.job.adapt.hcGov.HcGovConstant;
import com.java110.job.adapt.hcGov.IReportReturnDataAdapt;
import com.java110.job.adapt.hcGov.asyn.BaseHcGovSendAsyn;
import com.java110.job.smo.IJobServiceSMO;
import com.java110.utils.constant.KafkaConstant;
import com.java110.utils.constant.ResponseConstant;
import com.java110.utils.constant.StatusConstant;
import com.java110.utils.exception.InitConfigDataException;
import com.java110.utils.exception.InitDataFlowContextException;
import com.java110.utils.factory.ApplicationContextFactory;
import com.java110.utils.kafka.KafkaFactory;
import com.java110.utils.util.Assert;
import com.java110.utils.util.BeanConvertUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.java110.core.log.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@@ -28,8 +41,14 @@
    private final static Logger logger = LoggerFactory.getLogger(JobServiceKafka.class);
    public static final String KAFKA_RETURN = "_RETURN";
    @Autowired
    private IJobServiceSMO jobServiceSMOImpl;
    @Autowired
    private ICommunityInnerServiceSMO communityInnerServiceSMOImpl;
    @Autowired
    private BaseHcGovSendAsyn baseHcGovSendAsynImpl;
    @KafkaListener(topics = {"jobServiceTopic"})
    public void listen(ConsumerRecord<?, ?> record) {
@@ -43,7 +62,7 @@
            //预校验
            preValiateOrderInfo(orderInfo);
            businessServiceDataFlow = this.writeDataToDataFlowContext(orderInfo, headers);
            responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
            //responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
        } catch (InitDataFlowContextException e) {
            logger.error("请求报文错误,初始化 BusinessServiceDataFlow失败" + orderInfo, e);
            responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo, ResponseConstant.RESULT_PARAM_ERROR, e.getMessage(), null);
@@ -70,6 +89,88 @@
        }
    }
    @KafkaListener(topics = {"${kafka.hcGovTopic}"})
    public void hcGovListen(ConsumerRecord<?, ?> record) {
        String orderInfo = record.value().toString();
        logger.debug("hcGovkafka 接收到数据", orderInfo);
        doListen(orderInfo);
    }
    private void doListen(String orderInfo) {
        JSONObject reqJson = JSONObject.parseObject(orderInfo);
        ReportDataDto reportDataDto = null;
        JSONObject header = reqJson.getJSONObject("header");
        String extCommunityId = header.getString("extCommunityId");
        try {
            //获得小区对象
            String secure = getExtCommunityCode(extCommunityId);
            //报文合规性校验
            preValiateOrderInfo(orderInfo);
            //构建对象
            reportDataDto = freshReportDataDto(orderInfo);
            //签名认证
            AuthenticationFactory.authReportDataSign(reportDataDto, secure);
            //适配器
            IReportReturnDataAdapt reportDataAdapt = ApplicationContextFactory.getBean(reportDataDto.getReportDataHeaderDto().getServiceCode() + KAFKA_RETURN, IReportReturnDataAdapt.class);
            if (reportDataAdapt == null) {
                throw new IllegalArgumentException("serviceCode 错误 请检查");
            }
            //业务处理
            reportDataAdapt.reportReturn(reportDataDto, extCommunityId);
        } catch (Exception e) {
            logger.error("政务回写失败", e);
        }finally {
            //回写日志表
            baseHcGovSendAsynImpl.updateHcGovLog(reqJson);
        }
    }
    /**
     * 获取小区密钥
     *
     * @param extCommunityId
     * @return
     */
    private String getExtCommunityCode(String extCommunityId) {
        CommunityAttrDto communityAttrDto = new CommunityAttrDto();
        communityAttrDto.setSpecCd(CommunityAttrDto.SPEC_CD_GOV);
        communityAttrDto.setValue(extCommunityId);
        List<CommunityAttrDto> tmpCommunityAttrDtos = communityInnerServiceSMOImpl.getCommunityAttrs(communityAttrDto);
        if (tmpCommunityAttrDtos == null || tmpCommunityAttrDtos.size() < 1) {
            return HcGovConstant.COMMUNITY_SECURE;
        }
        communityAttrDto = new CommunityAttrDto();
        communityAttrDto.setSpecCd(CommunityAttrDto.SPEC_CD_GOV_SECURE);
        communityAttrDto.setCommunityId(tmpCommunityAttrDtos.get(0).getCommunityId());
        tmpCommunityAttrDtos = communityInnerServiceSMOImpl.getCommunityAttrs(communityAttrDto);
        if (tmpCommunityAttrDtos == null || tmpCommunityAttrDtos.size() < 1) {
            return HcGovConstant.COMMUNITY_SECURE;
        }
        return tmpCommunityAttrDtos.get(0).getValue();
    }
    /**
     * 构建对象
     *
     * @param orderInfo
     * @return
     */
    private ReportDataDto freshReportDataDto(String orderInfo) {
        ReportDataDto reportDataDto = new ReportDataDto();
        JSONObject reqJson = JSONObject.parseObject(orderInfo);
        ReportDataHeaderDto reportDataHeaderDto = BeanConvertUtil.covertBean(reqJson.getJSONObject("header"), ReportDataHeaderDto.class);
        reportDataDto.setReportDataHeaderDto(reportDataHeaderDto);
        reportDataDto.setReportDataBodyDto(reqJson.getJSONObject("body"));
        return reportDataDto;
    }
    /**
     * 这里预校验,请求报文中不能有 dataFlowId
@@ -77,9 +178,14 @@
     * @param orderInfo
     */
    private void preValiateOrderInfo(String orderInfo) {
       /* if(JSONObject.parseObject(orderInfo).getJSONObject("orders").containsKey("dataFlowId")){
            throw new BusinessException(ResponseConstant.RESULT_CODE_ERROR,"报文中不能存在dataFlowId节点");
        }*/
        JSONObject reqJson = JSONObject.parseObject(orderInfo);
        Assert.hasKeyAndValue(reqJson, "header", "请求报文中未包含header");
        Assert.hasKeyAndValue(reqJson, "body", "请求报文中未包含body");
        JSONObject header = reqJson.getJSONObject("header");
        Assert.hasKeyAndValue(header, "serviceCode", "请求报文中未包含serviceCode");
        Assert.hasKeyAndValue(header, "sign", "请求报文中未包含sign");
        Assert.hasKeyAndValue(header, "resTime", "请求报文中未包含reqTime");
        Assert.hasKeyAndValue(header, "code", "请求报文中未包含reqTime");
        Assert.hasKeyAndValue(header, "msg", "请求报文中未包含reqTime");
    }
}