From 3732bde983f34f5cb729ac1724b9795fb97d8941 Mon Sep 17 00:00:00 2001
From: admin <cgf12138@163.com>
Date: 星期六, 07 六月 2025 18:12:05 +0800
Subject: [PATCH] 0606
---
aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java | 321 +++++++++++++++++++++++++++++++++-------------------
1 files changed, 203 insertions(+), 118 deletions(-)
diff --git a/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java b/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java
index 580a30e..420a25a 100644
--- a/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java
+++ b/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java
@@ -1,31 +1,18 @@
package tech.aiflowy.ai.config;
-import com.agentsflex.core.llm.response.AiMessageResponse;
import com.agentsflex.core.message.AiMessage;
-import com.agentsflex.core.message.Message;
import com.agentsflex.core.prompt.HistoriesPrompt;
-import com.agentsflex.core.prompt.Prompt;
import com.alibaba.fastjson.JSON;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
+import com.google.gson.*;
import com.mybatisflex.core.query.QueryWrapper;
import okhttp3.*;
import okio.BufferedSource;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
-import tech.aiflowy.ai.entity.AiBotConversationMessage;
import tech.aiflowy.ai.entity.AiBotMessage;
-import tech.aiflowy.ai.service.AiBotConversationMessageService;
import tech.aiflowy.ai.service.AiBotMessageService;
-import tech.aiflowy.ai.service.impl.AiBotMessageServiceImpl;
import tech.aiflowy.common.ai.MySseEmitter;
-import tech.aiflowy.common.util.StringUtil;
-import tech.aiflowy.common.web.controller.BaseCurdController;
-import javax.annotation.Resource;
+import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.*;
@@ -37,6 +24,7 @@
private final Gson gson;
private String prompt;
private AiBotMessageService aiBotMessageService;
+ boolean blean = false;
public DifyStreamClient(String apiUrl, String apiKey, AiBotMessageService aiBotMessageService) {
this.apiUrl = apiUrl;
@@ -44,6 +32,192 @@
this.gson = new GsonBuilder().setPrettyPrinting().create();
this.client = new OkHttpClient.Builder().build();
this.aiBotMessageService = aiBotMessageService;
+ }
+
+ // 鍦―ifyStreamClient绫讳腑娣诲姞浠ヤ笅鏂规硶
+ public CompletableFuture<Void> runWorkflow(Map<String, Object> inputs, String message, String userId, MySseEmitter emitter, String sessionId, BigInteger botId) {
+ // 鏋勫缓璇锋眰JSON
+ JsonObject requestBody = new JsonObject();
+ // 娣诲姞inputs鍙傛暟
+ JsonObject inputsJson = new JsonObject();
+ if (inputs != null) {
+ for (Map.Entry<String, Object> entry : inputs.entrySet()) {
+ if (entry.getValue() instanceof String) {
+ inputsJson.addProperty(entry.getKey(), (String) entry.getValue());
+ } else if (entry.getValue() instanceof Number) {
+ inputsJson.addProperty(entry.getKey(), (Number) entry.getValue());
+ } else if (entry.getValue() instanceof Boolean) {
+ inputsJson.addProperty(entry.getKey(), (Boolean) entry.getValue());
+ } else {
+ // 瀵逛簬澶嶆潅瀵硅薄锛岃浆鎹负JSON瀛楃涓�
+ inputsJson.add(entry.getKey(), gson.toJsonTree(entry.getValue()));
+ }
+ }
+ }
+ requestBody.add("inputs", inputsJson);
+
+ // 璁剧疆鍝嶅簲妯″紡鍜岀敤鎴稩D
+ requestBody.addProperty("response_mode", "streaming");
+ requestBody.addProperty("user", userId);
+
+ // 鍒涘缓璇锋眰
+ RequestBody body = RequestBody.create(
+ gson.toJson(requestBody),
+ MediaType.parse("application/json; charset=utf-8")
+ );
+
+ Request request = new Request.Builder()
+ .url(apiUrl)
+ .post(body)
+ .header("Authorization", apiKey)
+ .header("Content-Type", "application/json")
+ .build();
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
+ // 璁剧疆SseEmitter鐢熷懡鍛ㄦ湡鍥炶皟
+ emitter.onTimeout(() -> {
+ System.out.println("SSE杩炴帴瓒呮椂");
+ emitter.complete();
+ future.complete(null);
+ });
+
+ emitter.onCompletion(() -> {
+ System.out.println("SSE杩炴帴宸插畬鎴�");
+ future.complete(null);
+ });
+
+ emitter.onError(e -> {
+ System.out.println("SSE杩炴帴閿欒: " + e.getMessage());
+ emitter.completeWithError(e);
+ future.completeExceptionally(e);
+ });
+
+ // 鍙戦�佸紓姝ヨ姹�
+ client.newCall(request).enqueue(new Callback() {
+ @Override
+ public void onFailure(Call call, IOException e) {
+ emitter.completeWithError(e);
+ future.completeExceptionally(e);
+ }
+
+ @Override
+ public void onResponse(Call call, Response response) {
+ try (ResponseBody responseBody = response.body()) {
+ if (!response.isSuccessful()) {
+ emitter.completeWithError(new IOException("API閿欒: " + response.code()));
+ return;
+ }
+
+ // 浣跨敤BufferedSource閫愯璇诲彇鍝嶅簲鍐呭
+ BufferedSource source = responseBody.source();
+ String line;
+
+ while ((line = source.readUtf8Line()) != null) {
+ if (line.startsWith("data: ")) {
+ String data = line.substring(6).trim();
+
+ // 蹇界暐绌烘暟鎹垨缁撴潫鏍囪
+ if (data.isEmpty() || data.equals("[DONE]")) {
+ continue;
+ }
+
+ try {
+ // 杩欓噷闇�瑕佹牴鎹疄闄匒PI杩斿洖缁撴瀯璋冩暣
+ // 鍋囪API杩斿洖鐨勬牸寮忔槸{ "output": "娑堟伅鍐呭" }
+ JsonObject jsonObject = gson.fromJson(data, JsonObject.class);
+ String title = null;
+ if (jsonObject != null && jsonObject.has("data")) {
+ JsonElement dataElement = jsonObject.get("data");
+ if (dataElement != null && !dataElement.isJsonNull()) {
+ JsonObject dataObject = dataElement.getAsJsonObject();
+ if (dataObject != null && dataObject.has("title")) {
+ JsonElement titleElement = dataObject.get("title");
+ if (titleElement != null && !titleElement.isJsonNull()) {
+ title = titleElement.getAsString();
+ }
+ }
+ if (dataObject != null && dataObject.has("text")) {
+ JsonElement titleElement = dataObject.get("text");
+ if (titleElement != null && !titleElement.isJsonNull()) {
+ title = titleElement.getAsString();
+ }
+ }
+ }
+ }
+
+ // 鍒涘缓娑堟伅瀵硅薄骞跺彂閫佺粰鍓嶇
+ AiMessage aiMessage = new AiMessage();
+ aiMessage.setContent(title);
+ System.out.println(gson.fromJson(data, JsonObject.class));
+ // 灏嗘秷鎭彂閫佺粰鍓嶇
+ emitter.send(JSON.toJSONString(aiMessage));
+
+ } catch (Exception e) {
+ // 璁板綍瑙f瀽閿欒浣嗙户缁鐞嗗悗缁暟鎹�
+ System.err.println("瑙f瀽鍝嶅簲鏁版嵁鏃跺嚭閿�: " + e.getMessage());
+ emitter.completeWithError(e);
+ }
+ }
+ }
+
+ // 鎵�鏈夋暟鎹鐞嗗畬姣曪紝鍙戦�佸畬鎴愪俊鍙�
+ emitter.send(SseEmitter.event().name("complete"));
+ emitter.complete();
+
+ } catch (IOException e) {
+ emitter.completeWithError(e);
+ emitter.complete();
+ } catch (Exception e) {
+ // 澶勭悊鍏朵粬寮傚父
+ emitter.completeWithError(e);
+ }
+ }
+ });
+
+ return future;
+ }
+
+ public String fileUpload(String userId, String filePath){
+ // 瑕佷笂浼犵殑鏂囦欢璺緞锛屾浛鎹负瀹為檯鐨勬枃浠惰矾寰�
+// String filePath = "C:\\Users\\admin\\Desktop\\鍥藉姟闄㈡斂绛栨枃浠跺簱.xlsx";
+ // 鐢ㄦ埛鏍囪瘑锛屾浛鎹负瀹為檯鐨勭敤鎴锋爣璇嗭紝瑕佸拰鍙戦�佹秷鎭帴鍙g殑 user 淇濇寔涓�鑷�
+ String user = userId;
+
+ File file = new File(filePath);
+ if (!file.exists()) {
+ System.out.println("鏂囦欢涓嶅瓨鍦細" + filePath);
+ return user;
+ }
+
+ OkHttpClient client = new OkHttpClient();
+
+ // 鏋勫缓 multipart/form-data 璇锋眰浣�
+ RequestBody requestBody = new MultipartBody.Builder()
+ .setType(MultipartBody.FORM)
+ .addFormDataPart("file", file.getName(),
+ RequestBody.create(MediaType.parse("application/octet-stream"), file))
+ .addFormDataPart("user", user)
+ .build();
+
+ Request request = new Request.Builder()
+ .url(apiUrl)
+ .post(requestBody)
+ .header("Authorization", apiKey)
+ .build();
+
+ try (Response response = client.newCall(request).execute()) {
+ if (!response.isSuccessful()) {
+ System.out.println("璇锋眰澶辫触锛岀姸鎬佺爜锛�" + response.code());
+ return user;
+ }
+ String responseBody = response.body().string();
+ System.out.println("涓婁紶缁撴灉锛�" + responseBody);
+ return responseBody;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return e.getMessage();
+ }
}
// 娴佸紡鑱婂ぉ鏂规硶 - 鐩存帴闆嗘垚SseEmitter
@@ -107,7 +281,6 @@
emitter.completeWithError(e);
future.completeExceptionally(e);
});
-
// 鍙戦�佸紓姝ヨ姹�
client.newCall(request).enqueue(new Callback() {
@Override
@@ -161,21 +334,28 @@
// System.out.println("end");
}else{
String context = messageObj.get("answer").getAsString();
+// System.out.println(context);
if (context != null) {
// // 鍙Щ闄TML鏍囩锛屼繚鐣橫arkdown鐗规畩瀛楃
// context = context.replaceAll("(?i)<[^>]*>", "");
context = context.replaceFirst("Thinking...", "");
+ if(context.startsWith("<details")){
+ context = context.replaceAll("(?i)<[^>]*>", "");
+ }
}
- sb.append(context);
aiMessage.setContent(context);
aiMessageResponse.setMessage(aiMessage);
- if (StringUtil.hasText(messageObj.get("answer").getAsString())) {
-// System.out.println(aiMessage);
- if(aiMessage.getContent().startsWith("</details>")){
- aiMessage.setContent(aiMessage.getContent().replaceAll("(?i)<[^>]*>", "</details>"+"\n\n"));
+ if (!messageObj.get("answer").getAsString().isEmpty()) {
+ if(!blean && aiMessage.getContent().startsWith("</details>")){
+ blean = true;
+ aiMessage.setContent(aiMessage.getContent().replaceAll("(?i)<[^>]*>", "\n\n"));
}
- // 鍙戦�佹秷鎭墖娈电粰鍓嶇
- emitter.send(JSON.toJSONString(aiMessage));
+ if(blean){
+ sb.append(aiMessage.getContent());
+// System.out.println(aiMessage);
+ // 鍙戦�佹秷鎭墖娈电粰鍓嶇
+ emitter.send(JSON.toJSONString(aiMessage));
+ }
}
}
@@ -199,101 +379,6 @@
} catch (Exception e) {
// 澶勭悊鍏朵粬寮傚父
emitter.completeWithError(e);
- }
- }
-
-// @Override
- public void onResponses(Call call, Response response) {
-// try (ResponseBody responseBody = response.body()) {
-// if (!response.isSuccessful() || responseBody == null) {
-// IOException e = new IOException("Unexpected code " + response);
-// emitter.completeWithError(e);
-// future.completeExceptionally(e);
-// return;
-// }
-//// String rawResponse = responseBody.string(); // 鎵撳嵃鍘熷鍝嶅簲
-//// System.out.println("Dify鍘熷鍝嶅簲: " + rawResponse);
-//
-// BufferedSource source = responseBody.source();
-// String line;
-//
-// // 澶勭悊娴佸紡鍝嶅簲
-// while ((line = source.readUtf8Line()) != null) {
-// if (line.startsWith("data: ")) {
-// String data = line.substring(6); // 绉婚櫎"data: "鍓嶇紑
-//
-// // 璺宠繃绌烘暟鎹�
-// if (data.trim().equals("[DONE]")) {
-// emitter.complete();
-// future.complete(null);
-// break;
-// }
-//
-// try {
-// // 瑙f瀽JSON鍝嶅簲
-// JsonObject jsonResponse = gson.fromJson(data, JsonObject.class);
-// // 鍙戦�佹秷鎭唴瀹瑰埌瀹㈡埛绔�
-// if (jsonResponse.has("message") && jsonResponse.get("message").isJsonObject()) {
-// String content = jsonResponse.getAsJsonObject("message").get("content").getAsString();
-// emitter.send(SseEmitter.event()
-// .data(content)
-// .name("message")
-// );
-// }
-//
-// // 澶勭悊鍑芥暟璋冪敤
-// if (jsonResponse.has("function_call") && jsonResponse.get("function_call").isJsonObject()) {
-// emitter.send(SseEmitter.event()
-// .data(jsonResponse.get("function_call"))
-// .name("function_call")
-// );
-// }
-//
-// } catch (Exception e) {
-// emitter.completeWithError(e);
-// future.completeExceptionally(e);
-// break;
-// }
-// }
-// }
- try (ResponseBody responseBody = response.body()) {
- if (!response.isSuccessful()) {
- emitter.completeWithError(new IOException("API閿欒"));
- return;
- }
- StringBuilder fullAnswer = new StringBuilder();
- BufferedSource source = responseBody.source();
- String line;
- while ((line = source.readUtf8Line()) != null) {
- if (line.startsWith("data: ")) {
- String data = line.substring(6).trim();
-
- if (data.equals("[DONE]")) {
- // 鍙戦�佸畬鏁村洖绛斿苟缁撴潫娴�
- emitter.send(fullAnswer.toString());
- emitter.complete();
- break;
- }
-
- try {
- JsonObject json = gson.fromJson(data, JsonObject.class);
- System.out.println(json);
- String answerFragment = json.get("answer").getAsString();
- // 杩囨护鏃犳晥瀛楃锛堝锔忊儯銆乗n锛�
- String cleanedFragment = answerFragment.replaceAll("[^\u4e00-\u9fa50-9a-zA-Z\\s\\p{Punct}]", "");
- fullAnswer.append(cleanedFragment); // 鎷兼帴纰庣墖
-
- // 鍙�夛細姣忔嫾鎺ヤ竴閮ㄥ垎鍙戦�佺粰鍓嶇锛堥渶鍓嶇鏀寔澧為噺娓叉煋锛�
- // emitter.send(SseEmitter.event().data(cleanedFragment));
-
- } catch (Exception e) {
- emitter.sendAndComplete("瑙f瀽閿欒: " + e.getMessage());
- }
- }
- }
- } catch (Exception e) {
- emitter.completeWithError(e);
- future.completeExceptionally(e);
}
}
});
--
Gitblit v1.8.0