package com.java110.log.kafka; import com.java110.core.base.controller.BaseController; import com.java110.log.smo.ILogServiceSMO; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; /** * kafka侦听 * 接受到的信息报文格式为: * { * "transactionId":"交易流水号", * "dataFlowId":"上下文对象", * "ip":"ip", * "port":"端口", * "srcIp":"调用方IP", * "srcPort":"调用方端口", * "appId":"应用ID", * "userId":"用户ID", * "serviceCode":"服务编码", * "serviceName":"服务名称", * "timestamp":"时间储", * "logStatus":"记录状态", * "requestMessage":"请求信息", * "responseMessage":"返回信息" * } * Created by wuxw on 2018/4/15. */ public class LogServiceKafka extends BaseController { private final static Logger logger = LoggerFactory.getLogger(LogServiceKafka.class); @Autowired private ILogServiceSMO logServiceSMOImpl; @KafkaListener(topics = {"LOG"}) public void listen(ConsumerRecord record) { logger.info("LogServiceKafka receive message: {}", record.value().toString()); String logMessage = record.value().toString(); logServiceSMOImpl.saveLogMessage(logMessage); } public ILogServiceSMO getLogServiceSMOImpl() { return logServiceSMOImpl; } public void setLogServiceSMOImpl(ILogServiceSMO logServiceSMOImpl) { this.logServiceSMOImpl = logServiceSMOImpl; } }