wuxw
2022-07-19 05683f2b2bdbdbe21cf17ad523c21ab338bd1c54
service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java
@@ -3,22 +3,34 @@
import com.alibaba.fastjson.JSONObject;
import com.java110.core.base.controller.BaseController;
import com.java110.core.context.BusinessServiceDataFlow;
import com.java110.core.factory.AuthenticationFactory;
import com.java110.core.factory.DataTransactionFactory;
import com.java110.dto.community.CommunityAttrDto;
import com.java110.dto.community.CommunityDto;
import com.java110.dto.reportData.ReportDataDto;
import com.java110.dto.reportData.ReportDataHeaderDto;
import com.java110.intf.community.ICommunityInnerServiceSMO;
import com.java110.job.adapt.hcGov.HcGovConstant;
import com.java110.job.adapt.hcGov.IReportReturnDataAdapt;
import com.java110.job.adapt.hcGov.asyn.BaseHcGovSendAsyn;
import com.java110.job.smo.IJobServiceSMO;
import com.java110.utils.constant.KafkaConstant;
import com.java110.utils.constant.ResponseConstant;
import com.java110.utils.constant.StatusConstant;
import com.java110.utils.exception.InitConfigDataException;
import com.java110.utils.exception.InitDataFlowContextException;
import com.java110.utils.factory.ApplicationContextFactory;
import com.java110.utils.kafka.KafkaFactory;
import com.java110.utils.util.Assert;
import com.java110.utils.util.BeanConvertUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.java110.core.log.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@@ -29,8 +41,14 @@
    private final static Logger logger = LoggerFactory.getLogger(JobServiceKafka.class);
    public static final String KAFKA_RETURN = "_RETURN";
    @Autowired
    private IJobServiceSMO jobServiceSMOImpl;
    @Autowired
    private ICommunityInnerServiceSMO communityInnerServiceSMOImpl;
    @Autowired
    private BaseHcGovSendAsyn baseHcGovSendAsynImpl;
    @KafkaListener(topics = {"jobServiceTopic"})
    public void listen(ConsumerRecord<?, ?> record) {
@@ -71,6 +89,88 @@
        }
    }
    @KafkaListener(topics = {"${kafka.hcGovTopic}"})
    public void hcGovListen(ConsumerRecord<?, ?> record) {
        String orderInfo = record.value().toString();
        logger.debug("hcGovkafka 接收到数据", orderInfo);
        doListen(orderInfo);
    }
    private void doListen(String orderInfo) {
        JSONObject reqJson = JSONObject.parseObject(orderInfo);
        ReportDataDto reportDataDto = null;
        JSONObject header = reqJson.getJSONObject("header");
        String extCommunityId = header.getString("extCommunityId");
        try {
            //获得小区对象
            String secure = getExtCommunityCode(extCommunityId);
            //报文合规性校验
            preValiateOrderInfo(orderInfo);
            //构建对象
            reportDataDto = freshReportDataDto(orderInfo);
            //签名认证
            AuthenticationFactory.authReportDataSign(reportDataDto, secure);
            //适配器
            IReportReturnDataAdapt reportDataAdapt = ApplicationContextFactory.getBean(reportDataDto.getReportDataHeaderDto().getServiceCode() + KAFKA_RETURN, IReportReturnDataAdapt.class);
            if (reportDataAdapt == null) {
                throw new IllegalArgumentException("serviceCode 错误 请检查");
            }
            //业务处理
            reportDataAdapt.reportReturn(reportDataDto, extCommunityId);
        } catch (Exception e) {
            logger.error("政务回写失败", e);
        }finally {
            //回写日志表
            baseHcGovSendAsynImpl.updateHcGovLog(reqJson);
        }
    }
    /**
     * 获取小区密钥
     *
     * @param extCommunityId
     * @return
     */
    private String getExtCommunityCode(String extCommunityId) {
        CommunityAttrDto communityAttrDto = new CommunityAttrDto();
        communityAttrDto.setSpecCd(CommunityAttrDto.SPEC_CD_GOV);
        communityAttrDto.setValue(extCommunityId);
        List<CommunityAttrDto> tmpCommunityAttrDtos = communityInnerServiceSMOImpl.getCommunityAttrs(communityAttrDto);
        if (tmpCommunityAttrDtos == null || tmpCommunityAttrDtos.size() < 1) {
            return HcGovConstant.COMMUNITY_SECURE;
        }
        communityAttrDto = new CommunityAttrDto();
        communityAttrDto.setSpecCd(CommunityAttrDto.SPEC_CD_GOV_SECURE);
        communityAttrDto.setCommunityId(tmpCommunityAttrDtos.get(0).getCommunityId());
        tmpCommunityAttrDtos = communityInnerServiceSMOImpl.getCommunityAttrs(communityAttrDto);
        if (tmpCommunityAttrDtos == null || tmpCommunityAttrDtos.size() < 1) {
            return HcGovConstant.COMMUNITY_SECURE;
        }
        return tmpCommunityAttrDtos.get(0).getValue();
    }
    /**
     * 构建对象
     *
     * @param orderInfo
     * @return
     */
    private ReportDataDto freshReportDataDto(String orderInfo) {
        ReportDataDto reportDataDto = new ReportDataDto();
        JSONObject reqJson = JSONObject.parseObject(orderInfo);
        ReportDataHeaderDto reportDataHeaderDto = BeanConvertUtil.covertBean(reqJson.getJSONObject("header"), ReportDataHeaderDto.class);
        reportDataDto.setReportDataHeaderDto(reportDataHeaderDto);
        reportDataDto.setReportDataBodyDto(reqJson.getJSONObject("body"));
        return reportDataDto;
    }
    /**
     * 这里预校验,请求报文中不能有 dataFlowId