From 05683f2b2bdbdbe21cf17ad523c21ab338bd1c54 Mon Sep 17 00:00:00 2001
From: wuxw <928255095@qq.com>
Date: 星期二, 19 七月 2022 21:49:55 +0800
Subject: [PATCH] 优化添加设备 功能

---
 service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java |  118 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 files changed, 112 insertions(+), 6 deletions(-)

diff --git a/service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java b/service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java
index 2c692b2..a1fd229 100755
--- a/service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java
+++ b/service-job/src/main/java/com/java110/job/kafka/JobServiceKafka.java
@@ -3,21 +3,34 @@
 import com.alibaba.fastjson.JSONObject;
 import com.java110.core.base.controller.BaseController;
 import com.java110.core.context.BusinessServiceDataFlow;
+import com.java110.core.factory.AuthenticationFactory;
 import com.java110.core.factory.DataTransactionFactory;
+import com.java110.dto.community.CommunityAttrDto;
+import com.java110.dto.community.CommunityDto;
+import com.java110.dto.reportData.ReportDataDto;
+import com.java110.dto.reportData.ReportDataHeaderDto;
+import com.java110.intf.community.ICommunityInnerServiceSMO;
+import com.java110.job.adapt.hcGov.HcGovConstant;
+import com.java110.job.adapt.hcGov.IReportReturnDataAdapt;
+import com.java110.job.adapt.hcGov.asyn.BaseHcGovSendAsyn;
 import com.java110.job.smo.IJobServiceSMO;
 import com.java110.utils.constant.KafkaConstant;
 import com.java110.utils.constant.ResponseConstant;
 import com.java110.utils.constant.StatusConstant;
 import com.java110.utils.exception.InitConfigDataException;
 import com.java110.utils.exception.InitDataFlowContextException;
+import com.java110.utils.factory.ApplicationContextFactory;
 import com.java110.utils.kafka.KafkaFactory;
+import com.java110.utils.util.Assert;
+import com.java110.utils.util.BeanConvertUtil;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.java110.core.log.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.annotation.KafkaListener;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -28,8 +41,14 @@
 
     private final static Logger logger = LoggerFactory.getLogger(JobServiceKafka.class);
 
+    public static final String KAFKA_RETURN = "_RETURN";
+
     @Autowired
     private IJobServiceSMO jobServiceSMOImpl;
+    @Autowired
+    private ICommunityInnerServiceSMO communityInnerServiceSMOImpl;
+    @Autowired
+    private BaseHcGovSendAsyn baseHcGovSendAsynImpl;
 
     @KafkaListener(topics = {"jobServiceTopic"})
     public void listen(ConsumerRecord<?, ?> record) {
@@ -43,7 +62,7 @@
             //棰勬牎楠�
             preValiateOrderInfo(orderInfo);
             businessServiceDataFlow = this.writeDataToDataFlowContext(orderInfo, headers);
-            responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
+            //responseJson = jobServiceSMOImpl.service(businessServiceDataFlow);
         } catch (InitDataFlowContextException e) {
             logger.error("璇锋眰鎶ユ枃閿欒,鍒濆鍖� BusinessServiceDataFlow澶辫触" + orderInfo, e);
             responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo, ResponseConstant.RESULT_PARAM_ERROR, e.getMessage(), null);
@@ -70,6 +89,88 @@
         }
     }
 
