chengf
2025-10-11 a58221e75d2e38172533e17b6590983a9b3cabb9
aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java
@@ -19,6 +19,8 @@
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class DifyStreamClient {
    private final OkHttpClient client;
    private final String apiUrl;
@@ -32,7 +34,8 @@
        this.apiUrl = apiUrl;
        this.apiKey = apiKey;
        this.gson = new GsonBuilder().setPrettyPrinting().create();
        this.client = new OkHttpClient.Builder().build();
        this.client = new OkHttpClient.Builder().connectTimeout(60, TimeUnit.SECONDS) // 连接超时
                .readTimeout(120, TimeUnit.SECONDS).build();
        this.aiBotMessageService = aiBotMessageService;
    }
@@ -57,11 +60,11 @@
            }
        }
        requestBody.add("inputs", inputsJson);
        // 设置响应模式和用户ID
        requestBody.addProperty("response_mode", "streaming");
        requestBody.addProperty("user", userId);
//        System.out.println(requestBody+"==============================================================================================");
        // 创建请求
        RequestBody body = RequestBody.create(
                gson.toJson(requestBody),
@@ -105,6 +108,7 @@
            @Override
            public void onResponse(Call call, Response response) {
                StringBuffer sb = new StringBuffer();
                try (ResponseBody responseBody = response.body()) {
                    if (!response.isSuccessful()) {
                        emitter.completeWithError(new IOException("API错误: " + response.code()));
@@ -129,6 +133,7 @@
                                // 假设API返回的格式是{ "output": "消息内容" }
                                JsonObject jsonObject = gson.fromJson(data, JsonObject.class);
                                String title = null;
//                                System.out.println(jsonObject);
                                if (jsonObject != null && jsonObject.has("data")) {
                                    JsonElement dataElement = jsonObject.get("data");
                                    if (dataElement != null && !dataElement.isJsonNull()) {
@@ -142,10 +147,36 @@
                                                title = titleElement.getAsString();
                                            }
                                        }
                                        if (dataObject != null && dataObject.has("text")) {
                                        else if (dataObject != null && dataObject.has("text")) {
                                            JsonElement titleElement = dataObject.get("text");
                                            if (titleElement != null && !titleElement.isJsonNull()) {
                                                title = titleElement.getAsString();
                                            }
                                        }else if (dataObject != null && dataObject.has("outputs")) {
                                            JsonElement titleElement = dataObject.get("outputs");
                                            if (titleElement != null && !titleElement.isJsonNull()) {
                                                AiBotMessage aiBotMessage = new AiBotMessage();
                                                aiBotMessage.setBotId(botId);
                                                aiBotMessage.setSessionId(sessionId);
                                                aiBotMessage.setAccountId(new BigInteger(userId));
                                                aiBotMessage.setRole("assistant");
                                                aiBotMessage.setContent(sb.toString());
                                                aiBotMessage.setCreated(new Date());
                                                aiBotMessage.setIsExternalMsg(1);
                                                aiBotMessageService.save(aiBotMessage);
//                                                dataObject = titleElement.getAsJsonObject();
//                                                if (dataObject != null && dataObject.has("text")) {
//                                                    titleElement = dataObject.get("text");
//                                                    if (titleElement != null && !titleElement.isJsonNull()) {
//                                                        title = titleElement.getAsString();
//                                                    }
//                                                }else if (dataObject != null && dataObject.has("data")) {
//                                                    titleElement = dataObject.get("data");
//                                                    if (titleElement != null && !titleElement.isJsonNull()) {
//                                                        title = titleElement.getAsString();
//                                                    }
//                                                }
                                            }
                                        }
                                    }
@@ -155,8 +186,11 @@
                                AiMessage aiMessage = new AiMessage();
                                aiMessage.setContent(title);
                                System.out.println(gson.fromJson(data, JsonObject.class));
                                sb.append(aiMessage.getContent());
                                // 将消息发送给前端
                                emitter.send(JSON.toJSONString(aiMessage));
                                if (aiMessage.getContent() != null){
                                    emitter.send(JSON.toJSONString(aiMessage));
                                }
                            } catch (Exception e) {
                                // 记录解析错误但继续处理后续数据
@@ -252,13 +286,13 @@
        requestBody.addProperty("response_mode", "streaming");
        requestBody.addProperty("conversation_id", "");
        requestBody.addProperty("user", userId);
        requestBody.add("files", new JsonArray());
//        requestBody.add("files", new JsonArray());
        // 添加历史对话信息
        JsonArray historyArray = new JsonArray();
        for (AiBotMessage msg : history) {
            historyArray.add(String.valueOf(msg));
        }
        requestBody.add("history", historyArray);
//        for (AiBotMessage msg : history) {
//            historyArray.add(String.valueOf(msg));
//        }
//        requestBody.add("history", historyArray);
        RequestBody body = RequestBody.create(
                gson.toJson(requestBody),
@@ -301,6 +335,7 @@
            @Override
            public void onResponse(Call call, Response response) {
                int a = 1;
                AiMessage aiMessage = new AiMessage();
                com.agentsflex.core.llm.response.AiMessageResponse aiMessageResponse
                        = new com.agentsflex.core.llm.response.AiMessageResponse(new HistoriesPrompt(), response.message(), aiMessage);
@@ -331,16 +366,42 @@
                                // 这里需要根据具体API调整路径
                                JsonObject messageObj = gson.fromJson(data, JsonObject.class);
//                                System.out.println(messageObj);
                                if(messageObj.get("event").getAsString().equals("message_end")){
                                if(!messageObj.has("answer")){
                                    try {
                                        JsonArray asJsonArray = messageObj.getAsJsonObject("metadata").getAsJsonArray("retriever_resources");
                                        if (asJsonArray.size() > 0) {
                                            aiMessage.setFullContent("-----------------------");
                                            sb.append("\n"+aiMessage.getFullContent());
                                            emitter.send(JSON.toJSONString(aiMessage));
                                            for (int i = 0; i < asJsonArray.size(); i++) {
                                                aiMessage.setFullContent(asJsonArray.get(i).getAsJsonObject().get("document_name").getAsString());
                                                aiMessage.setContent(null);
//                                                aiMessageResponse.setMessage(aiMessage);
                                                sb.append("\n"+aiMessage.getFullContent());
                                                emitter.send(JSON.toJSONString(aiMessage));
                                            }
                                        }
                                    } catch (Exception e) {
                                        System.out.println("meizuo");
                                    }
                                    AiBotMessage aiBotMessage = new AiBotMessage();
                                    aiBotMessage.setBotId(botId);
                                    aiBotMessage.setSessionId(sessionId);
                                    aiBotMessage.setAccountId(new BigInteger(userId));
                                    aiBotMessage.setRole("assistant");
                                    String content = aiBotMessage.getContent();
                                    aiBotMessage.setContent(sb.toString());
                                    aiBotMessage.setCreated(new Date());
                                    aiBotMessage.setIsExternalMsg(1);
                                    aiBotMessageService.save(aiBotMessage);
                                    if(a == 1){
                                        a = 0;
                                        aiBotMessageService.save(aiBotMessage);
                                    }else{
                                        QueryWrapper qw = new QueryWrapper();
                                        qw.eq("content", content);
                                        aiBotMessageService.remove(qw);
                                        aiBotMessageService.save(aiBotMessage);
                                    }
//                                    System.out.println("end");
                                }else{
                                    String context = messageObj.get("answer").getAsString();
@@ -356,16 +417,16 @@
                                    aiMessage.setContent(context);
                                    aiMessageResponse.setMessage(aiMessage);
                                    if (!messageObj.get("answer").getAsString().isEmpty()) {
                                        if(!blean && aiMessage.getContent().startsWith("</details>")){
                                            blean = true;
                                            aiMessage.setContent(aiMessage.getContent().replaceAll("(?i)<[^>]*>", "\n\n"));
                                        }
                                        if(blean){
//                                        if(!blean && aiMessage.getContent().startsWith("</details>")){
//                                            blean = true;
//                                            aiMessage.setContent(aiMessage.getContent().replaceAll("(?i)<[^>]*>", "\n\n"));
//                                        }
//                                        if(blean){
                                            sb.append(aiMessage.getContent());
//                                            System.out.println(aiMessage);
                                            // 发送消息片段给前端
                                            emitter.send(JSON.toJSONString(aiMessage));
                                        }
//                                        }
                                    }
                                }