aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/controller/AiWorkflowController.java
@@ -2,18 +2,25 @@ import cn.dev33.satoken.annotation.SaIgnore; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.exceptions.ExceptionUtil; import cn.hutool.core.io.IoUtil; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.StrUtil; import com.agentsflex.core.chain.*; import com.agentsflex.core.chain.event.ChainStatusChangeEvent; import com.agentsflex.core.chain.event.NodeEndEvent; import com.agentsflex.core.chain.event.NodeStartEvent; import com.alibaba.fastjson.JSONObject; import dev.tinyflow.core.Tinyflow; import dev.tinyflow.core.parser.NodeParser; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; import org.springframework.web.multipart.MultipartFile; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import tech.aiflowy.ai.entity.AiWorkflow; import tech.aiflowy.ai.service.AiLlmService; import tech.aiflowy.ai.service.AiWorkflowService; import tech.aiflowy.common.ai.MySseEmitter; import tech.aiflowy.common.domain.Result; import tech.aiflowy.common.web.controller.BaseCurdController; import tech.aiflowy.common.web.jsonbody.JsonBody; @@ -21,6 +28,7 @@ import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.InputStream; import java.math.BigInteger; import java.nio.charset.StandardCharsets; @@ -91,31 +99,96 @@ AiWorkflow workflow = service.getById(id); if (workflow == null) { return Result.fail(1, "can not find the workflow by id: " + id); return Result.fail(1, "工作流不存在"); } Tinyflow tinyflow = workflow.toTinyflow(); Chain chain = tinyflow.toChain(); chain.addEventListener(new ChainEventListener() { @Override public void onEvent(ChainEvent event, Chain chain) { System.out.println("onEvent : " + event); } }); chain.addOutputListener(new ChainOutputListener() { @Override public void onOutput(Chain chain, ChainNode node, Object outputMessage) { System.out.println("output : " + 
node.getId() + " : " + outputMessage); } }); Map<String, Object> result = chain.executeForResult(variables); return Result.success("result", result).set("message", chain.getMessage()); }

    /**
     * Runs a workflow and streams its progress to the client as server-sent events.
     *
     * <p>Every message is a JSON object of the form {@code {"content": {...}}} where the
     * content carries one of:
     * <ul>
     *   <li>{@code nodeId} + {@code status: "start"} when a node starts;</li>
     *   <li>{@code nodeId} + {@code status: "end"} + {@code res} when a node ends;</li>
     *   <li>{@code nodeId} + {@code status: "nodeError"} + {@code errorMsg} when a node fails;</li>
     *   <li>{@code status: "error"} + {@code errorMsg} when the chain finishes abnormally
     *       or the execution itself throws;</li>
     *   <li>{@code execResult} with the final result of the chain.</li>
     * </ul>
     *
     * @param id        id of the workflow to run
     * @param variables input variables handed to the chain
     * @param response  servlet response, its content type is set to {@code text/event-stream}
     * @return the emitter the events are written to
     * @throws RuntimeException if no workflow exists for {@code id}
     */
    @PostMapping("tryRunningStream")
    public SseEmitter tryRunningStream(
            @JsonBody(value = "id", required = true) BigInteger id,
            @JsonBody("variables") Map<String, Object> variables,
            HttpServletResponse response) {
        response.setContentType("text/event-stream");
        MySseEmitter emitter = new MySseEmitter((long) (1000 * 60 * 10));

        AiWorkflow workflow = service.getById(id);
        if (workflow == null) {
            throw new RuntimeException("工作流不存在");
        }

        Tinyflow tinyflow = workflow.toTinyflow();
        Chain chain = tinyflow.toChain();

        // Three places below can end the stream (node error, abnormal chain status, final
        // result). This flag makes sure only the first of them actually completes the
        // emitter. Fully qualified so the file's import block does not have to change.
        // NOTE(review): how MySseEmitter reacts to a send after completion is not visible
        // here; the guard avoids relying on it.
        final java.util.concurrent.atomic.AtomicBoolean finished =
                new java.util.concurrent.atomic.AtomicBoolean(false);

        chain.addEventListener(new ChainEventListener() {
            @Override
            public void onEvent(ChainEvent event, Chain chain) {
                if (event instanceof NodeStartEvent) {
                    ChainNode node = ((NodeStartEvent) event).getNode();
                    JSONObject content = new JSONObject();
                    content.put("nodeId", node.getId());
                    content.put("status", "start");
                    if (!finished.get()) {
                        emitter.send(wrapSseContent(content));
                    }
                }
                if (event instanceof NodeEndEvent) {
                    ChainNode node = ((NodeEndEvent) event).getNode();
                    Map<String, Object> result = ((NodeEndEvent) event).getResult();
                    JSONObject content = new JSONObject();
                    content.put("nodeId", node.getId());
                    content.put("status", "end");
                    content.put("res", result);
                    if (!finished.get()) {
                        emitter.send(wrapSseContent(content));
                    }
                }
                if (event instanceof ChainStatusChangeEvent) {
                    ChainStatus status = ((ChainStatusChangeEvent) event).getStatus();
                    if (ChainStatus.FINISHED_ABNORMAL.equals(status)) {
                        JSONObject content = new JSONObject();
                        content.put("status", "error");
                        content.put("errorMsg", chain.getMessage());
                        if (finished.compareAndSet(false, true)) {
                            emitter.sendAndComplete(wrapSseContent(content));
                        }
                    }
                }
            }
        });

        chain.addNodeErrorListener(new NodeErrorListener() {
            @Override
            public void onError(Throwable e, ChainNode node, Map<String, Object> map, Chain chain) {
                JSONObject content = new JSONObject();
                content.put("nodeId", node.getId());
                content.put("status", "nodeError");
                content.put("errorMsg", ExceptionUtil.getRootCauseMessage(e));
                if (finished.compareAndSet(false, true)) {
                    emitter.sendAndComplete(wrapSseContent(content));
                }
            }
        });

        ThreadUtil.execAsync(() -> {
            try {
                Map<String, Object> result = chain.executeForResult(variables);
                JSONObject content = new JSONObject();
                content.put("execResult", result);
                if (finished.compareAndSet(false, true)) {
                    emitter.sendAndComplete(wrapSseContent(content));
                }
            } catch (Exception e) {
                // Without this the worker thread would die silently and the client would
                // keep waiting until the emitter times out. Report the failure in the same
                // shape as an abnormal chain finish and close the stream.
                JSONObject content = new JSONObject();
                content.put("status", "error");
                content.put("errorMsg", ExceptionUtil.getRootCauseMessage(e));
                if (finished.compareAndSet(false, true)) {
                    emitter.sendAndComplete(wrapSseContent(content));
                }
            }
        });

        return emitter;
    }

    /**
     * Wraps an event payload into the {@code {"content": ...}} envelope sent over SSE.
     * A fresh envelope is built per message so that listeners and the worker thread never
     * share a mutable JSON object.
     */
    private static String wrapSseContent(JSONObject content) {
        JSONObject envelope = new JSONObject();
        envelope.put("content", content);
        return envelope.toJSONString();
    }

