| aiflowy-modules/aiflowy-module-ai/pom.xml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiBotController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiSecondMenuController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/entity/AiLlm.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/entity/base/AiBotBase.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/service/impl/AiSecondMenuServiceImpl.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| aiflowy-starter/src/main/resources/application.yml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
| sql/aiflowy.sql | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
aiflowy-modules/aiflowy-module-ai/pom.xml
@@ -70,5 +70,15 @@ <version>4.5.14</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>4.11.0</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.10.1</version> </dependency> </dependencies> </project> aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/config/DifyStreamClient.java
New file @@ -0,0 +1,331 @@ package tech.aiflowy.ai.config; import com.agentsflex.core.llm.response.AiMessageResponse; import com.agentsflex.core.message.AiMessage; import com.agentsflex.core.message.Message; import com.agentsflex.core.prompt.HistoriesPrompt; import com.agentsflex.core.prompt.Prompt; import com.alibaba.fastjson.JSON; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.mybatisflex.core.query.QueryWrapper; import okhttp3.*; import okio.BufferedSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import tech.aiflowy.ai.entity.AiBotConversationMessage; import tech.aiflowy.ai.entity.AiBotMessage; import tech.aiflowy.ai.service.AiBotConversationMessageService; import tech.aiflowy.ai.service.AiBotMessageService; import tech.aiflowy.ai.service.impl.AiBotMessageServiceImpl; import tech.aiflowy.common.ai.MySseEmitter; import tech.aiflowy.common.util.StringUtil; import tech.aiflowy.common.web.controller.BaseCurdController; import javax.annotation.Resource; import java.io.IOException; import java.math.BigInteger; import java.util.*; import java.util.concurrent.CompletableFuture; public class DifyStreamClient { private final OkHttpClient client; private final String apiUrl; private final String apiKey; private final Gson gson; private String prompt; private AiBotMessageService aiBotMessageService; public DifyStreamClient(String apiUrl, String apiKey, AiBotMessageService aiBotMessageService) { this.apiUrl = apiUrl; this.apiKey = apiKey; this.gson = new GsonBuilder().setPrettyPrinting().create(); this.client = new OkHttpClient.Builder().build(); this.aiBotMessageService = aiBotMessageService; } // 流式聊天方法 - 直接集成SseEmitter public CompletableFuture<Void> chatStream(String message, String userId, MySseEmitter emitter, String sessionId, BigInteger botId) { prompt = message; QueryWrapper qw = new QueryWrapper(); qw.eq(AiBotMessage::getSessionId, sessionId) .orderBy(AiBotMessage::getId,false) .limit(6); List<AiBotMessage> history = aiBotMessageService.list(qw); // System.out.println("======================history==================\n"); // for (AiBotMessage aiBotMessage : history) { // System.out.println(aiBotMessage.toMessage()); // } // System.out.println("\n======================history=================="); // 构建请求JSON JsonObject requestBody = new JsonObject(); requestBody.add("inputs", new JsonObject()); requestBody.addProperty("query", message); requestBody.addProperty("response_mode", "streaming"); requestBody.addProperty("conversation_id", ""); requestBody.addProperty("user", userId); requestBody.add("files", new JsonArray()); // 添加历史对话信息 JsonArray historyArray = new JsonArray(); for (AiBotMessage msg : history) { historyArray.add(String.valueOf(msg)); } requestBody.add("history", historyArray); RequestBody body = RequestBody.create( gson.toJson(requestBody), MediaType.parse("application/json; charset=utf-8") ); Request request = new Request.Builder() .url(apiUrl) .post(body) .header("Authorization", apiKey) .header("Content-Type", "application/json") .build(); CompletableFuture<Void> future = new CompletableFuture<>(); // 设置SseEmitter生命周期回调 emitter.onTimeout(() -> { System.out.println("SSE连接超时"); emitter.complete(); future.complete(null); }); emitter.onCompletion(() -> { System.out.println("SSE连接已完成"); future.complete(null); }); emitter.onError(e -> { System.out.println("SSE连接错误: " + e.getMessage()); emitter.completeWithError(e); future.completeExceptionally(e); }); // 发送异步请求 client.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { emitter.completeWithError(e); future.completeExceptionally(e); } @Override public void onResponse(Call call, Response response) { AiMessage aiMessage = new AiMessage(); com.agentsflex.core.llm.response.AiMessageResponse aiMessageResponse = new com.agentsflex.core.llm.response.AiMessageResponse(new HistoriesPrompt(), response.message(), aiMessage); try (ResponseBody responseBody = response.body()) { if (!response.isSuccessful()) { emitter.completeWithError(new IOException("API错误: " + response.code())); return; } // 使用BufferedSource逐行读取响应内容 BufferedSource source = responseBody.source(); String line; StringBuffer sb = new StringBuffer(); // 标记是否为第一条有效数据(用于处理某些API的特殊格式) boolean isFirstData = true; while ((line = source.readUtf8Line()) != null) { if (line.startsWith("data: ")) { String data = line.substring(6).trim(); // 忽略空数据或结束标记 if (data.isEmpty() || data.equals("[DONE]")) { continue; } try { // 根据实际API响应结构提取消息内容 // 这里需要根据具体API调整路径 JsonObject messageObj = gson.fromJson(data, JsonObject.class); // System.out.println(messageObj); if(messageObj.get("event").getAsString().equals("message_end")){ AiBotMessage aiBotMessage = new AiBotMessage(); aiBotMessage.setBotId(botId); aiBotMessage.setSessionId(sessionId); aiBotMessage.setAccountId(new BigInteger(userId)); aiBotMessage.setRole("assistant"); aiBotMessage.setContent(sb.toString()); aiBotMessage.setCreated(new Date()); aiBotMessage.setIsExternalMsg(1); aiBotMessageService.save(aiBotMessage); // System.out.println("end"); }else{ String context = messageObj.get("answer").getAsString(); if (context != null) { // // 只移除HTML标签,保留Markdown特殊字符 // context = context.replaceAll("(?i)<[^>]*>", ""); context = context.replaceFirst("Thinking...", ""); } sb.append(context); aiMessage.setContent(context); aiMessageResponse.setMessage(aiMessage); if (StringUtil.hasText(messageObj.get("answer").getAsString())) { // System.out.println(aiMessage); if(aiMessage.getContent().startsWith("</details>")){ aiMessage.setContent(aiMessage.getContent().replaceAll("(?i)<[^>]*>", "</details>"+"\n\n")); } // 发送消息片段给前端 emitter.send(JSON.toJSONString(aiMessage)); } } // 重置第一条数据标记 isFirstData = false; } catch (Exception e) { // 记录解析错误但继续处理后续数据 emitter.completeWithError(e); } } } // 所有数据处理完毕,发送完成信号 emitter.send(SseEmitter.event().name("complete")); emitter.complete(); } catch (IOException e) { emitter.completeWithError(e); emitter.complete(); } catch (Exception e) { // 处理其他异常 emitter.completeWithError(e); } } // @Override public void onResponses(Call call, Response response) { // try (ResponseBody responseBody = response.body()) { // if (!response.isSuccessful() || responseBody == null) { // IOException e = new IOException("Unexpected code " + response); // emitter.completeWithError(e); // future.completeExceptionally(e); // return; // } //// String rawResponse = responseBody.string(); // 打印原始响应 //// System.out.println("Dify原始响应: " + rawResponse); // // BufferedSource source = responseBody.source(); // String line; // // // 处理流式响应 // while ((line = source.readUtf8Line()) != null) { // if (line.startsWith("data: ")) { // String data = line.substring(6); // 移除"data: "前缀 // // // 跳过空数据 // if (data.trim().equals("[DONE]")) { // emitter.complete(); // future.complete(null); // break; // } // // try { // // 解析JSON响应 // JsonObject jsonResponse = gson.fromJson(data, JsonObject.class); // // 发送消息内容到客户端 // if (jsonResponse.has("message") && jsonResponse.get("message").isJsonObject()) { // String content = jsonResponse.getAsJsonObject("message").get("content").getAsString(); // emitter.send(SseEmitter.event() // .data(content) // .name("message") // ); // } // // // 处理函数调用 // if (jsonResponse.has("function_call") && jsonResponse.get("function_call").isJsonObject()) { // emitter.send(SseEmitter.event() // .data(jsonResponse.get("function_call")) // .name("function_call") // ); // } // // } catch (Exception e) { // emitter.completeWithError(e); // future.completeExceptionally(e); // break; // } // } // } try (ResponseBody responseBody = response.body()) { if (!response.isSuccessful()) { emitter.completeWithError(new IOException("API错误")); return; } StringBuilder fullAnswer = new StringBuilder(); BufferedSource source = responseBody.source(); String line; while ((line = source.readUtf8Line()) != null) { if (line.startsWith("data: ")) { String data = line.substring(6).trim(); if (data.equals("[DONE]")) { // 发送完整回答并结束流 emitter.send(fullAnswer.toString()); emitter.complete(); break; } try { JsonObject json = gson.fromJson(data, JsonObject.class); System.out.println(json); String answerFragment = json.get("answer").getAsString(); // 过滤无效字符(如️⃣、\n) String cleanedFragment = answerFragment.replaceAll("[^\u4e00-\u9fa50-9a-zA-Z\\s\\p{Punct}]", ""); fullAnswer.append(cleanedFragment); // 拼接碎片 // 可选:每拼接一部分发送给前端(需前端支持增量渲染) // emitter.send(SseEmitter.event().data(cleanedFragment)); } catch (Exception e) { emitter.sendAndComplete("解析错误: " + e.getMessage()); } } } } catch (Exception e) { emitter.completeWithError(e); future.completeExceptionally(e); } } }); return future; } // 数据模型类 public interface StreamResponseListener { void onMessage(ChatContext context, AiMessageResponse response); void onStop(ChatContext context); void onFailure(ChatContext context, Throwable throwable); } public static class ChatContext { // 上下文信息 } public static class AiMessageResponse { private Message message; private JsonArray functionCallers; public Message getMessage() { return message; } public void setMessage(Message message) { this.message = message; } public JsonArray getFunctionCallers() { return functionCallers; } public void setFunctionCallers(JsonArray functionCallers) { this.functionCallers = functionCallers; } } public static class Message { private String content; public String getContent() { return content; } public void setContent(String content) { this.content = content; } } } aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiBotController.java
@@ -2,8 +2,6 @@ import cn.dev33.satoken.annotation.SaIgnore; import cn.hutool.core.util.ObjectUtil; import cn.hutool.json.JSONArray; import cn.hutool.json.JSONObject; import com.agentsflex.core.llm.ChatContext; import com.agentsflex.core.llm.Llm; import com.agentsflex.core.llm.StreamResponseListener; @@ -17,32 +15,28 @@ import com.agentsflex.core.prompt.ToolPrompt; import com.agentsflex.core.util.CollectionUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializeConfig; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.jfinal.template.stat.ast.Break; import com.mybatisflex.core.paginate.Page; import com.mybatisflex.core.query.QueryWrapper; import com.mybatisflex.core.table.TableInfo; import com.mybatisflex.core.table.TableInfoFactory; import io.milvus.param.R; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.*; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import tech.aiflowy.ai.config.DifyStreamClient; import tech.aiflowy.ai.entity.*; import tech.aiflowy.ai.mapper.AiBotConversationMessageMapper; import tech.aiflowy.ai.service.*; import tech.aiflowy.ai.vo.ModelConfig; import tech.aiflowy.common.ai.ChatManager; import tech.aiflowy.common.ai.MySseEmitter; import tech.aiflowy.common.domain.Result; @@ -57,14 +51,8 @@ import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.math.BigInteger; import java.time.Duration; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** * 控制层。 @@ -149,6 +137,7 @@ @Autowired private ObjectMapper objectMapper; @PostMapping("chat") public SseEmitter chat(@JsonBody(value = "prompt", required = true) String prompt, @JsonBody(value = "botId", required = true) BigInteger botId, @@ -160,80 +149,55 @@ if (aiBot == null) { return ChatManager.getInstance().sseEmitterForContent("机器人不存在"); } Map<String, Object> llmOptions = aiBot.getLlmOptions(); String systemPrompt = llmOptions != null ? (String) llmOptions.get("systemPrompt") : null; if (StringUtil.hasText(aiBot.getModelAPI())){ // 使用Dify模型的逻辑 String apiUrl = aiBot.getModelAPI(); String bearerToken = aiBot.getModelKey(); String apiUrl = aiBot.getModelAPI(); // 替换为实际API URL String apiKey = aiBot.getModelKEY(); // 替换为实际API Key // 构建请求JSON JSONObject jsonBody = new JSONObject(); jsonBody.put("inputs", new JSONObject()); jsonBody.put("query", prompt); jsonBody.put("response_mode", "blocking"); jsonBody.put("conversation_id", ""); jsonBody.put("user", userId.toString()); DifyStreamClient client = new DifyStreamClient(apiUrl, apiKey, aiBotMessageService); AiBotMessageMemory memory = new AiBotMessageMemory(botId, SaTokenUtil.getLoginAccount().getId(), sessionId, isExternalMsg, aiBotMessageService, aiBotConversationMessageMapper, aiBotConversationMessageService); JSONArray files = new JSONArray(); JSONObject fileObj = new JSONObject(); fileObj.put("type", ""); fileObj.put("transfer_method", ""); fileObj.put("url", ""); files.put(fileObj); jsonBody.put("files", files); final HistoriesPrompt historiesPrompt = new HistoriesPrompt(); if (systemPrompt != null) { historiesPrompt.setSystemMessage(SystemMessage.of(systemPrompt)); } // 发送HTTP请求 HttpClient client = HttpClient.newBuilder() .connectTimeout(Duration.ofSeconds(30)) .build(); historiesPrompt.setMemory(memory); HttpRequest request = HttpRequest.newBuilder() .uri(URI.create(apiUrl)) .header("Content-Type", "application/json") .header("Authorization", "Bearer " + bearerToken) .POST(HttpRequest.BodyPublishers.ofString(jsonBody.toString())) .build(); HumanMessage humanMessage = new HumanMessage(prompt); HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString()); // 添加插件相关的function calling appendPluginToolFunction(botId, humanMessage); // 处理成功响应 JSONObject responseJson = new JSONObject(response.body()); //添加工作流相关的 Function Calling appendWorkflowFunctions(botId, humanMessage); // 保存用户消息到历史记录 ChatHistory userChat = new ChatHistory(); userChat.setContent(prompt); userChat.setRole("user"); userChat.setUserId(userId); userChat.setBotId(botId.intValue()); userChat.setModel("dify"); userChat.setChatId(responseJson.optString("conversation_id", "")); Integer userHistoryId = chatHistoryService.saveChatHistory(userChat); //添加知识库相关的 Function Calling appendKnowledgeFunctions(botId, humanMessage); // 保存AI回复到历史记录 ChatHistory assistantChat = new ChatHistory(); assistantChat.setContent(responseJson.getString("answer")); assistantChat.setRole("assistant"); assistantChat.setUserId(userId); assistantChat.setBotId(botId.intValue()); assistantChat.setModel("dify"); assistantChat.setChatId(userChat.getChatId()); Integer assistantHistoryId = chatHistoryService.saveChatHistory(assistantChat); historiesPrompt.addMessage(humanMessage); // 构建响应数据 int count = chatHistoryService.getChatHistoryCount(userId, botId.intValue()); JSONObject result = new JSONObject(); result.put("Count", count); result.put("Data", responseJson.getString("answer")); result.put("UserId", userHistoryId); result.put("AssistantId", assistantHistoryId); // 发送SSE响应 emitter.send(SseEmitter.event() .name("message") .data(result.toString())); emitter.complete(); }else{ Map<String, Object> llmOptions = aiBot.getLlmOptions(); String systemPrompt = llmOptions != null ? (String) llmOptions.get("systemPrompt") : null; final Boolean[] needClose = {true}; ServletRequestAttributes sra = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); MySseEmitter emitter = new MySseEmitter(1000L * 60 * 2); // 2分钟超时 try { String userId = SaTokenUtil.getLoginAccount().getId() + ""; client.chatStream(prompt, userId, emitter, sessionId, botId); } catch (Exception e) { emitter.completeWithError(e); } System.out.println(emitter.toString()); return emitter; } else{ AiLlm aiLlm = aiLlmService.getById(aiBot.getLlmId()); if (aiLlm == null) { @@ -290,6 +254,7 @@ if (response.getMessage() != null) { String content = response.getMessage().getContent(); if (StringUtil.hasText(content)) { System.out.println(response); emitter.send(JSON.toJSONString(response.getMessage())); } } @@ -315,10 +280,52 @@ } }); System.out.println(emitter.toString()); return emitter; } } public Result save(@RequestBody String jsonStr) { // 解析JSON JSONObject json = JSONObject.parseObject(jsonStr); // 合并所有secondMenuId*字段 List<Integer> menuIds = new ArrayList<>(); for (String key : json.keySet()) { if (key.startsWith("secondMenuId")) { Object value = json.get(key); if (value instanceof Integer) { menuIds.add((Integer) value); } } } // 保留第一个ID(根据需要调整) if (!menuIds.isEmpty()) { json.put("secondMenuId", menuIds.get(0)); } // 转换为实体类 AiBot entity = json.toJavaObject(AiBot.class); // 后续处理保持不变 Result result = onSaveOrUpdateBefore(entity, true); if (result != null) return result; if (entity == null) { throw new NullPointerException("entity is null"); } LoginAccount loginAccount = SaTokenUtil.getLoginAccount(); commonFiled(entity,loginAccount.getId(),loginAccount.getTenantId(),loginAccount.getDeptId()); boolean success = service.save(entity); onSaveOrUpdateAfter(entity, true); TableInfo tableInfo = TableInfoFactory.ofEntityClass(entity.getClass()); Object[] pkArgs = tableInfo.buildPkSqlArgs(entity); return Result.create(success).set("id", pkArgs); } /** * 外部用户调用智能体进行对话 @@ -641,4 +648,4 @@ } } } aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiSecondMenuController.java
@@ -31,7 +31,7 @@ @Override public Result list(AiSecondMenu entity, Boolean asTree, String sortKey, String sortType) { QueryWrapper queryWrapper = QueryWrapper.create(entity, buildOperators(entity)); queryWrapper.orderBy(buildOrderBy(sortKey, sortType, getDefaultOrderBy())); queryWrapper.orderBy(AiSecondMenu::getFirstMenuId); List<AiSecondMenu> list = Tree.tryToTree(aiSecondMenuService.findAll(queryWrapper), asTree); return Result.success(list); } aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/entity/AiLlm.java
@@ -32,7 +32,6 @@ @Table("tb_ai_llm") public class AiLlm extends AiLlmBase { public List<String> getSupportFeatures() { List<String> features = new ArrayList<>(); if (getSupportChat() != null && getSupportChat()) { aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/entity/base/AiBotBase.java
@@ -98,7 +98,7 @@ private String modelAPI; @Column(comment = "KEY") private String modelKey; private String modelKEY; @Column(comment = "一级菜单编号") @@ -123,12 +123,12 @@ this.modelAPI = modelAPI; } public String getModelKey() { return modelKey; public String getModelKEY() { return modelKEY; } public void setModelKey(String modelKey) { this.modelKey = modelKey; public void setModelKEY(String modelKEY) { this.modelKEY = modelKEY; } public Integer getSecondMenuId() { aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/service/impl/AiSecondMenuServiceImpl.java
@@ -10,6 +10,8 @@ import tech.aiflowy.ai.service.AiSecondMenuService; import com.mybatisflex.spring.service.impl.ServiceImpl; import java.util.Collections; import java.util.Comparator; import java.util.List; /** @@ -36,6 +38,7 @@ for (AiSecondMenu record : records) { record.setFirstMenuName(aiSecondMenuMapper.getFMN(record.getFirstMenuId())); } records.sort(Comparator.comparing(AiSecondMenu::getFirstMenuId)); return page1; } } aiflowy-starter/src/main/resources/application.yml
@@ -5,7 +5,7 @@ profiles: active: dev datasource: url: jdbc:mysql://127.0.0.1:3306/aiflowy?useInformationSchema=true&characterEncoding=utf-8 url: jdbc:mysql://localhost:3306/aiflowy?useInformationSchema=true&characterEncoding=utf-8 username: root password: win2020 servlet: sql/aiflowy.sql
New file Diff too large