package com.java110.job.kafka; import com.alibaba.fastjson.JSONObject; import com.java110.core.base.controller.BaseController; import com.java110.core.context.BusinessServiceDataFlow; import com.java110.core.factory.DataTransactionFactory; import com.java110.job.smo.IJobServiceSMO; import com.java110.utils.constant.KafkaConstant; import com.java110.utils.constant.ResponseConstant; import com.java110.utils.constant.StatusConstant; import com.java110.utils.exception.InitConfigDataException; import com.java110.utils.exception.InitDataFlowContextException; import com.java110.utils.kafka.KafkaFactory; import com.java110.utils.util.Assert; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import java.util.HashMap; import java.util.Map; /** * kafka侦听 * Created by wuxw on 2018/4/15. */ public class JobServiceKafka extends BaseController { private final static Logger logger = LoggerFactory.getLogger(JobServiceKafka.class); @Autowired private IJobServiceSMO jobServiceSMOImpl; @KafkaListener(topics = {"jobServiceTopic"}) public void listen(ConsumerRecord record) { logger.info("kafka的key: " + record.key()); logger.info("kafka的value: " + record.value().toString()); String orderInfo = record.value().toString(); BusinessServiceDataFlow businessServiceDataFlow = null; JSONObject responseJson = null; try { Map headers = new HashMap(); //预校验 preValiateOrderInfo(orderInfo); businessServiceDataFlow = this.writeDataToDataFlowContext(orderInfo, headers); //responseJson = jobServiceSMOImpl.service(businessServiceDataFlow); } catch (InitDataFlowContextException e) { logger.error("请求报文错误,初始化 BusinessServiceDataFlow失败" + orderInfo, e); responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo, ResponseConstant.RESULT_PARAM_ERROR, e.getMessage(), null); } catch (InitConfigDataException e) { logger.error("请求报文错误,加载配置信息失败" + orderInfo, e); responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo, ResponseConstant.RESULT_PARAM_ERROR, e.getMessage(), null); } catch (Exception e) { logger.error("请求订单异常", e); responseJson = DataTransactionFactory.createBusinessResponseJson(businessServiceDataFlow, ResponseConstant.RESULT_CODE_ERROR, e.getMessage() + e, null); } finally { logger.debug("当前请求报文:" + orderInfo + ", 当前返回报文:" + responseJson.toJSONString()); //只有business 和 instance 过程才做通知消息 if (!StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(responseJson.getString("businessType")) && !StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(responseJson.getString("businessType"))) { return; } try { KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_NOTIFY_CENTER_SERVICE_NAME, "", responseJson.toJSONString()); } catch (Exception e) { logger.error("用户服务通知centerService失败" + responseJson, e); //这里保存异常信息 } } } @KafkaListener(topics = {"${kafka.hcGovTopic}"}) public void hcGovListen(ConsumerRecord record) { logger.info("kafka的key: " + record.key()); logger.info("kafka的value: " + record.value().toString()); String orderInfo = record.value().toString(); try { logger.debug("hcGovkafka 接收到数据", orderInfo); //responseJson = jobServiceSMOImpl.service(businessServiceDataFlow); } catch (Exception e) { logger.error("请求订单异常", e); } finally { } } /** * 这里预校验,请求报文中不能有 dataFlowId * * @param orderInfo */ private void preValiateOrderInfo(String orderInfo) { JSONObject reqJson = JSONObject.parseObject(orderInfo); Assert.hasKeyAndValue(reqJson, "header", "请求报文中未包含header"); Assert.hasKeyAndValue(reqJson, "body", "请求报文中未包含body"); JSONObject header = reqJson.getJSONObject("header"); Assert.hasKeyAndValue(header, "serviceCode", "请求报文中未包含serviceCode"); Assert.hasKeyAndValue(header, "sign", "请求报文中未包含sign"); Assert.hasKeyAndValue(header, "resTime", "请求报文中未包含reqTime"); Assert.hasKeyAndValue(header, "code", "请求报文中未包含reqTime"); Assert.hasKeyAndValue(header, "msg", "请求报文中未包含reqTime"); } }