From 7afa32638bd4c1eec05a19a6586af3f4b1ce8ccb Mon Sep 17 00:00:00 2001
From: wuxw7 <wuxw7@asiainfo.com>
Date: 星期六, 16 六月 2018 01:25:43 +0800
Subject: [PATCH] 日志服务实现
---
LogService/src/main/java/com/java110/log/kafka/LogServiceKafka.java | 64 ++++++++++----------------------
1 files changed, 20 insertions(+), 44 deletions(-)
diff --git a/LogService/src/main/java/com/java110/log/kafka/LogServiceKafka.java b/LogService/src/main/java/com/java110/log/kafka/LogServiceKafka.java
index 8db6487..6405fb2 100644
--- a/LogService/src/main/java/com/java110/log/kafka/LogServiceKafka.java
+++ b/LogService/src/main/java/com/java110/log/kafka/LogServiceKafka.java
@@ -20,6 +20,23 @@
/**
* kafka渚﹀惉
+ * 鎺ュ彈鍒扮殑淇℃伅鎶ユ枃鏍煎紡涓猴細
+ * {
+ * "transactionId":"浜ゆ槗娴佹按鍙�",
+ * "dataFlowId":"涓婁笅鏂囧璞�",
+ * "ip":"ip",
+ * "port":"绔彛",
+ * "srcIp":"璋冪敤鏂笽P",
+ * "srcPort":"璋冪敤鏂圭鍙�",
+ * "appId":"搴旂敤ID",
+ * "userId":"鐢ㄦ埛ID",
+ * "serviceCode":"鏈嶅姟缂栫爜",
+ * "serviceName":"鏈嶅姟鍚嶇О",
+ * "timestamp":"鏃堕棿鍌�",
+ * "logStatus":"璁板綍鐘舵��",
+ * "requestMessage":"璇锋眰淇℃伅",
+ * "responseMessage":"杩斿洖淇℃伅"
+ * }
* Created by wuxw on 2018/4/15.
*/
public class LogServiceKafka extends BaseController {
@@ -29,53 +46,12 @@
@KafkaListener(topics = {KafkaConstant.TOPIC_LOG_NAME})
public void listen(ConsumerRecord<?, ?> record) {
- logger.info("kafka鐨刱ey: " + record.key());
- logger.info("kafka鐨剉alue: " + record.value().toString());
- String orderInfo = record.value().toString();
- BusinessServiceDataFlow businessServiceDataFlow = null;
- JSONObject responseJson = null;
- try {
- Map<String, String> headers = new HashMap<String, String>();
- //棰勬牎楠�
- preValiateOrderInfo(orderInfo);
- businessServiceDataFlow = this.writeDataToDataFlowContext(orderInfo, headers);
- //responseJson = logServiceSMOImpl.service(businessServiceDataFlow);
- }catch (InitDataFlowContextException e){
- logger.error("璇锋眰鎶ユ枃閿欒,鍒濆鍖� BusinessServiceDataFlow澶辫触"+orderInfo,e);
- responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo,ResponseConstant.RESULT_PARAM_ERROR,e.getMessage(),null);
- }catch (InitConfigDataException e){
- logger.error("璇锋眰鎶ユ枃閿欒,鍔犺浇閰嶇疆淇℃伅澶辫触"+orderInfo,e);
- responseJson = DataTransactionFactory.createNoBusinessTypeBusinessResponseJson(orderInfo,ResponseConstant.RESULT_PARAM_ERROR,e.getMessage(),null);
- }catch (Exception e){
- logger.error("璇锋眰璁㈠崟寮傚父",e);
- responseJson = DataTransactionFactory.createBusinessResponseJson(businessServiceDataFlow,ResponseConstant.RESULT_CODE_ERROR,e.getMessage()+e,
- null);
- }finally {
- logger.debug("褰撳墠璇锋眰鎶ユ枃锛�" + orderInfo +", 褰撳墠杩斿洖鎶ユ枃锛�" +responseJson.toJSONString());
- //鍙湁business 鍜� instance 杩囩▼鎵嶅仛閫氱煡娑堟伅
- if(!StatusConstant.REQUEST_BUSINESS_TYPE_BUSINESS.equals(responseJson.getString("businessType"))
- && !StatusConstant.REQUEST_BUSINESS_TYPE_INSTANCE.equals(responseJson.getString("businessType"))){
- return ;
- }
- try {
- KafkaFactory.sendKafkaMessage(KafkaConstant.TOPIC_NOTIFY_CENTER_SERVICE_NAME, "", responseJson.toJSONString());
- }catch (Exception e){
- logger.error("鐢ㄦ埛鏈嶅姟閫氱煡centerService澶辫触"+responseJson,e);
- //杩欓噷淇濆瓨寮傚父淇℃伅
- }
- }
+ logger.info("LogServiceKafka receive message: {}", record.value().toString());
+ String logMessage = record.value().toString();
+ logServiceSMOImpl.saveLogMessage(logMessage);
}
- /**
- * 杩欓噷棰勬牎楠岋紝璇锋眰鎶ユ枃涓笉鑳芥湁 dataFlowId
- * @param orderInfo
- */
- private void preValiateOrderInfo(String orderInfo) {
- /* if(JSONObject.parseObject(orderInfo).getJSONObject("orders").containsKey("dataFlowId")){
- throw new BusinessException(ResponseConstant.RESULT_CODE_ERROR,"鎶ユ枃涓笉鑳藉瓨鍦╠ataFlowId鑺傜偣");
- }*/
- }
public ILogServiceSMO getLogServiceSMOImpl() {
return logServiceSMOImpl;
--
Gitblit v1.8.0