@SaIgnore @GetMapping(value = "/external/getRunningParams", produces = MediaType.APPLICATION_JSON_VALUE) @ResponseBody aiflowy-modules/aiflowy-module-ai/src/main/java/tech/aiflowy/ai/node/MakeFileNode.java
@@ -31,7 +31,6 @@ @Override protected Map<String, Object> execute(Chain chain) { System.out.println("===========make file=============="); Map<String, Object> map = chain.getParameterValues(this); Map<String, Object> res = new HashMap<>(); aiflowy-ui-react/src/libs/workflowUtil.tsx
@@ -63,4 +63,80 @@ </> ) }

/**
 * Topologically sorts the nodes of a workflow definition and maps them to list items.
 *
 * A node depends on another node when one of its `data.parameters` has a `ref` of the
 * form `<nodeId>.<something>` pointing at that node, or when an edge runs from that node
 * to it. Dependencies on unknown node ids are ignored.
 *
 * @param nodesJson parsed workflow content, expected to hold `nodes` and `edges` arrays;
 *                  a missing/`null` value or missing arrays are treated as empty
 * @returns one item per sorted node: `{key, label, original, children, extra}` where
 *          `label` is the node title (falling back to the id) and `children` / `extra`
 *          start as empty strings. Nodes that are part of a dependency cycle are left
 *          out and an error is logged to the console.
 */