+    @KafkaListener(topics = {"${kafka.hcGovTopic}"})
+    public void hcGovListen(ConsumerRecord<?, ?> record) {
+
+        String orderInfo = record.value().toString();
+        logger.debug("hcGovkafka 鎺ユ敹鍒版暟鎹�", orderInfo);
+        doListen(orderInfo);
+
+    }
+
+    private void doListen(String orderInfo) {
+        JSONObject reqJson = JSONObject.parseObject(orderInfo);
+        ReportDataDto reportDataDto = null;
+        JSONObject header = reqJson.getJSONObject("header");
+
+        String extCommunityId = header.getString("extCommunityId");
+
+        try {
+            //鑾峰緱灏忓尯瀵硅薄
+            String secure = getExtCommunityCode(extCommunityId);
+            //鎶ユ枃鍚堣鎬ф牎楠�
+            preValiateOrderInfo(orderInfo);
+            //鏋勫缓瀵硅薄
+            reportDataDto = freshReportDataDto(orderInfo);
+            //绛惧悕璁よ瘉
+            AuthenticationFactory.authReportDataSign(reportDataDto, secure);
+            //閫傞厤鍣�
+            IReportReturnDataAdapt reportDataAdapt = ApplicationContextFactory.getBean(reportDataDto.getReportDataHeaderDto().getServiceCode() + KAFKA_RETURN, IReportReturnDataAdapt.class);
+            if (reportDataAdapt == null) {
+                throw new IllegalArgumentException("serviceCode 閿欒 璇锋鏌�");
+            }
+            //涓氬姟澶勭悊
+            reportDataAdapt.reportReturn(reportDataDto, extCommunityId);
+
+        } catch (Exception e) {
+            logger.error("鏀垮姟鍥炲啓澶辫触", e);
+        }finally {
+            //鍥炲啓鏃ュ織琛�
+            baseHcGovSendAsynImpl.updateHcGovLog(reqJson);
+        }
+
+    }
+
+    /**
+     * 鑾峰彇灏忓尯瀵嗛挜
+     *
+     * @param extCommunityId
+     * @return
+     */
+    private String getExtCommunityCode(String extCommunityId) {
+
+        CommunityAttrDto communityAttrDto = new CommunityAttrDto();
+        communityAttrDto.setSpecCd(CommunityAttrDto.SPEC_CD_GOV);
+        communityAttrDto.setValue(extCommunityId);
+        List<CommunityAttrDto> tmpCommunityAttrDtos = communityInnerServiceSMOImpl.getCommunityAttrs(communityAttrDto);
+        if (tmpCommunityAttrDtos == null || tmpCommunityAttrDtos.size() < 1) {
+            return HcGovConstant.COMMUNITY_SECURE;
+        }
+        communityAttrDto = new CommunityAttrDto();
+        communityAttrDto.setSpecCd(CommunityAttrDto.SPEC_CD_GOV_SECURE);
+        communityAttrDto.setCommunityId(tmpCommunityAttrDtos.get(0).getCommunityId());
+        tmpCommunityAttrDtos = communityInnerServiceSMOImpl.getCommunityAttrs(communityAttrDto);
+        if (tmpCommunityAttrDtos == null || tmpCommunityAttrDtos.size() < 1) {
+            return HcGovConstant.COMMUNITY_SECURE;
+        }
+        return tmpCommunityAttrDtos.get(0).getValue();
+    }
+
+    /**
+     * 鏋勫缓瀵硅薄
+     *
+     * @param orderInfo
+     * @return
+     */
+    private ReportDataDto freshReportDataDto(String orderInfo) {
+        ReportDataDto reportDataDto = new ReportDataDto();
+        JSONObject reqJson = JSONObject.parseObject(orderInfo);
+        ReportDataHeaderDto reportDataHeaderDto = BeanConvertUtil.covertBean(reqJson.getJSONObject("header"), ReportDataHeaderDto.class);
+        reportDataDto.setReportDataHeaderDto(reportDataHeaderDto);
+        reportDataDto.setReportDataBodyDto(reqJson.getJSONObject("body"));
+
+        return reportDataDto;
+    }
 
     /**
      * 杩欓噷棰勬牎楠岋紝璇锋眰鎶ユ枃涓笉鑳芥湁 dataFlowId
@@ -77,9 +178,14 @@
      * @param orderInfo
      */
     private void preValiateOrderInfo(String orderInfo) {
-       /* if(JSONObject.parseObject(orderInfo).getJSONObject("orders").containsKey("dataFlowId")){
-            throw new BusinessException(ResponseConstant.RESULT_CODE_ERROR,"鎶ユ枃涓笉鑳藉瓨鍦╠ataFlowId鑺傜偣");
-        }*/
+        JSONObject reqJson = JSONObject.parseObject(orderInfo);
+        Assert.hasKeyAndValue(reqJson, "header", "璇锋眰鎶ユ枃涓湭鍖呭惈header");
+        Assert.hasKeyAndValue(reqJson, "body", "璇锋眰鎶ユ枃涓湭鍖呭惈body");
+        JSONObject header = reqJson.getJSONObject("header");
+        Assert.hasKeyAndValue(header, "serviceCode", "璇锋眰鎶ユ枃涓湭鍖呭惈serviceCode");
+        Assert.hasKeyAndValue(header, "sign", "璇锋眰鎶ユ枃涓湭鍖呭惈sign");
+        Assert.hasKeyAndValue(header, "resTime", "璇锋眰鎶ユ枃涓湭鍖呭惈reqTime");
+        Assert.hasKeyAndValue(header, "code", "璇锋眰鎶ユ枃涓湭鍖呭惈reqTime");
+        Assert.hasKeyAndValue(header, "msg", "璇锋眰鎶ユ枃涓湭鍖呭惈reqTime");
     }
-
 }

--
Gitblit v1.8.0