Files
udmin/frontend/src/flows/services/custom-service.ts
ayou 3362268575 feat(flow): 改进 Rhai 脚本执行错误处理和前后端代码节点映射
- 修改 eval_rhai_expr_json 返回 Result 以提供错误信息
- 统一使用 unwrap_or_else 处理 Rhai 表达式执行错误
- 前后端代码节点类型映射支持 JavaScript 和 Rhai 语言
- 前端代码编辑器添加语言选择器
- 优化 WebSocket 错误处理和关闭逻辑
2025-09-22 06:52:22 +08:00

270 lines
9.1 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
* SPDX-License-Identifier: MIT
*/
import { injectable, inject } from '@flowgram.ai/free-layout-editor';
import {
FreeLayoutPluginContext,
SelectionService,
Playground,
WorkflowDocument,
} from '@flowgram.ai/free-layout-editor';
import { Toast } from '@douyinfe/semi-ui';
import { I18n } from '@flowgram.ai/free-layout-editor';
import api, { type ApiResp } from '../../utils/axios';
import { stringifyFlowDoc } from '../utils/yaml';
import { postSSE } from '../../utils/sse';
import { getToken } from '../../utils/token'
/**
 * Result payload of a flow run, as typed for the `/flows/{id}/run` response
 * and for the `done` promise of the WebSocket streaming run in this file.
 * The shape of `ctx` is not defined here (typed as `any`).
 */
interface RunResult { ok: boolean; ctx: any; logs: string[] }
// Kept consistent with the backend StreamEvent (serde `tag = "type"`):
// the `type` field is the discriminant of this union.
export type StreamEvent =
| { type: 'node'; node_id?: string; ctx?: any; logs?: string[] }
| { type: 'done'; ok: boolean; ctx: any; logs: string[] }
| { type: 'error'; message: string };
/**
 * Reads the flow id (`id` query parameter) from the current URL.
 *
 * Works with both BrowserRouter and HashRouter: the regular query string
 * (`location.search`) is checked first; if it carries no id, the query part
 * embedded in the hash fragment (everything after the first `?`) is checked.
 *
 * @returns The id, or an empty string when neither location provides one.
 */
function getFlowIdFromUrl(): string {
  const readId = (query: string): string | null => new URLSearchParams(query).get('id');

  const fromSearch = readId(window.location.search);
  if (fromSearch) return fromSearch;

  const fragment = window.location.hash || '';
  const queryStart = fragment.indexOf('?');
  if (queryStart < 0) return '';

  return readId(fragment.substring(queryStart + 1)) || '';
}
/**
 * Produces a backend-compatible copy of the editor's design JSON.
 *
 * The UI uses a single node type `'code'`; it is remapped per script language:
 * - `rhai`                     -> `'script'` (original note: the backend maps this to `script_rhai`)
 * - `javascript` / unspecified -> `'javascript'`
 * All other nodes and fields are left untouched.
 *
 * The input is never mutated (a JSON deep copy is transformed). If copying or
 * mapping fails for any reason, the original input is returned as-is.
 */
function transformDesignJsonForBackend(json: any): any {
  const remapNode = (node: any): any => {
    if (!node || node.type !== 'code') return node;
    const backendType = node?.data?.script?.language === 'rhai' ? 'script' : 'javascript';
    return { ...node, type: backendType };
  };

  let copy: any;
  try {
    copy = JSON.parse(JSON.stringify(json));
    const nodes = copy.nodes || [];
    copy.nodes = nodes.map(remapNode);
  } catch {
    // Best effort: fall back to the untouched input.
    return json;
  }
  return copy;
}
/**
 * Editor service that saves the current flow document and runs the flow
 * through the backend (plain request, WebSocket streaming, or SSE streaming).
 * The flow id is always taken from the current URL.
 */
@injectable()
export class CustomService {
  @inject(FreeLayoutPluginContext) ctx!: FreeLayoutPluginContext;

  @inject(SelectionService) selectionService!: SelectionService;

  @inject(Playground) playground!: Playground;

  @inject(WorkflowDocument) document!: WorkflowDocument;