export function sortNodes(nodesJson: any): any[] {
    // The content comes straight from JSON.parse of stored data, so it may be null or
    // lack either array; fall back to empty lists instead of throwing.
    const nodes: any[] = Array.isArray(nodesJson?.nodes) ? nodesJson.nodes : [];
    const edges: any[] = Array.isArray(nodesJson?.edges) ? nodesJson.edges : [];

    const nodeMap: any = {};   // node id -> result item
    const graph: any = {};     // node id -> ids of the nodes that depend on it
    const inDegree: any = {};  // node id -> number of unresolved dependencies

    nodes.forEach((node: any) => {
        const nodeId = node.id;
        nodeMap[nodeId] = {
            key: nodeId,
            label: node.data?.title || nodeId,
            original: node,
            children: "",
            extra: ""
        };
        graph[nodeId] = [];
        inDegree[nodeId] = 0;
    });

    // Dependencies declared through parameter references ("<sourceNodeId>.<field>").
    nodes.forEach((node: any) => {
        const parameters = Array.isArray(node.data?.parameters) ? node.data.parameters : [];
        parameters.forEach((param: any) => {
            const ref = param?.ref;
            if (typeof ref === 'string' && ref) {
                const [sourceNodeId] = ref.split('.');
                if (nodeMap[sourceNodeId]) {
                    graph[sourceNodeId].push(node.id);
                    inDegree[node.id]++;
                }
            }
        });
    });

    // Dependencies declared through edges.
    edges.forEach((edge: any) => {
        const source = edge?.source;
        const target = edge?.target;
        if (nodeMap[source] && nodeMap[target]) {
            graph[source].push(target);
            inDegree[target]++;
        }
    });

    // Kahn's algorithm. The queue is walked with an index rather than shift() so that
    // dequeuing does not re-copy the array on every step.
    const queue: any[] = nodes
        .filter((node: any) => inDegree[node.id] === 0)
        .map((node: any) => node.id);
    const sortedNodes: any[] = [];
    for (let head = 0; head < queue.length; head++) {
        const nodeId = queue[head];
        sortedNodes.push(nodeMap[nodeId]);
        graph[nodeId].forEach((neighborId: any) => {
            inDegree[neighborId]--;
            if (inDegree[neighborId] === 0) {
                queue.push(neighborId);
            }
        });
    }

    // Nodes inside a cycle never reach in-degree 0 and are therefore missing here.
    if (sortedNodes.length !== nodes.length) {
        console.error('检测到循环依赖,排序结果可能不完整');
    }

    // Hand out plain copies that contain only the fields consumers need.
    return sortedNodes.map(node => ({
        key: node.key,
        label: node.label,
        original: node.original,
        children: node.children,
        extra: node.extra
    }));
}

