From aff1785df0f25c60f0a248154f30886e2b081478 Mon Sep 17 00:00:00 2001
From: zhangjinyang <409225776@qq.com>
Date: 星期五, 23 五月 2025 11:31:53 +0800
Subject: [PATCH] feat: 新增工作流试运行时显示每一步的执行结果。close #IC7KLQ
---
aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java | 103 ++++++++++++++++++++++++++++++++++++++++++++-------
1 files changed, 88 insertions(+), 15 deletions(-)
diff --git a/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java b/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java
index c9b1e82..45fcb5b 100644
--- a/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java
+++ b/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java
@@ -2,18 +2,25 @@
import cn.dev33.satoken.annotation.SaIgnore;
import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.agentsflex.core.chain.*;
+import com.agentsflex.core.chain.event.ChainStatusChangeEvent;
+import com.agentsflex.core.chain.event.NodeEndEvent;
+import com.agentsflex.core.chain.event.NodeStartEvent;
import com.alibaba.fastjson.JSONObject;
import dev.tinyflow.core.Tinyflow;
import dev.tinyflow.core.parser.NodeParser;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import tech.aiflowy.ai.entity.AiWorkflow;
import tech.aiflowy.ai.service.AiLlmService;
import tech.aiflowy.ai.service.AiWorkflowService;
+import tech.aiflowy.common.ai.MySseEmitter;
import tech.aiflowy.common.domain.Result;
import tech.aiflowy.common.web.controller.BaseCurdController;
import tech.aiflowy.common.web.jsonbody.JsonBody;
@@ -21,6 +28,7 @@
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
import java.io.InputStream;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
@@ -91,31 +99,96 @@
AiWorkflow workflow = service.getById(id);
if (workflow == null) {
- return Result.fail(1, "can not find the workflow by id: " + id);
+ return Result.fail(1, "宸ヤ綔娴佷笉瀛樺湪");
}
Tinyflow tinyflow = workflow.toTinyflow();
Chain chain = tinyflow.toChain();
- chain.addEventListener(new ChainEventListener() {
- @Override
- public void onEvent(ChainEvent event, Chain chain) {
- System.out.println("onEvent : " + event);
- }
- });
-
-
- chain.addOutputListener(new ChainOutputListener() {
- @Override
- public void onOutput(Chain chain, ChainNode node, Object outputMessage) {
- System.out.println("output : " + node.getId() + " : " + outputMessage);
- }
- });
Map<String, Object> result = chain.executeForResult(variables);
return Result.success("result", result).set("message", chain.getMessage());
}
+ @PostMapping("tryRunningStream")
+ public SseEmitter tryRunningStream(
+ @JsonBody(value = "id", required = true) BigInteger id,
+ @JsonBody("variables") Map<String, Object> variables,
+ HttpServletResponse response) {
+
+ response.setContentType("text/event-stream");
+
+ MySseEmitter emitter = new MySseEmitter((long) (1000 * 60 * 10));
+
+ AiWorkflow workflow = service.getById(id);
+ if (workflow == null) {
+ throw new RuntimeException("宸ヤ綔娴佷笉瀛樺湪");
+ }
+
+ Tinyflow tinyflow = workflow.toTinyflow();
+ Chain chain = tinyflow.toChain();
+
+ JSONObject json = new JSONObject();
+
+ chain.addEventListener(new ChainEventListener() {
+ @Override
+ public void onEvent(ChainEvent event, Chain chain) {
+ if (event instanceof NodeStartEvent) {
+ JSONObject content = new JSONObject();
+ ChainNode node = ((NodeStartEvent) event).getNode();
+ content.put("nodeId", node.getId());
+ content.put("status", "start");
+ json.put("content", content);
+ emitter.send(json.toJSONString());
+ }
+ if (event instanceof NodeEndEvent) {
+ ChainNode node = ((NodeEndEvent) event).getNode();
+ Map<String, Object> result = ((NodeEndEvent) event).getResult();
+ JSONObject content = new JSONObject();
+ content.put("nodeId", node.getId());
+ content.put("status", "end");
+ content.put("res", result);
+ json.put("content", content);
+ emitter.send(json.toJSONString());
+ }
+ if (event instanceof ChainStatusChangeEvent) {
+ ChainStatus status = ((ChainStatusChangeEvent) event).getStatus();
+ if (ChainStatus.FINISHED_ABNORMAL.equals(status)) {
+ String message = chain.getMessage();
+ JSONObject content = new JSONObject();
+ content.put("status", "error");
+ content.put("errorMsg", message);
+ json.put("content", content);
+ emitter.sendAndComplete(json.toJSONString());
+ }
+ }
+ }
+ });
+
+ chain.addNodeErrorListener(new NodeErrorListener() {
+ @Override
+ public void onError(Throwable e, ChainNode node, Map<String, Object> map, Chain chain) {
+ String message = ExceptionUtil.getRootCauseMessage(e);
+ JSONObject content = new JSONObject();
+ content.put("nodeId", node.getId());
+ content.put("status", "nodeError");
+ content.put("errorMsg", message);
+ json.put("content", content);
+ emitter.sendAndComplete(json.toJSONString());
+ }
+ });
+
+ ThreadUtil.execAsync(() -> {
+ Map<String, Object> result = chain.executeForResult(variables);
+ JSONObject content = new JSONObject();
+ content.put("execResult", result);
+ json.put("content", content);
+ emitter.sendAndComplete(json.toJSONString());
+ });
+
+ return emitter;
+ }
+
@SaIgnore
@GetMapping(value = "/external/getRunningParams", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
--
Gitblit v1.8.0