  /**
   * Saves the current document (YAML plus backend-adjusted design JSON).
   *
   * @param opts.silent When true, no Toast is shown (used for silent saves).
   * @returns `true` when the backend answered with `code === 0`, otherwise `false`.
   *          Never throws: request errors are reported via Toast (unless silent).
   */
  async save(opts?: { silent?: boolean }): Promise<boolean> {
    const silent = !!opts?.silent;
    try {
      const id = getFlowIdFromUrl();
      if (!id) {
        if (!silent) Toast.error(I18n.t('Flow ID is missing, cannot save'));
        return false;
      }
      const json = this.document.toJSON() as any;
      const yaml = stringifyFlowDoc(json);
      // Send the transformed design_json so the backend can pick the executor by language.
      const designForBackend = transformDesignJsonForBackend(json);
      const design_json = JSON.stringify(designForBackend);
      const { data } = await api.put<ApiResp<{ saved: boolean }>>(`/flows/${id}`, { yaml, design_json });
      if (data?.code === 0) {
        if (!silent) Toast.success(I18n.t('Saved'));
        try {
          // Key identifying the current page: the hash route (without '#') for
          // hash-based routing, otherwise pathname + search.
          const key = (() => {
            const hash = window.location.hash || '';
            if (hash.startsWith('#/')) {
              return hash.slice(1);
            }
            return window.location.pathname + (window.location.search || '');
          })();
          window.dispatchEvent(new CustomEvent('flows:doc-dirty', { detail: { key, dirty: false } }));
        } catch {
          // Best effort: failing to broadcast the "clean" state must not fail the save.
        }
        return true;
      } else {
        const msg = data?.message || I18n.t('Save failed');
        if (!silent) Toast.error(msg);
        return false;
      }
    } catch (e: any) {
      const msg = e?.message || I18n.t('Save failed');
      if (!silent) Toast.error(msg);
      return false;
    }
  }

  /**
   * Runs the flow with a single request.
   *
   * @param input Run input, sent as `{ input }`.
   * @returns The run result, or `null` when the id is missing or the run failed
   *          (the failure is shown as an error Toast).
   */
  async run(input: any = {}) {
    try {
      const id = getFlowIdFromUrl();
      if (!id) {
        Toast.error(I18n.t('Flow ID is missing, cannot run'));
        return null;
      }
      const { data } = await api.post<ApiResp<RunResult>>(`/flows/${id}/run`, { input });
      if (data?.code === 0) {
        return data.data;
      }
      throw new Error(data?.message || I18n.t('Run failed'));
    } catch (e: any) {
      Toast.error(e?.message || I18n.t('Run failed'));
      return null;
    }
  }

