| | |
| | | import com.java110.core.factory.DataTransactionFactory; |
| | | import com.java110.log.smo.ILogServiceSMO; |
| | | import org.apache.kafka.clients.consumer.ConsumerRecord; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.kafka.annotation.KafkaListener; |
| | | |
| | |
| | | |
| | | /** |
| | | * Kafka listener. |
| | | * The received message has the following format: |
| | | * { |
| | | * "transactionId":"transaction serial number", |
| | | * "dataFlowId":"context object", |
| | | * "ip":"ip", |
| | | * "port":"port", |
| | | * "srcIp":"caller IP", |
| | | * "srcPort":"caller port", |
| | | * "appId":"application ID", |
| | | * "userId":"user ID", |
| | | * "serviceCode":"service code", |
| | | * "serviceName":"service name", |
| | | * "timestamp":"timestamp", |
| | | * "logStatus":"record status", |
| | | * "requestMessage":"request message", |
| | | * "responseMessage":"response message" |
| | | * } |
| | | * Created by wuxw on 2018/4/15. |
| | | */ |
| | | public class LogServiceKafka extends BaseController { |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(LogServiceKafka.class); |
| | | |
| | | |
| | | @Autowired |
| | | private ILogServiceSMO logServiceSMOImpl; |
| | | |
| | | /** |
| | |  * Handles one record consumed from the log topic(s). |
| | |  * <p> |
| | |  * The payload is pre-validated and turned into a {@code BusinessServiceDataFlow}. When that |
| | |  * fails, an error response is built: if its {@code businessType} is business or instance it |
| | |  * is sent to {@code KafkaConstant.TOPIC_NOTIFY_CENTER_SERVICE_NAME}, otherwise processing |
| | |  * stops. When no error response exists, or after the notification attempt, the raw payload |
| | |  * is passed to {@code logServiceSMOImpl.saveLogMessage}. |
| | |  * <p> |
| | |  * NOTE(review): the method carries two {@code @KafkaListener} annotations; confirm that |
| | |  * subscribing to both {@code KafkaConstant.TOPIC_LOG_NAME} and {@code "LOG"} is intended. |
| | |  * |
| | |  * @param record the consumed Kafka record; a record whose value is {@code null} is skipped |
| | |  */ |
| | | @KafkaListener(topics = {KafkaConstant.TOPIC_LOG_NAME}) |
| | | @KafkaListener(topics = {"LOG"}) |
| | | public void listen(ConsumerRecord<?, ?> record) { |
| | |     logger.info("kafka的key: " + record.key()); |
| | |     Object value = record.value(); |
| | |     if (value == null) { |
| | |         // A record may legitimately carry no value; there is nothing to parse or store. |
| | |         logger.warn("LogServiceKafka received a record without a value, key: {}", record.key()); |
| | |         return; |
| | |     } |
| | |     String orderInfo = value.toString(); |
| | |     logger.info("kafka的value: " + orderInfo); |
| | | |
| | |     BusinessServiceDataFlow businessServiceDataFlow = null; |
| | |     // Stays null on the success path: only the catch handlers below assign it. |
| | |     JSONObject responseJson = null; |
| | |     try { |
| | |         Map<String, String> headers = new HashMap<String, String>(); |
| | |         // pre-validation |
| | |         preValiateOrderInfo(orderInfo); |
| | |         businessServiceDataFlow = this.writeDataToDataFlowContext(orderInfo, headers); |
| | |         //responseJson = logServiceSMOImpl.service(businessServiceDataFlow); |
| | |     } catch (InitDataFlowContextException e) { |
| | |         logger.error("请求报文错误,初始化 BusinessServiceDataFlow失败"+orderInfo,e); |
| | |         responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo,ResponseConstant.RESULT_PARAM_ERROR,e.getMessage(),null); |
| | |     } catch (InitConfigDataException e) { |
| | |         logger.error("请求报文错误,加载配置信息失败"+orderInfo,e); |
| | |         responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo,ResponseConstant.RESULT_PARAM_ERROR,e.getMessage(),null); |
| | |     } catch (Exception e) { |
| | |         logger.error("请求订单异常",e); |
| | |         responseJson = DataTransactionFactory.createBusinessResponseJson(businessServiceDataFlow,ResponseConstant.RESULT_CODE_ERROR,e.getMessage()+e, |
| | |                 null); |
| | |     } |
| | | |
| | |     // Post-processing runs after the try/catch rather than in a finally block, so an early |
| | |     // return here cannot discard a throwable, and a missing response cannot raise an NPE. |
| | |     if (responseJson != null) { |
| | |         logger.debug("当前请求报文:" + orderInfo +", 当前返回报文:" +responseJson.toJSONString()); |
| | |         String businessType = responseJson.getString("businessType"); |
| | |         // Only the business and instance phases send a notification message. |
| | |         if (!StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(businessType) |
| | |                 && !StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(businessType)) { |
| | |             return; |
| | |         } |
| | |         try { |
| | |             KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_NOTIFY_CENTER_SERVICE_NAME, "", responseJson.toJSONString()); |
| | |         } catch (Exception e) { |
| | |             // Best effort: a failed notification is logged and does not stop the log save. |
| | |             logger.error("用户服务通知centerService失败"+responseJson,e); |
| | |             // the failure information should be saved here |
| | |         } |
| | |     } |
| | | |
| | |     logger.info("LogServiceKafka receive message: {}", orderInfo); |
| | |     logServiceSMOImpl.saveLogMessage(orderInfo); |
| | | } |
| | | |
| | | |
| | | /** |
| | |  * Pre-validation hook for an incoming message. |
| | |  * <p> |
| | |  * Intended rule: the request message must not contain a {@code dataFlowId} node. The check |
| | |  * is currently disabled (it is commented out in the body), so this method does nothing. |
| | |  * |
| | |  * @param orderInfo raw message text; not inspected while the check is disabled |
| | |  */ |
| | | private void preValiateOrderInfo(String orderInfo) { |
| | |     // Disabled check, kept for reference: |
| | |     // if (JSONObject.parseObject(orderInfo).getJSONObject("orders").containsKey("dataFlowId")) { |
| | |     //     throw new BusinessException(ResponseConstant.RESULT_CODE_ERROR, "报文中不能存在dataFlowId节点"); |
| | |     // } |
| | | } |
| | | |
| | | public ILogServiceSMO getLogServiceSMOImpl() { |
| | | return logServiceSMOImpl; |