aiflowy-ui-react/src/pages/ai/workflowDesign/WorkflowDesign.tsx
@@ -1,9 +1,17 @@ import {useEffect, useRef, useState} from 'react'; // @ts-ignore import React, {useEffect, useRef, useState} from 'react'; import {useLayout} from '../../../hooks/useLayout.tsx'; import {App, Button, Drawer, Form, Input, Skeleton, Spin} from "antd"; import {App, Button, Collapse, Drawer, Empty, Form, Input, Skeleton, Spin} from "antd"; import {useParams} from "react-router-dom"; import {useDetail, useGet, useGetManual, usePostManual, useUpdate} from "../../../hooks/useApis.ts"; import {FormOutlined, SendOutlined, UploadOutlined} from "@ant-design/icons"; import {useDetail, useGet, useGetManual, useUpdate} from "../../../hooks/useApis.ts"; import { CheckCircleOutlined, CloseCircleOutlined, ExclamationCircleOutlined, FormOutlined, LoadingOutlined, SendOutlined, UploadOutlined } from "@ant-design/icons"; import {Tinyflow, TinyflowHandle} from '@tinyflow-ai/react'; import '@tinyflow-ai/react/dist/index.css' import {Uploader} from "../../../components/Uploader"; @@ -11,6 +19,9 @@ import {PluginNode} from './customNode/pluginNode.ts' import {PluginTools} from "../botDesign/PluginTools.tsx"; import {SingleRun} from "./SingleRun.tsx"; import JsonView from "react18-json-view"; import {useSse} from "../../../hooks/useSse.ts"; import {sortNodes} from "../../../libs/workflowUtil"; export const WorkflowDesign = () => { @@ -25,12 +36,29 @@ const [workflowData, setWorkflowData] = useState<any>({}) const [saveLoading, setSaveLoading] = useState(false); const [runLoading, setRunLoading] = useState(false); const [collapseItems, setCollapseItems] = useState<any[]>([]) const {doGet: getWorkflowInfo} = useGetManual("/api/v1/aiWorkflow/detail") const getNodesInfo = (workflowId: any) => { setCollapseItems([]) setRunLoading(true) getWorkflowInfo({ params: { id: workflowId, } }).then(res => { setRunLoading(false) const nodeJson = JSON.parse(res.data?.data.content); setCollapseItems(sortNodes(nodeJson)) }) } // 添加 useEffect 监听 workflow 变化 useEffect(() => { if 
(workflow?.data?.content) { try { setWorkflowData(JSON.parse(workflow.data.content)) const content = JSON.parse(workflow.data.content); setWorkflowData(content) } catch (e) { setWorkflowData({}) } @@ -109,7 +137,7 @@ const {doGet: getRunningParameters} = useGetManual("/api/v1/aiWorkflow/getRunningParameters"); const {doPost: tryRunning} = usePostManual("/api/v1/aiWorkflow/tryRunning"); //const {doPost: tryRunning} = usePostManual("/api/v1/aiWorkflow/tryRunning"); const [selectedItem, setSelectedItem] = useState<any>([]); const showRunningParameters = async () => { setRunLoading(true) @@ -130,13 +158,14 @@ } setRunLoading(false) }) getNodesInfo(params.id) } const [open, setOpen] = useState(false); const [singleRunOpen, setSingleRunOpen] = useState(false); const singleRunRef = useRef<any>(null) const [currentNode, setCurrentNode] = useState<any>(null) const [currentNode, setCurrentNode] = useState<any>(null) const showDrawer = () => { setOpen(true); @@ -146,37 +175,88 @@ setOpen(false); }; const onSingleRunClose = () => { const onSingleRunClose = () => { setSingleRunOpen(false); singleRunRef.current.resetForm() }; const {start: runWithStream} = useSse("/api/v1/aiWorkflow/tryRunningStream"); const onFinish = (values: any) => { //console.log('submit', values) setSubmitLoading(true) tryRunning({ // tryRunning({ // data: { // id: params.id, // variables: values // } // }).then((resp) => { // if (resp.data.errorCode === 0) { // message.success("成功") // } // setSubmitLoading(false) // setExecuteResult(JSON.stringify(resp.data)) // }) collapseItems.map((item: any) => { item.extra = "" item.children = "" }) setCollapseItems([...collapseItems]) runWithStream({ data: { id: params.id, variables: values } }).then((resp) => { if (resp.data.errorCode === 0) { message.success("成功") } setSubmitLoading(false) setExecuteResult(JSON.stringify(resp.data)) }, onMessage: (msg: any) => { //console.log(msg) if (msg.execResult) { setExecuteResult(msg.execResult) } if (msg.status === 'error') 
{ setExecuteResult(msg) collapseItems.map((item: any) => { item.extra = <Spin indicator={<ExclamationCircleOutlined style={{color: "#EABB00"}} />} /> }) setCollapseItems([...collapseItems]) } if (msg.nodeId && msg.status) { collapseItems.map((item: any) => { if (item.key == msg.nodeId) { if (msg.status === 'start') { item.extra = <Spin indicator={<LoadingOutlined/>}/> item.children = "" } if (msg.status === 'end') { item.extra = <Spin indicator={<CheckCircleOutlined style={{color: 'green'}} />} /> item.children = msg.res ? <div style={{wordWrap: "break-word",}}> <JsonView src={msg.res}/> </div> : "" } if (msg.status === 'nodeError') { item.extra = <Spin indicator={<CloseCircleOutlined style={{color: 'red'}} />} /> item.children = <JsonView src={msg.errorMsg}/> } } }) setCollapseItems([...collapseItems]) } }, onFinished: () => { setSubmitLoading(false) }, }) }; const onFinishFailed = (errorInfo: any) => { setSubmitLoading(false) message.error("失败:" + errorInfo) //message.error("失败:" + errorInfo) console.log('Failed:', errorInfo); }; const [changeNodeData, setChangeNodeData] = useState<any>() const handleChosen = (updateNodeData: any, value: any) => { const handleChosen = (updateNodeData: any, value: any) => { if (value) { setSelectedItem([value]) } @@ -228,7 +308,7 @@ changeNodeData({ pluginId: '', pluginName: '', parameters:[], parameters: [], outputDefs: [] }) setSelectedItem([]) @@ -267,7 +347,7 @@ if (file.status === 'done') { let url = file.response?.response.url; if (url.indexOf('http') < 0) { url = window.location.origin + url; url = window.location.origin + url; } form.setFieldsValue({ [item.name]: url, @@ -302,18 +382,26 @@ </Form> <div> <div>执行结果:</div> <div style={{marginBottom: "10px"}}>执行结果:</div> <div style={{ background: "#efefef", padding: "10px", height: "300px", marginTop: "10px", borderRadius: "7px", }}> <pre style={{whiteSpace: "pre-wrap",wordBreak: "break-all"}}>{executeResult || '暂无输出'}</pre> padding: "20px", backgroundColor: "#fafafa", 
textAlign: "center", wordWrap: "break-word", }} > {executeResult ? <JsonView src={executeResult}/> : <Empty description={'暂无内容'} image={Empty.PRESENTED_IMAGE_SIMPLE}/> } </div> </div> <div style={{marginTop: "10px"}}> <div>执行步骤:</div> <div style={{marginTop: "10px"}}> <Collapse items={collapseItems}/> </div> </div> </Drawer> <Drawer @@ -324,7 +412,7 @@ onClose={onSingleRunClose} open={singleRunOpen} > <SingleRun ref={singleRunRef} workflowId={params.id} node={currentNode} /> <SingleRun ref={singleRunRef} workflowId={params.id} node={currentNode} /> </Drawer> <div style={{height: 'calc(100vh - 50px)', display: "flex"}} className={"agentsflow"}> @@ -374,7 +462,7 @@ }} style={{height: 'calc(100vh - 10px)'}} customNodes={customNodes}/> </Spin> : <div style={{padding: '20px'}}><Skeleton active /></div> : <div style={{padding: '20px'}}><Skeleton active/></div> } </div>