  /**
   * Runs the flow over a WebSocket and streams events to `handlers`.
   *
   * @param input Run input, sent as `{ input }` once the socket opens.
   * @param handlers Optional callbacks: `onNode` / `onError` / `onDone` for the
   *        corresponding stream events, `onFatal` for transport-level failures.
   * @returns `cancel` closes the socket; `done` resolves with the final result
   *          when a `done` event arrives, resolves with `null` when the run could
   *          not be started, and rejects when the socket errors or closes before
   *          a `done` event (this includes closing via `cancel`).
   *          A failure is reported exactly once through `onFatal`.
   */
  runStreamWS(
    input: any = {},
    handlers?: {
      onNode?: (e: StreamEvent & { type: 'node' }) => void;
      onDone?: (e: StreamEvent & { type: 'done' }) => void;
      onError?: (e: StreamEvent & { type: 'error' }) => void;
      onFatal?: (err: Error) => void;
    }
  ) {
    const id = getFlowIdFromUrl();
    if (!id) {
      const err = new Error(I18n.t('Flow ID is missing, cannot run'));
      handlers?.onFatal?.(err);
      return { cancel: () => {}, done: Promise.resolve<RunResult | null>(null) } as const;
    }

    // Build the WS URL. baseURL may be "/api" or "http(s)://host/api".
    const base = (api.defaults.baseURL || '') as string;
    // Extract the API path prefix (used for same-origin paths in production); defaults to /api.
    let apiPathPrefix = '/api';
    if (base.startsWith('http://') || base.startsWith('https://')) {
      try {
        const u = new URL(base);
        apiPathPrefix = (u.pathname.replace(/\/$/, '') || '/api');
      } catch {
        // Unparsable baseURL: keep the default prefix.
      }
    } else if (base.startsWith('/')) {
      apiPathPrefix = (base.replace(/\/$/, '') || '/api');
    }
    const path = `/flows/${id}/run/ws`;
    // The token travels as a query parameter because the browser WebSocket
    // handshake cannot carry a custom Authorization header.
    const token = getToken();
    const isDev = !!((import.meta as any).env?.DEV);
    // Development: use the /ws prefix (original note: proxied by Vite to port 8855 and rewritten to /api).
    // Production: use the same-origin API prefix behind the reverse proxy, without an explicit port.
    const prefix = isDev ? '/ws' : apiPathPrefix;
    const qs = token ? `?access_token=${encodeURIComponent(token)}` : '';
    // Always build an absolute WS URL to avoid browser compatibility issues.
    const loc = window.location;
    const proto = loc.protocol === 'https:' ? 'wss:' : 'ws:';
    const origin = `${proto}//${loc.host}`; // host already includes the port, if any
    const wsUrl = `${origin}${prefix}${path}${qs}`;

    let ws: WebSocket;
    try {
      ws = new WebSocket(wsUrl);
    } catch (e: any) {
      handlers?.onFatal?.(e);
      return { cancel: () => {}, done: Promise.resolve<RunResult | null>(null) } as const;
    }

    let resolveDone: (v: RunResult | null) => void;
    let rejectDone: (e: any) => void;
    const done = new Promise<RunResult | null>((resolve, reject) => { resolveDone = resolve; rejectDone = reject; });
    // Failures are already surfaced through onFatal; mark the rejection as handled
    // so callers that ignore `done` do not trigger an unhandled-rejection report.
    // Callers awaiting `done` still observe the rejection.
    done.catch(() => {});

    // True once `done` has been resolved or rejected.
    let settled = false;

    // Reports a transport failure exactly once. Browsers fire `error` followed
    // by `close` for the same failure, so later calls are ignored.
    const fail = (err: Error) => {
      if (settled) return;
      settled = true;
      try {
        handlers?.onFatal?.(err);
      } finally {
        rejectDone(err);
      }
    };

    ws.onopen = () => {
      try {
        ws.send(JSON.stringify({ input }));
      } catch (e: unknown) {
        // Without the initial message the run can never start: settle and release the socket.
        fail(e instanceof Error ? e : new Error(String(e)));
        ws.close();
      }
    };

    ws.onmessage = (ev: MessageEvent) => {
      let evt: StreamEvent;
      try {
        const text = typeof ev.data === 'string' ? ev.data : '' + ev.data;
        const parsed: unknown = JSON.parse(text);
        if (parsed === null || typeof parsed !== 'object') {
          throw new TypeError('WS message is not an object');
        }
        evt = parsed as StreamEvent;
      } catch (e: unknown) {
        // Malformed messages are not fatal; log and keep the stream alive.
        console.warn('WS message parse error', e);
        return;
      }

      try {
        switch (evt.type) {
          case 'node':
            handlers?.onNode?.(evt);
            return;
          case 'error':
            handlers?.onError?.(evt);
            return;
          case 'done':
            settled = true;
            // Resolve first so a throwing onDone handler cannot leave `done` pending;
            // the socket is closed regardless.
            resolveDone({ ok: evt.ok, ctx: evt.ctx, logs: evt.logs });
            try {
              handlers?.onDone?.(evt);
            } finally {
              ws.close();
            }
            return;
          default:
            // Unknown event types are ignored.
            return;
        }
      } catch (e: unknown) {
        console.warn('WS handler error', e);
      }
    };

    ws.onerror = () => fail(new Error('WebSocket error'));
    ws.onclose = () => fail(new Error('WebSocket closed'));

    return {
      cancel: () => { try { ws.close(); } catch {} },
      done,
    } as const;
  }

  /**
   * Runs the flow over SSE (kept alongside the WebSocket variant).
   *
   * @param input Run input, sent as `{ input }`.
   * @param handlers Optional callbacks, same meaning as in `runStreamWS`.
   * @returns Whatever `postSSE` returns, or a no-op `cancel` with a `done`
   *          promise resolved to `null` when the flow id is missing.
   */
  runStream(input: any = {}, handlers?: { onNode?: (e: StreamEvent & { type: 'node' }) => void; onDone?: (e: StreamEvent & { type: 'done' }) => void; onError?: (e: StreamEvent & { type: 'error' }) => void; onFatal?: (err: Error) => void; }) {
    const id = getFlowIdFromUrl();
    if (!id) {
      const err = new Error(I18n.t('Flow ID is missing, cannot run'));
      handlers?.onFatal?.(err);
      return { cancel: () => {}, done: Promise.resolve<RunResult | null>(null) } as const;
    }
    const res = postSSE(`/flows/${id}/run/stream`, { input }, {
      onMessage: (evt: StreamEvent) => {
        try {
          switch (evt.type) {
            case 'node':
              handlers?.onNode?.(evt);
              return;
            case 'error':
              handlers?.onError?.(evt);
              return;
            case 'done':
              handlers?.onDone?.(evt);
              return;
            default:
              return;
          }
        } catch (err: unknown) {
          // A throwing handler must not break the stream, but should be visible.
          console.warn('SSE handler error', err);
        }
      },
      onFatal: (err: Error) => handlers?.onFatal?.(err),
    });
    return res;
  }
}