chengf
2025-10-11 a58221e75d2e38172533e17b6590983a9b3cabb9
aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java
@@ -3,10 +3,12 @@
import com.agentsflex.core.message.AiMessage;
import com.agentsflex.core.prompt.HistoriesPrompt;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.*;
import com.mybatisflex.core.query.QueryWrapper;
import okhttp3.*;
import okio.BufferedSource;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import tech.aiflowy.ai.entity.AiBotMessage;
import tech.aiflowy.ai.service.AiBotMessageService;
@@ -17,6 +19,8 @@
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class DifyStreamClient {
    private final OkHttpClient client;
    private final String apiUrl;
@@ -30,7 +34,8 @@
        this.apiUrl = apiUrl;
        this.apiKey = apiKey;
        this.gson = new GsonBuilder().setPrettyPrinting().create();
        this.client = new OkHttpClient.Builder().build();
        this.client = new OkHttpClient.Builder().connectTimeout(60, TimeUnit.SECONDS) // 连接超时
                .readTimeout(120, TimeUnit.SECONDS).build();
        this.aiBotMessageService = aiBotMessageService;
    }
@@ -55,11 +60,11 @@
            }
        }
        requestBody.add("inputs", inputsJson);
        // 设置响应模式和用户ID
        requestBody.addProperty("response_mode", "streaming");
        requestBody.addProperty("user", userId);
//        System.out.println(requestBody+"==============================================================================================");
        // 创建请求
        RequestBody body = RequestBody.create(
                gson.toJson(requestBody),
@@ -103,6 +108,7 @@
            @Override
            public void onResponse(Call call, Response response) {
                StringBuffer sb = new StringBuffer();
                try (ResponseBody responseBody = response.body()) {
                    if (!response.isSuccessful()) {
                        emitter.completeWithError(new IOException("API错误: " + response.code()));
@@ -127,20 +133,50 @@
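                                // Parse the event JSON. The fields used below live under the top-level "data" object
                                // ("node_type", "title", "text", "outputs"), not a flat { "output": ... } shape.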
                                JsonObject jsonObject = gson.fromJson(data, JsonObject.class);
                                String title = null;
//                                System.out.println(jsonObject);
                                if (jsonObject != null && jsonObject.has("data")) {
                                    JsonElement dataElement = jsonObject.get("data");
                                    if (dataElement != null && !dataElement.isJsonNull()) {
                                        JsonObject dataObject = dataElement.getAsJsonObject();
                                        if (dataObject != null && dataObject.has("node_type")) {
                                            continue;
                                        }
                                        if (dataObject != null && dataObject.has("title")) {
                                            JsonElement titleElement = dataObject.get("title");
                                            if (titleElement != null && !titleElement.isJsonNull()) {
                                                title = titleElement.getAsString();
                                            }
                                        }
                                        if (dataObject != null && dataObject.has("text")) {
                                        else if (dataObject != null && dataObject.has("text")) {
                                            JsonElement titleElement = dataObject.get("text");
                                            if (titleElement != null && !titleElement.isJsonNull()) {
                                                title = titleElement.getAsString();
                                            }
                                        }else if (dataObject != null && dataObject.has("outputs")) {
                                            JsonElement titleElement = dataObject.get("outputs");
                                            if (titleElement != null && !titleElement.isJsonNull()) {
                                                AiBotMessage aiBotMessage = new AiBotMessage();
                                                aiBotMessage.setBotId(botId);
                                                aiBotMessage.setSessionId(sessionId);
                                                aiBotMessage.setAccountId(new BigInteger(userId));
                                                aiBotMessage.setRole("assistant");
                                                aiBotMessage.setContent(sb.toString());
                                                aiBotMessage.setCreated(new Date());
                                                aiBotMessage.setIsExternalMsg(1);
                                                aiBotMessageService.save(aiBotMessage);
//                                                dataObject = titleElement.getAsJsonObject();
//                                                if (dataObject != null && dataObject.has("text")) {
//                                                    titleElement = dataObject.get("text");
//                                                    if (titleElement != null && !titleElement.isJsonNull()) {
//                                                        title = titleElement.getAsString();
//                                                    }
//                                                }else if (dataObject != null && dataObject.has("data")) {
//                                                    titleElement = dataObject.get("data");
//                                                    if (titleElement != null && !titleElement.isJsonNull()) {
//                                                        title = titleElement.getAsString();
//                                                    }
//                                                }
                                            }
                                        }
                                    }
@@ -150,8 +186,11 @@
                                AiMessage aiMessage = new AiMessage();
                                aiMessage.setContent(title);
                                System.out.println(gson.fromJson(data, JsonObject.class));
                                sb.append(aiMessage.getContent());
                                // 将消息发送给前端
                                emitter.send(JSON.toJSONString(aiMessage));
                                if (aiMessage.getContent() != null){
                                    emitter.send(JSON.toJSONString(aiMessage));
                                }
                            } catch (Exception e) {
                                // 记录解析错误但继续处理后续数据
@@ -178,32 +217,37 @@
        return future;
    }
    public String fileUpload(String userId, String filePath){
        // 要上传的文件路径,替换为实际的文件路径
//        String filePath = "C:\\Users\\admin\\Desktop\\国务院政策文件库.xlsx";
        // 用户标识,替换为实际的用户标识,要和发送消息接口的 user 保持一致
    public String fileUpload(String userId, MultipartFile file) {
        // 用户标识,要和发送消息接口的 user 保持一致
        String user = userId;
        File file = new File(filePath);
        if (!file.exists()) {
            System.out.println("文件不存在:" + filePath);
            return user;
        // 判断文件是否为空
        if (file.isEmpty()) {
            System.out.println("上传文件为空");
            return "上传文件为空";
        }
        OkHttpClient client = new OkHttpClient();
        // 构建 multipart/form-data 请求体
        RequestBody requestBody = new MultipartBody.Builder()
                .setType(MultipartBody.FORM)
                .addFormDataPart("file", file.getName(),
                        RequestBody.create(MediaType.parse("application/octet-stream"), file))
                .addFormDataPart("user", user)
                .build();
        MultipartBody.Builder requestBodyBuilder = null;
        try {
            requestBodyBuilder = new MultipartBody.Builder()
                    .setType(MultipartBody.FORM)
                    // 添加上传文件,这里直接用 MultipartFile 的字节数组构建请求体
                    .addFormDataPart("file", file.getOriginalFilename(),
                            RequestBody.create(MediaType.parse("application/octet-stream"), file.getBytes()))
                    .addFormDataPart("user", user);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        RequestBody requestBody = requestBodyBuilder.build();
        Request request = new Request.Builder()
                .url(apiUrl)
                .url(apiUrl)  // 替换为实际的接口地址常量
                .post(requestBody)
                .header("Authorization", apiKey)
                .header("Authorization", apiKey)  // 替换为实际的授权密钥常量
                .build();
        try (Response response = client.newCall(request).execute()) {
@@ -216,7 +260,7 @@
            return responseBody;
        } catch (IOException e) {
            e.printStackTrace();
            return e.getMessage();
            return "文件上传失败:" + e.getMessage();
        }
    }
@@ -242,13 +286,13 @@
        requestBody.addProperty("response_mode", "streaming");
        requestBody.addProperty("conversation_id", "");
        requestBody.addProperty("user", userId);
        requestBody.add("files", new JsonArray());
//        requestBody.add("files", new JsonArray());
        // 添加历史对话信息
        JsonArray historyArray = new JsonArray();
        for (AiBotMessage msg : history) {
            historyArray.add(String.valueOf(msg));
        }
        requestBody.add("history", historyArray);
//        for (AiBotMessage msg : history) {
//            historyArray.add(String.valueOf(msg));
//        }
//        requestBody.add("history", historyArray);
        RequestBody body = RequestBody.create(
                gson.toJson(requestBody),
@@ -291,6 +335,7 @@
            @Override
            public void onResponse(Call call, Response response) {
                int a = 1;
                AiMessage aiMessage = new AiMessage();
                com.agentsflex.core.llm.response.AiMessageResponse aiMessageResponse
                        = new com.agentsflex.core.llm.response.AiMessageResponse(new HistoriesPrompt(), response.message(), aiMessage);
@@ -321,16 +366,42 @@
                                // 这里需要根据具体API调整路径
                                JsonObject messageObj = gson.fromJson(data, JsonObject.class);
//                                System.out.println(messageObj);
                                if(messageObj.get("event").getAsString().equals("message_end")){
                                if(!messageObj.has("answer")){
                                    try {
                                        JsonArray asJsonArray = messageObj.getAsJsonObject("metadata").getAsJsonArray("retriever_resources");
                                        if (asJsonArray.size() > 0) {
                                            aiMessage.setFullContent("-----------------------");
                                            sb.append("\n"+aiMessage.getFullContent());
                                            emitter.send(JSON.toJSONString(aiMessage));
                                            for (int i = 0; i < asJsonArray.size(); i++) {
                                                aiMessage.setFullContent(asJsonArray.get(i).getAsJsonObject().get("document_name").getAsString());
                                                aiMessage.setContent(null);
//                                                aiMessageResponse.setMessage(aiMessage);
                                                sb.append("\n"+aiMessage.getFullContent());
                                                emitter.send(JSON.toJSONString(aiMessage));
                                            }
                                        }
                                    } catch (Exception e) {
                                        System.out.println("meizuo");
                                    }
                                    AiBotMessage aiBotMessage = new AiBotMessage();
                                    aiBotMessage.setBotId(botId);
                                    aiBotMessage.setSessionId(sessionId);
                                    aiBotMessage.setAccountId(new BigInteger(userId));
                                    aiBotMessage.setRole("assistant");
                                    String content = aiBotMessage.getContent();
                                    aiBotMessage.setContent(sb.toString());
                                    aiBotMessage.setCreated(new Date());
                                    aiBotMessage.setIsExternalMsg(1);
                                    aiBotMessageService.save(aiBotMessage);
                                    if(a == 1){
                                        a = 0;
                                        aiBotMessageService.save(aiBotMessage);
                                    }else{
                                        QueryWrapper qw = new QueryWrapper();
                                        qw.eq("content", content);
                                        aiBotMessageService.remove(qw);
                                        aiBotMessageService.save(aiBotMessage);
                                    }
//                                    System.out.println("end");
                                }else{
                                    String context = messageObj.get("answer").getAsString();
@@ -346,16 +417,16 @@
                                    aiMessage.setContent(context);
                                    aiMessageResponse.setMessage(aiMessage);
                                    if (!messageObj.get("answer").getAsString().isEmpty()) {
                                        if(!blean && aiMessage.getContent().startsWith("</details>")){
                                            blean = true;
                                            aiMessage.setContent(aiMessage.getContent().replaceAll("(?i)<[^>]*>", "\n\n"));
                                        }
                                        if(blean){
//                                        if(!blean && aiMessage.getContent().startsWith("</details>")){
//                                            blean = true;
//                                            aiMessage.setContent(aiMessage.getContent().replaceAll("(?i)<[^>]*>", "\n\n"));
//                                        }
//                                        if(blean){
                                            sb.append(aiMessage.getContent());
//                                            System.out.println(aiMessage);
                                            // 发送消息片段给前端
                                            emitter.send(JSON.toJSONString(aiMessage));
                                        }
//                                        }
                                    }
                                }