zhangjinyang
2025-05-23 aff1785df0f25c60f0a248154f30886e2b081478
feat: 新增工作流试运行时显示每一步的执行结果。close #IC7KLQ
4个文件已修改
330 ■■■■ 已修改文件
aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java 103 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/node/MakeFileNode.java 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
aiflowy-ui-react/src/libs/workflowUtil.tsx 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
aiflowy-ui-react/src/pages/ai/workflowDesign/WorkflowDesign.tsx 150 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java
@@ -2,18 +2,25 @@
import cn.dev33.satoken.annotation.SaIgnore;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.agentsflex.core.chain.*;
import com.agentsflex.core.chain.event.ChainStatusChangeEvent;
import com.agentsflex.core.chain.event.NodeEndEvent;
import com.agentsflex.core.chain.event.NodeStartEvent;
import com.alibaba.fastjson.JSONObject;
import dev.tinyflow.core.Tinyflow;
import dev.tinyflow.core.parser.NodeParser;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import tech.aiflowy.ai.entity.AiWorkflow;
import tech.aiflowy.ai.service.AiLlmService;
import tech.aiflowy.ai.service.AiWorkflowService;
import tech.aiflowy.common.ai.MySseEmitter;
import tech.aiflowy.common.domain.Result;
import tech.aiflowy.common.web.controller.BaseCurdController;
import tech.aiflowy.common.web.jsonbody.JsonBody;
@@ -21,6 +28,7 @@
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.InputStream;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
@@ -91,31 +99,96 @@
        AiWorkflow workflow = service.getById(id);
        if (workflow == null) {
            return Result.fail(1, "can not find the workflow by id: " + id);
            return Result.fail(1, "工作流不存在");
        }
        Tinyflow tinyflow = workflow.toTinyflow();
        Chain chain = tinyflow.toChain();
        chain.addEventListener(new ChainEventListener() {
            @Override
            public void onEvent(ChainEvent event, Chain chain) {
                System.out.println("onEvent : " + event);
            }
        });
        chain.addOutputListener(new ChainOutputListener() {
            @Override
            public void onOutput(Chain chain, ChainNode node, Object outputMessage) {
                System.out.println("output : " + node.getId() + " : " + outputMessage);
            }
        });
        Map<String, Object> result = chain.executeForResult(variables);
        return Result.success("result", result).set("message", chain.getMessage());
    }
    @PostMapping("tryRunningStream")
    public SseEmitter tryRunningStream(
            @JsonBody(value = "id", required = true) BigInteger id,
            @JsonBody("variables") Map<String, Object> variables,
            HttpServletResponse response) {
        response.setContentType("text/event-stream");
        MySseEmitter emitter = new MySseEmitter((long) (1000 * 60 * 10));
        AiWorkflow workflow = service.getById(id);
        if (workflow == null) {
            throw new RuntimeException("工作流不存在");
        }
        Tinyflow tinyflow = workflow.toTinyflow();
        Chain chain = tinyflow.toChain();
        JSONObject json = new JSONObject();
        chain.addEventListener(new ChainEventListener() {
            @Override
            public void onEvent(ChainEvent event, Chain chain) {
                if (event instanceof NodeStartEvent) {
                    JSONObject content = new JSONObject();
                    ChainNode node = ((NodeStartEvent) event).getNode();
                    content.put("nodeId", node.getId());
                    content.put("status", "start");
                    json.put("content", content);
                    emitter.send(json.toJSONString());
                }
                if (event instanceof NodeEndEvent) {
                    ChainNode node = ((NodeEndEvent) event).getNode();
                    Map<String, Object> result = ((NodeEndEvent) event).getResult();
                    JSONObject content = new JSONObject();
                    content.put("nodeId", node.getId());
                    content.put("status", "end");
                    content.put("res", result);
                    json.put("content", content);
                    emitter.send(json.toJSONString());
                }
                if (event instanceof ChainStatusChangeEvent) {
                    ChainStatus status = ((ChainStatusChangeEvent) event).getStatus();
                    if (ChainStatus.FINISHED_ABNORMAL.equals(status)) {
                        String message = chain.getMessage();
                        JSONObject content = new JSONObject();
                        content.put("status", "error");
                        content.put("errorMsg", message);
                        json.put("content", content);
                        emitter.sendAndComplete(json.toJSONString());
                    }
                }
            }
        });
        chain.addNodeErrorListener(new NodeErrorListener() {
            @Override
            public void onError(Throwable e, ChainNode node, Map<String, Object> map, Chain chain) {
                String message = ExceptionUtil.getRootCauseMessage(e);
                JSONObject content = new JSONObject();
                content.put("nodeId", node.getId());
                content.put("status", "nodeError");
                content.put("errorMsg", message);
                json.put("content", content);
                emitter.sendAndComplete(json.toJSONString());
            }
        });
        ThreadUtil.execAsync(() -> {
            Map<String, Object> result = chain.executeForResult(variables);
            JSONObject content = new JSONObject();
            content.put("execResult", result);
            json.put("content", content);
            emitter.sendAndComplete(json.toJSONString());
        });
        return emitter;
    }
    @SaIgnore
    @GetMapping(value = "/external/getRunningParams", produces = MediaType.APPLICATION_JSON_VALUE)
    @ResponseBody
aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/node/MakeFileNode.java
@@ -31,7 +31,6 @@
    @Override
    protected Map<String, Object> execute(Chain chain) {
        System.out.println("===========make file==============");
        Map<String, Object> map = chain.getParameterValues(this);
        Map<String, Object> res = new HashMap<>();
aiflowy-ui-react/src/libs/workflowUtil.tsx
@@ -63,4 +63,80 @@
        </>
    )
}
/**
 * Orders the nodes of a workflow definition topologically and maps them to
 * the item shape used by the step list (`key`, `label`, `original`,
 * `children`, `extra`).
 *
 * A node is placed after every node it depends on. A dependency comes from
 * either an edge (`source` -> `target`) or a parameter whose `ref` starts
 * with another node's id (`"<nodeId>.<field>"`).
 *
 * A missing or malformed definition (null, no `nodes`, no `edges`) yields an
 * empty list / no edges instead of throwing. Nodes that are part of a
 * dependency cycle are left out of the result and an error is logged.
 *
 * @param nodesJson parsed workflow definition, expected to hold `nodes` and `edges` arrays
 * @returns the sorted step items
 */
export function sortNodes(nodesJson: any): any[] {
    // Tolerate an empty workflow instead of failing on destructuring / forEach.
    const nodes: any[] = Array.isArray(nodesJson?.nodes) ? nodesJson.nodes : [];
    const edges: any[] = Array.isArray(nodesJson?.edges) ? nodesJson.edges : [];

    const nodeMap: Record<string, any> = {};
    const graph: Record<string, any[]> = {};
    const inDegree: Record<string, number> = {};

    // Register every node with an empty adjacency list and zero in-degree.
    nodes.forEach((node: any) => {
        const nodeId = node.id;
        nodeMap[nodeId] = {
            key: nodeId,
            label: node.data?.title || nodeId,
            original: node,
            children: "",
            extra: ""
        };
        graph[nodeId] = [];
        inDegree[nodeId] = 0;
    });

    const addDependency = (from: any, to: any) => {
        // Ignore references to nodes that are not part of this definition.
        if (nodeMap[from] && nodeMap[to]) {
            graph[from].push(to);
            inDegree[to]++;
        }
    };

    // Parameter dependencies: a ref "<nodeId>.<field>" makes this node depend on <nodeId>.
    nodes.forEach((node: any) => {
        const parameters = node.data?.parameters || [];
        parameters.forEach((param: any) => {
            if (typeof param?.ref !== 'string' || !param.ref) {
                return;
            }
            const [sourceNodeId] = param.ref.split('.');
            addDependency(sourceNodeId, node.id);
        });
    });

    // Edge dependencies.
    edges.forEach((edge: any) => {
        addDependency(edge.source, edge.target);
    });

    // Kahn's algorithm. The queue is consumed through an index rather than
    // shift(), so dequeuing does not re-copy the array on every step.
    const queue: any[] = nodes
        .filter((node: any) => inDegree[node.id] === 0)
        .map((node: any) => node.id);
    const sortedNodes: any[] = [];
    for (let head = 0; head < queue.length; head++) {
        const nodeId = queue[head];
        sortedNodes.push(nodeMap[nodeId]);
        graph[nodeId].forEach((neighborId: any) => {
            inDegree[neighborId]--;
            if (inDegree[neighborId] === 0) {
                queue.push(neighborId);
            }
        });
    }

    // Nodes on a cycle never reach in-degree 0 and are therefore missing here.
    if (sortedNodes.length !== nodes.length) {
        console.error('检测到循环依赖,排序结果可能不完整');
    }

    return sortedNodes;
}
aiflowy-ui-react/src/pages/ai/workflowDesign/WorkflowDesign.tsx
@@ -1,9 +1,17 @@
import {useEffect, useRef, useState} from 'react';
// @ts-ignore
import React, {useEffect, useRef, useState} from 'react';
import {useLayout} from '../../../hooks/useLayout.tsx';
import {App, Button, Drawer, Form, Input, Skeleton, Spin} from "antd";
import {App, Button, Collapse, Drawer, Empty, Form, Input, Skeleton, Spin} from "antd";
import {useParams} from "react-router-dom";
import {useDetail, useGet, useGetManual, usePostManual, useUpdate} from "../../../hooks/useApis.ts";
import {FormOutlined, SendOutlined, UploadOutlined} from "@ant-design/icons";
import {useDetail, useGet, useGetManual, useUpdate} from "../../../hooks/useApis.ts";
import {
    CheckCircleOutlined,
    CloseCircleOutlined, ExclamationCircleOutlined,
    FormOutlined,
    LoadingOutlined,
    SendOutlined,
    UploadOutlined
} from "@ant-design/icons";
import {Tinyflow, TinyflowHandle} from '@tinyflow-ai/react';
import '@tinyflow-ai/react/dist/index.css'
import {Uploader} from "../../../components/Uploader";
@@ -11,6 +19,9 @@
import {PluginNode} from './customNode/pluginNode.ts'
import {PluginTools} from "../botDesign/PluginTools.tsx";
import {SingleRun} from "./SingleRun.tsx";
import JsonView from "react18-json-view";
import {useSse} from "../../../hooks/useSse.ts";
import {sortNodes} from "../../../libs/workflowUtil";
export const WorkflowDesign = () => {
@@ -25,12 +36,29 @@
    const [workflowData, setWorkflowData] = useState<any>({})
    const [saveLoading, setSaveLoading] = useState(false);
    const [runLoading, setRunLoading] = useState(false);
    const [collapseItems, setCollapseItems] = useState<any[]>([])
    const {doGet: getWorkflowInfo} = useGetManual("/api/v1/aiWorkflow/detail")
    const getNodesInfo = (workflowId: any) => {
        setCollapseItems([])
        setRunLoading(true)
        getWorkflowInfo({
            params: {
                id: workflowId,
            }
        }).then(res => {
            setRunLoading(false)
            const nodeJson = JSON.parse(res.data?.data.content);
            setCollapseItems(sortNodes(nodeJson))
        })
    }
    // 添加 useEffect 监听 workflow 变化
    useEffect(() => {
        if (workflow?.data?.content) {
            try {
                setWorkflowData(JSON.parse(workflow.data.content))
                const content = JSON.parse(workflow.data.content);
                setWorkflowData(content)
            } catch (e) {
                setWorkflowData({})
            }
@@ -109,7 +137,7 @@
    const {doGet: getRunningParameters} = useGetManual("/api/v1/aiWorkflow/getRunningParameters");
    const {doPost: tryRunning} = usePostManual("/api/v1/aiWorkflow/tryRunning");
    //const {doPost: tryRunning} = usePostManual("/api/v1/aiWorkflow/tryRunning");
    const [selectedItem, setSelectedItem] = useState<any>([]);
    const showRunningParameters = async () => {
        setRunLoading(true)
@@ -130,13 +158,14 @@
            }
            setRunLoading(false)
        })
        getNodesInfo(params.id)
    }
    const [open, setOpen] = useState(false);
    const [singleRunOpen, setSingleRunOpen] = useState(false);
    const singleRunRef = useRef<any>(null)
    const [currentNode,  setCurrentNode] = useState<any>(null)
    const [currentNode, setCurrentNode] = useState<any>(null)
    const showDrawer = () => {
        setOpen(true);
@@ -146,37 +175,88 @@
        setOpen(false);
    };
    const onSingleRunClose  = () => {
    const onSingleRunClose = () => {
        setSingleRunOpen(false);
        singleRunRef.current.resetForm()
    };
    const {start: runWithStream} = useSse("/api/v1/aiWorkflow/tryRunningStream");
    const onFinish = (values: any) => {
        //console.log('submit', values)
        setSubmitLoading(true)
        tryRunning({
        // tryRunning({
        //     data: {
        //         id: params.id,
        //         variables: values
        //     }
        // }).then((resp) => {
        //     if (resp.data.errorCode === 0) {
        //         message.success("成功")
        //     }
        //     setSubmitLoading(false)
        //     setExecuteResult(JSON.stringify(resp.data))
        // })
        collapseItems.map((item: any) => {
            item.extra = ""
            item.children = ""
        })
        setCollapseItems([...collapseItems])
        runWithStream({
            data: {
                id: params.id,
                variables: values
            }
        }).then((resp) => {
            if (resp.data.errorCode === 0) {
                message.success("成功")
            }
            setSubmitLoading(false)
            setExecuteResult(JSON.stringify(resp.data))
            },
            onMessage: (msg: any) => {
                //console.log(msg)
                if (msg.execResult) {
                    setExecuteResult(msg.execResult)
                }
                if (msg.status === 'error') {
                    setExecuteResult(msg)
                    collapseItems.map((item: any) => {
                        item.extra = <Spin indicator={<ExclamationCircleOutlined style={{color: "#EABB00"}} />} />
                    })
                    setCollapseItems([...collapseItems])
                }
                if (msg.nodeId && msg.status) {
                    collapseItems.map((item: any) => {
                        if (item.key == msg.nodeId) {
                            if (msg.status === 'start') {
                                item.extra = <Spin indicator={<LoadingOutlined/>}/>
                                item.children = ""
                            }
                            if (msg.status === 'end') {
                                item.extra = <Spin indicator={<CheckCircleOutlined style={{color: 'green'}} />} />
                                item.children = msg.res ?
                                    <div style={{wordWrap: "break-word",}}>
                                        <JsonView src={msg.res}/>
                                    </div> : ""
                            }
                            if (msg.status === 'nodeError') {
                                item.extra = <Spin indicator={<CloseCircleOutlined style={{color: 'red'}} />} />
                                item.children = <JsonView src={msg.errorMsg}/>
                            }
                        }
                    })
                    setCollapseItems([...collapseItems])
                }
            },
            onFinished: () => {
                setSubmitLoading(false)
            },
        })
    };
    const onFinishFailed = (errorInfo: any) => {
        setSubmitLoading(false)
        message.error("失败:" + errorInfo)
        //message.error("失败:" + errorInfo)
        console.log('Failed:', errorInfo);
    };
    const [changeNodeData, setChangeNodeData] = useState<any>()
    const handleChosen = (updateNodeData: any,  value: any) => {
    const handleChosen = (updateNodeData: any, value: any) => {
        if (value) {
            setSelectedItem([value])
        }
@@ -228,7 +308,7 @@
                        changeNodeData({
                            pluginId: '',
                            pluginName: '',
                            parameters:[],
                            parameters: [],
                            outputDefs: []
                        })
                        setSelectedItem([])
@@ -267,7 +347,7 @@
                                            if (file.status === 'done') {
                                                let url = file.response?.response.url;
                                                if (url.indexOf('http') < 0) {
                                                    url  = window.location.origin + url;
                                                    url = window.location.origin + url;
                                                }
                                                form.setFieldsValue({
                                                    [item.name]: url,
@@ -302,18 +382,26 @@
                </Form>
                <div>
                    <div>执行结果:</div>
                    <div style={{marginBottom: "10px"}}>执行结果:</div>
                    <div style={{
                        background: "#efefef",
                        padding: "10px",
                        height: "300px",
                        marginTop: "10px",
                        borderRadius: "7px",
                    }}>
                        <pre style={{whiteSpace: "pre-wrap",wordBreak: "break-all"}}>{executeResult || '暂无输出'}</pre>
                        padding: "20px",
                        backgroundColor: "#fafafa",
                        textAlign: "center",
                        wordWrap: "break-word",
                    }}
                    >
                        {executeResult ?
                            <JsonView src={executeResult}/> :
                            <Empty description={'暂无内容'} image={Empty.PRESENTED_IMAGE_SIMPLE}/>
                        }
                    </div>
                </div>
                <div style={{marginTop: "10px"}}>
                    <div>执行步骤:</div>
                    <div style={{marginTop: "10px"}}>
                        <Collapse items={collapseItems}/>
                    </div>
                </div>
            </Drawer>
            <Drawer
@@ -324,7 +412,7 @@
                onClose={onSingleRunClose}
                open={singleRunOpen}
            >
                <SingleRun ref={singleRunRef} workflowId={params.id} node={currentNode} />
            <SingleRun ref={singleRunRef} workflowId={params.id} node={currentNode} />
            </Drawer>
            <div style={{height: 'calc(100vh - 50px)', display: "flex"}} className={"agentsflow"}>
@@ -374,7 +462,7 @@
                                      }}
                                      style={{height: 'calc(100vh - 10px)'}} customNodes={customNodes}/>
                        </Spin>
                        : <div style={{padding: '20px'}}><Skeleton active /></div>
                        : <div style={{padding: '20px'}}><Skeleton active/></div>
                    }
                </div>