From a58221e75d2e38172533e17b6590983a9b3cabb9 Mon Sep 17 00:00:00 2001
From: chengf <cgf12138@163.com>
Date: 星期六, 11 十月 2025 18:46:52 +0800
Subject: [PATCH] 修复仁智企对话保存功能问题
---
aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java | 450 +++++++++++++++++++++++++++++++++++++------------------
1 files changed, 303 insertions(+), 147 deletions(-)
diff --git a/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java b/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java
index 580a30e..9999eff 100644
--- a/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java
+++ b/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java
@@ -1,35 +1,26 @@
package tech.aiflowy.ai.config;
-import com.agentsflex.core.llm.response.AiMessageResponse;
import com.agentsflex.core.message.AiMessage;
-import com.agentsflex.core.message.Message;
import com.agentsflex.core.prompt.HistoriesPrompt;
-import com.agentsflex.core.prompt.Prompt;
import com.alibaba.fastjson.JSON;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.*;
import com.mybatisflex.core.query.QueryWrapper;
import okhttp3.*;
import okio.BufferedSource;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
+import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-import tech.aiflowy.ai.entity.AiBotConversationMessage;
import tech.aiflowy.ai.entity.AiBotMessage;
-import tech.aiflowy.ai.service.AiBotConversationMessageService;
import tech.aiflowy.ai.service.AiBotMessageService;
-import tech.aiflowy.ai.service.impl.AiBotMessageServiceImpl;
import tech.aiflowy.common.ai.MySseEmitter;
-import tech.aiflowy.common.util.StringUtil;
-import tech.aiflowy.common.web.controller.BaseCurdController;
-import javax.annotation.Resource;
+import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
public class DifyStreamClient {
private final OkHttpClient client;
private final String apiUrl;
@@ -37,45 +28,44 @@
private final Gson gson;
private String prompt;
private AiBotMessageService aiBotMessageService;
+ boolean blean = false;
public DifyStreamClient(String apiUrl, String apiKey, AiBotMessageService aiBotMessageService) {
this.apiUrl = apiUrl;
this.apiKey = apiKey;
this.gson = new GsonBuilder().setPrettyPrinting().create();
- this.client = new OkHttpClient.Builder().build();
+ this.client = new OkHttpClient.Builder().connectTimeout(60, TimeUnit.SECONDS) // 杩炴帴瓒呮椂
+ .readTimeout(120, TimeUnit.SECONDS).build();
this.aiBotMessageService = aiBotMessageService;
}
- // 娴佸紡鑱婂ぉ鏂规硶 - 鐩存帴闆嗘垚SseEmitter
- public CompletableFuture<Void> chatStream(String message, String userId, MySseEmitter emitter, String sessionId, BigInteger botId) {
- prompt = message;
- QueryWrapper qw = new QueryWrapper();
- qw.eq(AiBotMessage::getSessionId, sessionId)
- .orderBy(AiBotMessage::getId,false)
- .limit(6);
-
- List<AiBotMessage> history = aiBotMessageService.list(qw);
-// System.out.println("======================history==================\n");
-// for (AiBotMessage aiBotMessage : history) {
-// System.out.println(aiBotMessage.toMessage());
-// }
-// System.out.println("\n======================history==================");
-
+ // 鍦―ifyStreamClient绫讳腑娣诲姞浠ヤ笅鏂规硶
+ public CompletableFuture<Void> runWorkflow(Map<String, Object> inputs, String message, String userId, MySseEmitter emitter, String sessionId, BigInteger botId) {
// 鏋勫缓璇锋眰JSON
JsonObject requestBody = new JsonObject();
- requestBody.add("inputs", new JsonObject());
- requestBody.addProperty("query", message);
- requestBody.addProperty("response_mode", "streaming");
- requestBody.addProperty("conversation_id", "");
- requestBody.addProperty("user", userId);
- requestBody.add("files", new JsonArray());
- // 娣诲姞鍘嗗彶瀵硅瘽淇℃伅
- JsonArray historyArray = new JsonArray();
- for (AiBotMessage msg : history) {
- historyArray.add(String.valueOf(msg));
+ // 娣诲姞inputs鍙傛暟
+ JsonObject inputsJson = new JsonObject();
+ if (inputs != null) {
+ for (Map.Entry<String, Object> entry : inputs.entrySet()) {
+ if (entry.getValue() instanceof String) {
+ inputsJson.addProperty(entry.getKey(), (String) entry.getValue());
+ } else if (entry.getValue() instanceof Number) {
+ inputsJson.addProperty(entry.getKey(), (Number) entry.getValue());
+ } else if (entry.getValue() instanceof Boolean) {
+ inputsJson.addProperty(entry.getKey(), (Boolean) entry.getValue());
+ } else {
+ // 瀵逛簬澶嶆潅瀵硅薄锛岃浆鎹负JSON瀛楃涓�
+ inputsJson.add(entry.getKey(), gson.toJsonTree(entry.getValue()));
+ }
+ }
}
- requestBody.add("history", historyArray);
+ requestBody.add("inputs", inputsJson);
+ // 璁剧疆鍝嶅簲妯″紡鍜岀敤鎴稩D
+ requestBody.addProperty("response_mode", "streaming");
+ requestBody.addProperty("user", userId);
+// System.out.println(requestBody+"==============================================================================================");
+ // 鍒涘缓璇锋眰
RequestBody body = RequestBody.create(
gson.toJson(requestBody),
MediaType.parse("application/json; charset=utf-8")
@@ -118,6 +108,234 @@
@Override
public void onResponse(Call call, Response response) {
+ StringBuffer sb = new StringBuffer();
+ try (ResponseBody responseBody = response.body()) {
+ if (!response.isSuccessful()) {
+ emitter.completeWithError(new IOException("API閿欒: " + response.code()));
+ return;
+ }
+
+ // 浣跨敤BufferedSource閫愯璇诲彇鍝嶅簲鍐呭
+ BufferedSource source = responseBody.source();
+ String line;
+
+ while ((line = source.readUtf8Line()) != null) {
+ if (line.startsWith("data: ")) {
+ String data = line.substring(6).trim();
+
+ // 蹇界暐绌烘暟鎹垨缁撴潫鏍囪
+ if (data.isEmpty() || data.equals("[DONE]")) {
+ continue;
+ }
+
+ try {
+ // 杩欓噷闇�瑕佹牴鎹疄闄匒PI杩斿洖缁撴瀯璋冩暣
+ // 鍋囪API杩斿洖鐨勬牸寮忔槸{ "output": "娑堟伅鍐呭" }
+ JsonObject jsonObject = gson.fromJson(data, JsonObject.class);
+ String title = null;
+// System.out.println(jsonObject);
+ if (jsonObject != null && jsonObject.has("data")) {
+ JsonElement dataElement = jsonObject.get("data");
+ if (dataElement != null && !dataElement.isJsonNull()) {
+ JsonObject dataObject = dataElement.getAsJsonObject();
+ if (dataObject != null && dataObject.has("node_type")) {
+ continue;
+ }
+ if (dataObject != null && dataObject.has("title")) {
+ JsonElement titleElement = dataObject.get("title");
+ if (titleElement != null && !titleElement.isJsonNull()) {
+ title = titleElement.getAsString();
+ }
+ }
+ else if (dataObject != null && dataObject.has("text")) {
+ JsonElement titleElement = dataObject.get("text");
+ if (titleElement != null && !titleElement.isJsonNull()) {
+ title = titleElement.getAsString();
+ }
+ }else if (dataObject != null && dataObject.has("outputs")) {
+ JsonElement titleElement = dataObject.get("outputs");
+ if (titleElement != null && !titleElement.isJsonNull()) {
+
+ AiBotMessage aiBotMessage = new AiBotMessage();
+ aiBotMessage.setBotId(botId);
+ aiBotMessage.setSessionId(sessionId);
+ aiBotMessage.setAccountId(new BigInteger(userId));
+ aiBotMessage.setRole("assistant");
+ aiBotMessage.setContent(sb.toString());
+ aiBotMessage.setCreated(new Date());
+ aiBotMessage.setIsExternalMsg(1);
+ aiBotMessageService.save(aiBotMessage);
+// dataObject = titleElement.getAsJsonObject();
+// if (dataObject != null && dataObject.has("text")) {
+// titleElement = dataObject.get("text");
+// if (titleElement != null && !titleElement.isJsonNull()) {
+// title = titleElement.getAsString();
+// }
+// }else if (dataObject != null && dataObject.has("data")) {
+// titleElement = dataObject.get("data");
+// if (titleElement != null && !titleElement.isJsonNull()) {
+// title = titleElement.getAsString();
+// }
+// }
+ }
+ }
+ }
+ }
+
+ // 鍒涘缓娑堟伅瀵硅薄骞跺彂閫佺粰鍓嶇
+ AiMessage aiMessage = new AiMessage();
+ aiMessage.setContent(title);
+ System.out.println(gson.fromJson(data, JsonObject.class));
+ sb.append(aiMessage.getContent());
+ // 灏嗘秷鎭彂閫佺粰鍓嶇
+ if (aiMessage.getContent() != null){
+ emitter.send(JSON.toJSONString(aiMessage));
+ }
+
+ } catch (Exception e) {
+ // 璁板綍瑙f瀽閿欒浣嗙户缁鐞嗗悗缁暟鎹�
+ System.err.println("瑙f瀽鍝嶅簲鏁版嵁鏃跺嚭閿�: " + e.getMessage());
+ emitter.completeWithError(e);
+ }
+ }
+ }
+
+ // 鎵�鏈夋暟鎹鐞嗗畬姣曪紝鍙戦�佸畬鎴愪俊鍙�
+ emitter.send(SseEmitter.event().name("complete"));
+ emitter.complete();
+
+ } catch (IOException e) {
+ emitter.completeWithError(e);
+ emitter.complete();
+ } catch (Exception e) {
+ // 澶勭悊鍏朵粬寮傚父
+ emitter.completeWithError(e);
+ }
+ }
+ });
+
+ return future;
+ }
+
+ public String fileUpload(String userId, MultipartFile file) {
+ // 鐢ㄦ埛鏍囪瘑锛岃鍜屽彂閫佹秷鎭帴鍙g殑 user 淇濇寔涓�鑷�
+ String user = userId;
+
+ // 鍒ゆ柇鏂囦欢鏄惁涓虹┖
+ if (file.isEmpty()) {
+ System.out.println("涓婁紶鏂囦欢涓虹┖");
+ return "涓婁紶鏂囦欢涓虹┖";
+ }
+
+ OkHttpClient client = new OkHttpClient();
+
+ // 鏋勫缓 multipart/form-data 璇锋眰浣�
+ MultipartBody.Builder requestBodyBuilder = null;
+ try {
+ requestBodyBuilder = new MultipartBody.Builder()
+ .setType(MultipartBody.FORM)
+ // 娣诲姞涓婁紶鏂囦欢锛岃繖閲岀洿鎺ョ敤 MultipartFile 鐨勫瓧鑺傛暟缁勬瀯寤鸿姹備綋
+ .addFormDataPart("file", file.getOriginalFilename(),
+ RequestBody.create(MediaType.parse("application/octet-stream"), file.getBytes()))
+ .addFormDataPart("user", user);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ RequestBody requestBody = requestBodyBuilder.build();
+
+ Request request = new Request.Builder()
+ .url(apiUrl) // 鏇挎崲涓哄疄闄呯殑鎺ュ彛鍦板潃甯搁噺
+ .post(requestBody)
+ .header("Authorization", apiKey) // 鏇挎崲涓哄疄闄呯殑鎺堟潈瀵嗛挜甯搁噺
+ .build();
+
+ try (Response response = client.newCall(request).execute()) {
+ if (!response.isSuccessful()) {
+ System.out.println("璇锋眰澶辫触锛岀姸鎬佺爜锛�" + response.code());
+ return user;
+ }
+ String responseBody = response.body().string();
+ System.out.println("涓婁紶缁撴灉锛�" + responseBody);
+ return responseBody;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return "鏂囦欢涓婁紶澶辫触锛�" + e.getMessage();
+ }
+ }
+
+ // 娴佸紡鑱婂ぉ鏂规硶 - 鐩存帴闆嗘垚SseEmitter
+ public CompletableFuture<Void> chatStream(String message, String userId, MySseEmitter emitter, String sessionId, BigInteger botId) {
+ prompt = message;
+ QueryWrapper qw = new QueryWrapper();
+ qw.eq(AiBotMessage::getSessionId, sessionId)
+ .orderBy(AiBotMessage::getId,false)
+ .limit(6);
+
+ List<AiBotMessage> history = aiBotMessageService.list(qw);
+// System.out.println("======================history==================\n");
+// for (AiBotMessage aiBotMessage : history) {
+// System.out.println(aiBotMessage.toMessage());
+// }
+// System.out.println("\n======================history==================");
+
+ // 鏋勫缓璇锋眰JSON
+ JsonObject requestBody = new JsonObject();
+ requestBody.add("inputs", new JsonObject());
+ requestBody.addProperty("query", message);
+ requestBody.addProperty("response_mode", "streaming");
+ requestBody.addProperty("conversation_id", "");
+ requestBody.addProperty("user", userId);
+// requestBody.add("files", new JsonArray());
+ // 娣诲姞鍘嗗彶瀵硅瘽淇℃伅
+ JsonArray historyArray = new JsonArray();
+// for (AiBotMessage msg : history) {
+// historyArray.add(String.valueOf(msg));
+// }
+// requestBody.add("history", historyArray);
+
+ RequestBody body = RequestBody.create(
+ gson.toJson(requestBody),
+ MediaType.parse("application/json; charset=utf-8")
+ );
+
+ Request request = new Request.Builder()
+ .url(apiUrl)
+ .post(body)
+ .header("Authorization", apiKey)
+ .header("Content-Type", "application/json")
+ .build();
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
+ // 璁剧疆SseEmitter鐢熷懡鍛ㄦ湡鍥炶皟
+ emitter.onTimeout(() -> {
+ System.out.println("SSE杩炴帴瓒呮椂");
+ emitter.complete();
+ future.complete(null);
+ });
+
+ emitter.onCompletion(() -> {
+ System.out.println("SSE杩炴帴宸插畬鎴�");
+ future.complete(null);
+ });
+
+ emitter.onError(e -> {
+ System.out.println("SSE杩炴帴閿欒: " + e.getMessage());
+ emitter.completeWithError(e);
+ future.completeExceptionally(e);
+ });
+ // 鍙戦�佸紓姝ヨ姹�
+ client.newCall(request).enqueue(new Callback() {
+ @Override
+ public void onFailure(Call call, IOException e) {
+ emitter.completeWithError(e);
+ future.completeExceptionally(e);
+ }
+
+ @Override
+ public void onResponse(Call call, Response response) {
+ int a = 1;
AiMessage aiMessage = new AiMessage();
com.agentsflex.core.llm.response.AiMessageResponse aiMessageResponse
= new com.agentsflex.core.llm.response.AiMessageResponse(new HistoriesPrompt(), response.message(), aiMessage);
@@ -148,34 +366,67 @@
// 杩欓噷闇�瑕佹牴鎹叿浣揂PI璋冩暣璺緞
JsonObject messageObj = gson.fromJson(data, JsonObject.class);
// System.out.println(messageObj);
- if(messageObj.get("event").getAsString().equals("message_end")){
+ if(!messageObj.has("answer")){
+ try {
+ JsonArray asJsonArray = messageObj.getAsJsonObject("metadata").getAsJsonArray("retriever_resources");
+ if (asJsonArray.size() > 0) {
+ aiMessage.setFullContent("-----------------------");
+ sb.append("\n"+aiMessage.getFullContent());
+ emitter.send(JSON.toJSONString(aiMessage));
+ for (int i = 0; i < asJsonArray.size(); i++) {
+ aiMessage.setFullContent(asJsonArray.get(i).getAsJsonObject().get("document_name").getAsString());
+ aiMessage.setContent(null);
+// aiMessageResponse.setMessage(aiMessage);
+ sb.append("\n"+aiMessage.getFullContent());
+ emitter.send(JSON.toJSONString(aiMessage));
+ }
+ }
+ } catch (Exception e) {
+ System.out.println("meizuo");
+ }
AiBotMessage aiBotMessage = new AiBotMessage();
aiBotMessage.setBotId(botId);
aiBotMessage.setSessionId(sessionId);
aiBotMessage.setAccountId(new BigInteger(userId));
aiBotMessage.setRole("assistant");
+ String content = aiBotMessage.getContent();
aiBotMessage.setContent(sb.toString());
aiBotMessage.setCreated(new Date());
aiBotMessage.setIsExternalMsg(1);
- aiBotMessageService.save(aiBotMessage);
+ if(a == 1){
+ a = 0;
+ aiBotMessageService.save(aiBotMessage);
+ }else{
+ QueryWrapper qw = new QueryWrapper();
+ qw.eq("content", content);
+ aiBotMessageService.remove(qw);
+ aiBotMessageService.save(aiBotMessage);
+ }
// System.out.println("end");
}else{
String context = messageObj.get("answer").getAsString();
+// System.out.println(context);
if (context != null) {
// // 鍙Щ闄TML鏍囩锛屼繚鐣橫arkdown鐗规畩瀛楃
// context = context.replaceAll("(?i)<[^>]*>", "");
context = context.replaceFirst("Thinking...", "");
+ if(context.startsWith("<details")){
+ context = context.replaceAll("(?i)<[^>]*>", "");
+ }
}
- sb.append(context);
aiMessage.setContent(context);
aiMessageResponse.setMessage(aiMessage);
- if (StringUtil.hasText(messageObj.get("answer").getAsString())) {
-// System.out.println(aiMessage);
- if(aiMessage.getContent().startsWith("</details>")){
- aiMessage.setContent(aiMessage.getContent().replaceAll("(?i)<[^>]*>", "</details>"+"\n\n"));
- }
- // 鍙戦�佹秷鎭墖娈电粰鍓嶇
- emitter.send(JSON.toJSONString(aiMessage));
+ if (!messageObj.get("answer").getAsString().isEmpty()) {
+// if(!blean && aiMessage.getContent().startsWith("</details>")){
+// blean = true;
+// aiMessage.setContent(aiMessage.getContent().replaceAll("(?i)<[^>]*>", "\n\n"));
+// }
+// if(blean){
+ sb.append(aiMessage.getContent());
+// System.out.println(aiMessage);
+ // 鍙戦�佹秷鎭墖娈电粰鍓嶇
+ emitter.send(JSON.toJSONString(aiMessage));
+// }
}
}
@@ -199,101 +450,6 @@
} catch (Exception e) {
// 澶勭悊鍏朵粬寮傚父
emitter.completeWithError(e);
- }
- }
-
-// @Override
- public void onResponses(Call call, Response response) {
-// try (ResponseBody responseBody = response.body()) {
-// if (!response.isSuccessful() || responseBody == null) {
-// IOException e = new IOException("Unexpected code " + response);
-// emitter.completeWithError(e);
-// future.completeExceptionally(e);
-// return;
-// }
-//// String rawResponse = responseBody.string(); // 鎵撳嵃鍘熷鍝嶅簲
-//// System.out.println("Dify鍘熷鍝嶅簲: " + rawResponse);
-//
-// BufferedSource source = responseBody.source();
-// String line;
-//
-// // 澶勭悊娴佸紡鍝嶅簲
-// while ((line = source.readUtf8Line()) != null) {
-// if (line.startsWith("data: ")) {
-// String data = line.substring(6); // 绉婚櫎"data: "鍓嶇紑
-//
-// // 璺宠繃绌烘暟鎹�
-// if (data.trim().equals("[DONE]")) {
-// emitter.complete();
-// future.complete(null);
-// break;
-// }
-//
-// try {
-// // 瑙f瀽JSON鍝嶅簲
-// JsonObject jsonResponse = gson.fromJson(data, JsonObject.class);
-// // 鍙戦�佹秷鎭唴瀹瑰埌瀹㈡埛绔�
-// if (jsonResponse.has("message") && jsonResponse.get("message").isJsonObject()) {
-// String content = jsonResponse.getAsJsonObject("message").get("content").getAsString();
-// emitter.send(SseEmitter.event()
-// .data(content)
-// .name("message")
-// );
-// }
-//
-// // 澶勭悊鍑芥暟璋冪敤
-// if (jsonResponse.has("function_call") && jsonResponse.get("function_call").isJsonObject()) {
-// emitter.send(SseEmitter.event()
-// .data(jsonResponse.get("function_call"))
-// .name("function_call")
-// );
-// }
-//
-// } catch (Exception e) {
-// emitter.completeWithError(e);
-// future.completeExceptionally(e);
-// break;
-// }
-// }
-// }
- try (ResponseBody responseBody = response.body()) {
- if (!response.isSuccessful()) {
- emitter.completeWithError(new IOException("API閿欒"));
- return;
- }
- StringBuilder fullAnswer = new StringBuilder();
- BufferedSource source = responseBody.source();
- String line;
- while ((line = source.readUtf8Line()) != null) {
- if (line.startsWith("data: ")) {
- String data = line.substring(6).trim();
-
- if (data.equals("[DONE]")) {
- // 鍙戦�佸畬鏁村洖绛斿苟缁撴潫娴�
- emitter.send(fullAnswer.toString());
- emitter.complete();
- break;
- }
-
- try {
- JsonObject json = gson.fromJson(data, JsonObject.class);
- System.out.println(json);
- String answerFragment = json.get("answer").getAsString();
- // 杩囨护鏃犳晥瀛楃锛堝锔忊儯銆乗n锛�
- String cleanedFragment = answerFragment.replaceAll("[^\u4e00-\u9fa50-9a-zA-Z\\s\\p{Punct}]", "");
- fullAnswer.append(cleanedFragment); // 鎷兼帴纰庣墖
-
- // 鍙�夛細姣忔嫾鎺ヤ竴閮ㄥ垎鍙戦�佺粰鍓嶇锛堥渶鍓嶇鏀寔澧為噺娓叉煋锛�
- // emitter.send(SseEmitter.event().data(cleanedFragment));
-
- } catch (Exception e) {
- emitter.sendAndComplete("瑙f瀽閿欒: " + e.getMessage());
- }
- }
- }
- } catch (Exception e) {
- emitter.completeWithError(e);
- future.completeExceptionally(e);
}
}
});
--
Gitblit v1.8.0