From 05683f2b2bdbdbe21cf17ad523c21ab338bd1c54 Mon Sep 17 00:00:00 2001
From: wuxw <928255095@qq.com>
Date: 星期二, 19 七月 2022 21:49:55 +0800
Subject: [PATCH] 优化添加设备 功能
---
service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java | 118 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
1 files changed, 112 insertions(+), 6 deletions(-)
diff --git a/service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java b/service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java
index 2c692b2..a1fd229 100755
--- a/service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java
+++ b/service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java
@@ -3,21 +3,34 @@
import com.alibaba.fastjson.JSONObject;
import com.java110.core.base.controller.BaseController;
import com.java110.core.context.BusinessServiceDataFlow;
+import com.java110.core.factory.AuthenticationFactory;
import com.java110.core.factory.DataTransactionFactory;
+import com.java110.dto.community.CommunityAttrDto;
+import com.java110.dto.community.CommunityDto;
+import com.java110.dto.reportData.ReportDataDto;
+import com.java110.dto.reportData.ReportDataHeaderDto;
+import com.java110.intf.community.ICommunityInnerServiceSMO;
+import com.java110.job.adapt.hcGov.HcGovConstant;
+import com.java110.job.adapt.hcGov.IReportReturnDataAdapt;
+import com.java110.job.adapt.hcGov.asyn.BaseHcGovSendAsyn;
import com.java110.job.smo.IJobServiceSMO;
import com.java110.utils.constant.KafkaConstant;
import com.java110.utils.constant.ResponseConstant;
import com.java110.utils.constant.StatusConstant;
import com.java110.utils.exception.InitConfigDataException;
import com.java110.utils.exception.InitDataFlowContextException;
+import com.java110.utils.factory.ApplicationContextFactory;
import com.java110.utils.kafka.KafkaFactory;
+import com.java110.utils.util.Assert;
+import com.java110.utils.util.BeanConvertUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.java110.core.log.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -28,8 +41,14 @@
private final static Logger logger = LoggerFactory.getLogger(JobServiceKafka.class);
+ public static final String KAFKA_RETURN = "_RETURN";
+
@Autowired
private IJobServiceSMO jobServiceSMOImpl;
+ @Autowired
+ private ICommunityInnerServiceSMO communityInnerServiceSMOImpl;
+ @Autowired
+ private BaseHcGovSendAsyn baseHcGovSendAsynImpl;
@KafkaListener(topics = {"jobServiceTopic"})
public void listen(ConsumerRecord<?, ?> record) {
@@ -43,7 +62,7 @@
//棰勬牎楠�
preValiateOrderInfo(orderInfo);
businessServiceDataFlow = this.writeDataToDataFlowContext(orderInfo, headers);
- responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
+ //responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
} catch (InitDataFlowContextException e) {
logger.error("璇锋眰鎶ユ枃閿欒,鍒濆鍖� BusinessServiceDataFlow澶辫触" + orderInfo, e);
responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo, ResponseConstant.RESULT_PARAM_ERROR, e.getMessage(), null);
@@ -70,6 +89,88 @@
}
}
+ @KafkaListener(topics = {"${kafka.hcGovTopic}"})
+ public void hcGovListen(ConsumerRecord<?, ?> record) {
+
+ String orderInfo = record.value().toString();
+ logger.debug("hcGovkafka 鎺ユ敹鍒版暟鎹�", orderInfo);
+ doListen(orderInfo);
+
+ }
+
+ private void doListen(String orderInfo) {
+ JSONObject reqJson = JSONObject.parseObject(orderInfo);
+ ReportDataDto reportDataDto = null;
+ JSONObject header = reqJson.getJSONObject("header");
+
+ String extCommunityId = header.getString("extCommunityId");
+
+ try {
+ //鑾峰緱灏忓尯瀵硅薄
+ String secure = getExtCommunityCode(extCommunityId);
+ //鎶ユ枃鍚堣鎬ф牎楠�
+ preValiateOrderInfo(orderInfo);
+ //鏋勫缓瀵硅薄
+ reportDataDto = freshReportDataDto(orderInfo);
+ //绛惧悕璁よ瘉
+ AuthenticationFactory.authReportDataSign(reportDataDto, secure);
+ //閫傞厤鍣�
+ IReportReturnDataAdapt reportDataAdapt = ApplicationContextFactory.getBean(reportDataDto.getReportDataHeaderDto().getServiceCode() + KAFKA_RETURN, IReportReturnDataAdapt.class);
+ if (reportDataAdapt == null) {
+ throw new IllegalArgumentException("serviceCode 閿欒 璇锋鏌�");
+ }
+ //涓氬姟澶勭悊
+ reportDataAdapt.reportReturn(reportDataDto, extCommunityId);
+
+ } catch (Exception e) {
+ logger.error("鏀垮姟鍥炲啓澶辫触", e);
+ }finally {
+ //鍥炲啓鏃ュ織琛�
+ baseHcGovSendAsynImpl.updateHcGovLog(reqJson);
+ }
+
+ }
+
+ /**
+ * 鑾峰彇灏忓尯瀵嗛挜
+ *
+ * @param extCommunityId
+ * @return
+ */
+ private String getExtCommunityCode(String extCommunityId) {
+
+ CommunityAttrDto communityAttrDto = new CommunityAttrDto();
+ communityAttrDto.setSpecCd(CommunityAttrDto.SPEC_CD_GOV);
+ communityAttrDto.setValue(extCommunityId);
+ List<CommunityAttrDto> tmpCommunityAttrDtos = communityInnerServiceSMOImpl.getCommunityAttrs(communityAttrDto);
+ if (tmpCommunityAttrDtos == null || tmpCommunityAttrDtos.size() < 1) {
+ return HcGovConstant.COMMUNITY_SECURE;
+ }
+ communityAttrDto = new CommunityAttrDto();
+ communityAttrDto.setSpecCd(CommunityAttrDto.SPEC_CD_GOV_SECURE);
+ communityAttrDto.setCommunityId(tmpCommunityAttrDtos.get(0).getCommunityId());
+ tmpCommunityAttrDtos = communityInnerServiceSMOImpl.getCommunityAttrs(communityAttrDto);
+ if (tmpCommunityAttrDtos == null || tmpCommunityAttrDtos.size() < 1) {
+ return HcGovConstant.COMMUNITY_SECURE;
+ }
+ return tmpCommunityAttrDtos.get(0).getValue();
+ }
+
+ /**
+ * 鏋勫缓瀵硅薄
+ *
+ * @param orderInfo
+ * @return
+ */
+ private ReportDataDto freshReportDataDto(String orderInfo) {
+ ReportDataDto reportDataDto = new ReportDataDto();
+ JSONObject reqJson = JSONObject.parseObject(orderInfo);
+ ReportDataHeaderDto reportDataHeaderDto = BeanConvertUtil.covertBean(reqJson.getJSONObject("header"), ReportDataHeaderDto.class);
+ reportDataDto.setReportDataHeaderDto(reportDataHeaderDto);
+ reportDataDto.setReportDataBodyDto(reqJson.getJSONObject("body"));
+
+ return reportDataDto;
+ }
/**
* 杩欓噷棰勬牎楠岋紝璇锋眰鎶ユ枃涓笉鑳芥湁 dataFlowId
@@ -77,9 +178,14 @@
* @param orderInfo
*/
private void preValiateOrderInfo(String orderInfo) {
- /* if(JSONObject.parseObject(orderInfo).getJSONObject("orders").containsKey("dataFlowId")){
- throw new BusinessException(ResponseConstant.RESULT_CODE_ERROR,"鎶ユ枃涓笉鑳藉瓨鍦╠ataFlowId鑺傜偣");
- }*/
+ JSONObject reqJson = JSONObject.parseObject(orderInfo);
+ Assert.hasKeyAndValue(reqJson, "header", "璇锋眰鎶ユ枃涓湭鍖呭惈header");
+ Assert.hasKeyAndValue(reqJson, "body", "璇锋眰鎶ユ枃涓湭鍖呭惈body");
+ JSONObject header = reqJson.getJSONObject("header");
+ Assert.hasKeyAndValue(header, "serviceCode", "璇锋眰鎶ユ枃涓湭鍖呭惈serviceCode");
+ Assert.hasKeyAndValue(header, "sign", "璇锋眰鎶ユ枃涓湭鍖呭惈sign");
+ Assert.hasKeyAndValue(header, "resTime", "璇锋眰鎶ユ枃涓湭鍖呭惈reqTime");
+ Assert.hasKeyAndValue(header, "code", "璇锋眰鎶ユ枃涓湭鍖呭惈reqTime");
+ Assert.hasKeyAndValue(header, "msg", "璇锋眰鎶ユ枃涓湭鍖呭惈reqTime");
}
-
}
--
Gitblit v1.8.0