From aff1785df0f25c60f0a248154f30886e2b081478 Mon Sep 17 00:00:00 2001
From: zhangjinyang <409225776@qq.com>
Date: 星期五, 23 五月 2025 11:31:53 +0800
Subject: [PATCH] feat: 新增工作流试运行时显示每一步的执行结果。close #IC7KLQ

---
 aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java |  103 ++++++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 88 insertions(+), 15 deletions(-)

diff --git a/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java b/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java
index c9b1e82..45fcb5b 100644
--- a/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java
+++ b/aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java
@@ -2,18 +2,25 @@
 
 import cn.dev33.satoken.annotation.SaIgnore;
 import cn.hutool.core.collection.CollectionUtil;
+import cn.hutool.core.exceptions.ExceptionUtil;
 import cn.hutool.core.io.IoUtil;
+import cn.hutool.core.thread.ThreadUtil;
 import cn.hutool.core.util.StrUtil;
 import com.agentsflex.core.chain.*;
+import com.agentsflex.core.chain.event.ChainStatusChangeEvent;
+import com.agentsflex.core.chain.event.NodeEndEvent;
+import com.agentsflex.core.chain.event.NodeStartEvent;
 import com.alibaba.fastjson.JSONObject;
 import dev.tinyflow.core.Tinyflow;
 import dev.tinyflow.core.parser.NodeParser;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.multipart.MultipartFile;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 import tech.aiflowy.ai.entity.AiWorkflow;
 import tech.aiflowy.ai.service.AiLlmService;
 import tech.aiflowy.ai.service.AiWorkflowService;
+import tech.aiflowy.common.ai.MySseEmitter;
 import tech.aiflowy.common.domain.Result;
 import tech.aiflowy.common.web.controller.BaseCurdController;
 import tech.aiflowy.common.web.jsonbody.JsonBody;
@@ -21,6 +28,7 @@
 
 import javax.annotation.Resource;
 import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 import java.io.InputStream;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
@@ -91,31 +99,96 @@
         AiWorkflow workflow = service.getById(id);
 
         if (workflow == null) {
-            return Result.fail(1, "can not find the workflow by id: " + id);
+            return Result.fail(1, "宸ヤ綔娴佷笉瀛樺湪");
         }
 
         Tinyflow tinyflow = workflow.toTinyflow();
         Chain chain = tinyflow.toChain();
-        chain.addEventListener(new ChainEventListener() {
-            @Override
-            public void onEvent(ChainEvent event, Chain chain) {
-                System.out.println("onEvent : " + event);
-            }
-        });
-
-
-        chain.addOutputListener(new ChainOutputListener() {
-            @Override
-            public void onOutput(Chain chain, ChainNode node, Object outputMessage) {
-                System.out.println("output : " + node.getId() + " : " + outputMessage);
-            }
-        });
 
         Map<String, Object> result = chain.executeForResult(variables);
 
         return Result.success("result", result).set("message", chain.getMessage());
     }
 
+    @PostMapping("tryRunningStream")
+    public SseEmitter tryRunningStream(
+            @JsonBody(value = "id", required = true) BigInteger id,
+            @JsonBody("variables") Map<String, Object> variables,
+            HttpServletResponse response) {
+
+        response.setContentType("text/event-stream");
+
+        MySseEmitter emitter = new MySseEmitter((long) (1000 * 60 * 10));
+
+        AiWorkflow workflow = service.getById(id);
+        if (workflow == null) {
+            throw new RuntimeException("宸ヤ綔娴佷笉瀛樺湪");
+        }
+
+        Tinyflow tinyflow = workflow.toTinyflow();
+        Chain chain = tinyflow.toChain();
+
+        JSONObject json = new JSONObject();
+
+        chain.addEventListener(new ChainEventListener() {
+            @Override
+            public void onEvent(ChainEvent event, Chain chain) {
+                if (event instanceof NodeStartEvent) {
+                    JSONObject content = new JSONObject();
+                    ChainNode node = ((NodeStartEvent) event).getNode();
+                    content.put("nodeId", node.getId());
+                    content.put("status", "start");
+                    json.put("content", content);
+                    emitter.send(json.toJSONString());
+                }
+                if (event instanceof NodeEndEvent) {
+                    ChainNode node = ((NodeEndEvent) event).getNode();
+                    Map<String, Object> result = ((NodeEndEvent) event).getResult();
+                    JSONObject content = new JSONObject();
+                    content.put("nodeId", node.getId());
+                    content.put("status", "end");
+                    content.put("res", result);
+                    json.put("content", content);
+                    emitter.send(json.toJSONString());
+                }
+                if (event instanceof ChainStatusChangeEvent) {
+                    ChainStatus status = ((ChainStatusChangeEvent) event).getStatus();
+                    if (ChainStatus.FINISHED_ABNORMAL.equals(status)) {
+                        String message = chain.getMessage();
+                        JSONObject content = new JSONObject();
+                        content.put("status", "error");
+                        content.put("errorMsg", message);
+                        json.put("content", content);
+                        emitter.sendAndComplete(json.toJSONString());
+                    }
+                }
+            }
+        });
+
+        chain.addNodeErrorListener(new NodeErrorListener() {
+            @Override
+            public void onError(Throwable e, ChainNode node, Map<String, Object> map, Chain chain) {
+                String message = ExceptionUtil.getRootCauseMessage(e);
+                JSONObject content = new JSONObject();
+                content.put("nodeId", node.getId());
+                content.put("status", "nodeError");
+                content.put("errorMsg", message);
+                json.put("content", content);
+                emitter.sendAndComplete(json.toJSONString());
+            }
+        });
+
+        ThreadUtil.execAsync(() -> {
+            Map<String, Object> result = chain.executeForResult(variables);
+            JSONObject content = new JSONObject();
+            content.put("execResult", result);
+            json.put("content", content);
+            emitter.sendAndComplete(json.toJSONString());
+        });
+
+        return emitter;
+    }
+
     @SaIgnore
     @GetMapping(value = "/external/getRunningParams", produces = MediaType.APPLICATION_JSON_VALUE)
     @ResponseBody

--
Gitblit v1.8.0