| | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.java110.core.base.controller.BaseController; |
| | | import com.java110.core.context.BusinessServiceDataFlow; |
| | | import com.java110.core.factory.AuthenticationFactory; |
| | | import com.java110.core.factory.DataTransactionFactory; |
| | | import com.java110.dto.community.CommunityAttrDto; |
| | | import com.java110.dto.community.CommunityDto; |
| | | import com.java110.dto.reportData.ReportDataDto; |
| | | import com.java110.dto.reportData.ReportDataHeaderDto; |
| | | import com.java110.intf.community.ICommunityInnerServiceSMO; |
| | | import com.java110.job.adapt.hcGov.HcGovConstant; |
| | | import com.java110.job.adapt.hcGov.IReportReturnDataAdapt; |
| | | import com.java110.job.adapt.hcGov.asyn.BaseHcGovSendAsyn; |
| | | import com.java110.job.smo.IJobServiceSMO; |
| | | import com.java110.utils.constant.KafkaConstant; |
| | | import com.java110.utils.constant.ResponseConstant; |
| | | import com.java110.utils.constant.StatusConstant; |
| | | import com.java110.utils.exception.InitConfigDataException; |
| | | import com.java110.utils.exception.InitDataFlowContextException; |
| | | import com.java110.utils.factory.ApplicationContextFactory; |
| | | import com.java110.utils.kafka.KafkaFactory; |
| | | import com.java110.utils.util.Assert; |
| | | import com.java110.utils.util.BeanConvertUtil; |
| | | import org.apache.kafka.clients.consumer.ConsumerRecord; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import com.java110.core.log.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.kafka.annotation.KafkaListener; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | /** |
| | |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(JobServiceKafka.class); |
| | | |
| | | public static final String KAFKA_RETURN = "_RETURN"; |
| | | |
| | | @Autowired |
| | | private IJobServiceSMO jobServiceSMOImpl; |
| | | @Autowired |
| | | private ICommunityInnerServiceSMO communityInnerServiceSMOImpl; |
| | | @Autowired |
| | | private BaseHcGovSendAsyn baseHcGovSendAsynImpl; |
| | | |
| | | @KafkaListener(topics = {"jobServiceTopic"}) |
| | | public void listen(ConsumerRecord<?, ?> record) { |
| | |
| | | |
| | | @KafkaListener(topics = {"${kafka.hcGovTopic}"}) |
| | | public void hcGovListen(ConsumerRecord<?, ?> record) { |
| | | logger.info("kafka的key: " + record.key()); |
| | | logger.info("kafka的value: " + record.value().toString()); |
| | | |
| | | String orderInfo = record.value().toString(); |
| | | logger.debug("hcGovkafka 接收到数据", orderInfo); |
| | | doListen(orderInfo); |
| | | |
| | | try { |
| | | logger.debug("hcGovkafka 接收到数据", orderInfo); |
| | | //responseJson = jobServiceSMOImpl.service(businessServiceDataFlow); |
| | | } catch (Exception e) { |
| | | logger.error("请求订单异常", e); |
| | | } finally { |
| | | |
| | | } |
| | | } |
| | | |
| | | private void doListen(String orderInfo) { |
| | | JSONObject reqJson = JSONObject.parseObject(orderInfo); |
| | | ReportDataDto reportDataDto = null; |
| | | JSONObject header = reqJson.getJSONObject("header"); |
| | | |
| | | String extCommunityId = header.getString("extCommunityId"); |
| | | |
| | | try { |
| | | //获得小区对象 |
| | | String secure = getExtCommunityCode(extCommunityId); |
| | | //报文合规性校验 |
| | | preValiateOrderInfo(orderInfo); |
| | | //构建对象 |
| | | reportDataDto = freshReportDataDto(orderInfo); |
| | | //签名认证 |
| | | AuthenticationFactory.authReportDataSign(reportDataDto, secure); |
| | | //适配器 |
| | | IReportReturnDataAdapt reportDataAdapt = ApplicationContextFactory.getBean(reportDataDto.getReportDataHeaderDto().getServiceCode() + KAFKA_RETURN, IReportReturnDataAdapt.class); |
| | | if (reportDataAdapt == null) { |
| | | throw new IllegalArgumentException("serviceCode 错误 请检查"); |
| | | } |
| | | //业务处理 |
| | | reportDataAdapt.reportReturn(reportDataDto, extCommunityId); |
| | | |
| | | } catch (Exception e) { |
| | | logger.error("政务回写失败", e); |
| | | }finally { |
| | | //回写日志表 |
| | | baseHcGovSendAsynImpl.updateHcGovLog(reqJson); |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 获取小区密钥 |
| | | * |
| | | * @param extCommunityId |
| | | * @return |
| | | */ |
| | | private String getExtCommunityCode(String extCommunityId) { |
| | | |
| | | CommunityAttrDto communityAttrDto = new CommunityAttrDto(); |
| | | communityAttrDto.setSpecCd(CommunityAttrDto.SPEC_CD_GOV); |
| | | communityAttrDto.setValue(extCommunityId); |
| | | List<CommunityAttrDto> tmpCommunityAttrDtos = communityInnerServiceSMOImpl.getCommunityAttrs(communityAttrDto); |
| | | if (tmpCommunityAttrDtos == null || tmpCommunityAttrDtos.size() < 1) { |
| | | return HcGovConstant.COMMUNITY_SECURE; |
| | | } |
| | | communityAttrDto = new CommunityAttrDto(); |
| | | communityAttrDto.setSpecCd(CommunityAttrDto.SPEC_CD_GOV_SECURE); |
| | | communityAttrDto.setCommunityId(tmpCommunityAttrDtos.get(0).getCommunityId()); |
| | | tmpCommunityAttrDtos = communityInnerServiceSMOImpl.getCommunityAttrs(communityAttrDto); |
| | | if (tmpCommunityAttrDtos == null || tmpCommunityAttrDtos.size() < 1) { |
| | | return HcGovConstant.COMMUNITY_SECURE; |
| | | } |
| | | return tmpCommunityAttrDtos.get(0).getValue(); |
| | | } |
| | | |
| | | /** |
| | | * 构建对象 |
| | | * |
| | | * @param orderInfo |
| | | * @return |
| | | */ |
| | | private ReportDataDto freshReportDataDto(String orderInfo) { |
| | | ReportDataDto reportDataDto = new ReportDataDto(); |
| | | JSONObject reqJson = JSONObject.parseObject(orderInfo); |
| | | ReportDataHeaderDto reportDataHeaderDto = BeanConvertUtil.covertBean(reqJson.getJSONObject("header"), ReportDataHeaderDto.class); |
| | | reportDataDto.setReportDataHeaderDto(reportDataHeaderDto); |
| | | reportDataDto.setReportDataBodyDto(reqJson.getJSONObject("body")); |
| | | |
| | | return reportDataDto; |
| | | } |
| | | |
| | | /** |
| | | * 这里预校验,请求报文中不能有 dataFlowId |