@ -1,14 +1,10 @@
// std
use std ::fs ;
use std ::time ::Instant ;
// third-party
use async_trait ::async_trait ;
use serde_json ::Value ;
use tracing ::{ debug , info } ;
use anyhow ::anyhow ;
// crate
use crate ::flow ::domain ::{ NodeDef , NodeId } ;
use crate ::flow ::engine ::eval_rhai_expr_json ;
use crate ::flow ::task ::Executor ;
@ -16,125 +12,127 @@ use crate::flow::task::Executor;
#[ derive(Default) ]
pub struct ScriptRhaiTask ;
/// 截断长字符串(去掉换行),用于日志预览
fn truncate_str ( s : & str , max : usize ) -> String {
let s = s . replace ( '\n' , " " ) . replace ( '\r' , " " ) ;
if s . len ( ) < = max { s } else { format! ( " {} … " , & s [ .. max ] ) }
let s = s . replace ( [ '\n' , '\r' ] , " " ) ;
if s . len ( ) < = max {
s
} else {
format! ( " {} … " , & s [ .. max ] )
}
}
/// 对比两个 JSON( 仅浅层) , 返回 (新增字段, 删除字段, 修改字段)
fn shallow_diff ( before : & Value , after : & Value ) -> ( Vec < String > , Vec < String > , Vec < String > ) {
use std ::collections ::BTreeSet ;
let mut added = Vec ::new ( ) ;
let mut removed = Vec ::new ( ) ;
let mut modified = Vec ::new ( ) ;
let ( Some ( bm ) , Some ( am ) ) = ( before . as_object ( ) , after . as_object ( ) ) else {
if before ! = after { modified . push ( " <root> " . to_string ( ) ) ; }
if before ! = after {
modified . push ( " <root> " . to_string ( ) ) ;
}
return ( added , removed , modified ) ;
} ;
let bkeys : BTreeSet < _ > = bm . keys ( ) . cloned ( ) . collect ( ) ;
let akeys : BTreeSet < _ > = am . keys ( ) . cloned ( ) . collect ( ) ;
for k in akeys . difference ( & bkeys ) { added . push ( ( * k ) . to_string ( ) ) ; }
for k in b keys. difference ( & a keys) { removed . push ( ( * k ) . to_string ( ) ) ; }
for k in a keys. difference ( & b keys) {
added . push ( k . to_string ( ) ) ;
}
for k in bkeys . difference ( & akeys ) {
removed . push ( k . to_string ( ) ) ;
}
for k in akeys . intersection ( & bkeys ) {
let key = ( * k ) . to_string ( ) ;
if bm . get ( & key ) ! = am . get ( & key ) { modified . push ( key ) ; }
if bm . get ( k ) ! = am . get ( k ) {
modified . push ( k. to_string ( ) ) ;
}
}
( added , removed , modified )
}
pub fn exec_rhai_file ( node_id : & NodeId , path : & str , ctx : & mut Value ) -> anyhow ::Result < ( ) > {
let start = Instant ::now ( ) ;
let code = match fs ::read_to_string ( path ) {
Ok ( s ) = > s ,
Err ( e ) = > {
info! ( target = " udmin.flow " , node = % node_id . 0 , err = % e . to_string ( ) , " script task: failed to read Rhai file " ) ;
return Err ( anyhow! ( " failed to read Rhai file: {} " , e ) ) ;
}
} ;
let script = code ;
/// 核心执行逻辑:运行 Rhai 脚本,返回更新后的 ctx
fn exec_rhai_code ( node_id : & NodeId , script : & str , ctx : & mut Value , source : & str ) -> anyhow ::Result < ( ) > {
if script . trim ( ) . is_empty ( ) {
info! ( target = " udmin.flow " , node = % node_id . 0 , " script task: empty Rhai file , skip " ) ;
info! ( target = " udmin.flow " , node = % node_id . 0 , source , " script_rhai task: empty script , skip " ) ;
return Ok ( ( ) ) ;
}
let preview = truncate_str ( & script , 200 ) ;
debug! ( targe t = " udmin.flow " , node = % node_id . 0 , preview = % preview , " script task: will execute Rhai file " ) ;
let s tart = Instant ::now ( ) ;
let preview = truncate_str ( script , 200 ) ;
debug! ( target = " udmin.flow " , node = % node_id . 0 , source , preview = % preview , " script_rhai task: will execute script " ) ;
let before_ctx = ctx . clone ( ) ;
let wrapped = format! ( " {{ {} ; ctx }} " , script ) ;
let res = eval_rhai_expr_json ( & wrapped , ctx ) ;
let dur_ms = start . el aps ed( ) . as_millis ( ) ;
match res {
match eval_rhai_expr_json ( & wr app ed, ctx ) {
Ok ( new_ctx ) = > {
let dur_ms = start . elapsed ( ) . as_millis ( ) ;
let ( added , removed , modified ) = shallow_diff ( & before_ctx , & new_ctx ) ;
* ctx = new_ctx ;
info! ( target = " udmin.flow " , node = % node_id . 0 , ms = % dur_ms , added = % added . len ( ) , removed = % removed . len ( ) , modified = % modified . len ( ) , " script task: Rhai file executed and ctx updated " ) ;
info! ( target = " udmin.flow " , node = % node_id . 0 , source , ms = % dur_ms , added = % added . len ( ) , removed = % removed . len ( ) , modified = % modified . len ( ) , " script_rhai task: executed and ctx updated " ) ;
if ! ( added . is_empty ( ) & & removed . is_empty ( ) & & modified . is_empty ( ) ) {
debug! ( target = " udmin.flow " , node = % node_id . 0 , ? added , ? removed , ? modified , " script task: ctx shallow diff " ) ;
}
}
Err ( err ) = > {
info! ( target = " udmin.flow " , node = % node_id . 0 , ms = % dur_ms , preview = % preview , err = % err . to_string ( ) , " script task: Rhai file execution failed, ctx unchanged " ) ;
return Err ( anyhow! ( " Rhai file execution failed: {} " , err ) ) ;
}
debug! ( target = " udmin.flow " , node = % node_id . 0 , source , ? added , ? removed , ? modified , " script_rhai task: ctx shallow diff " ) ;
}
Ok ( ( ) )
}
Err ( err ) = > {
let dur_ms = start . elapsed ( ) . as_millis ( ) ;
info! ( target = " udmin.flow " , node = % node_id . 0 , source , ms = % dur_ms , preview = % preview , err = % err . to_string ( ) , " script_rhai task: execution failed, ctx unchanged " ) ;
Err ( anyhow! ( " Rhai script execution failed: {} " , err ) )
}
}
}
/// 读取节点配置里的脚本文件路径
fn read_node_script_file ( ctx : & Value , node_id : & str ) -> Option < String > {
if let Some ( nodes ) = ctx . get ( " nodes " ) . and_then ( | v | v . as_object ( ) ) {
if let Some ( m ) = nodes . get ( node_id ) . and_then ( | v | v . get ( " scripts " ) ) . and_then ( | v | v . as_object ( ) ) {
return m . get ( " rhai " ) . and_then ( | v | v . as_str ( ) ) . map ( | s | s . to_string ( ) ) ;
}
}
None
ctx . get ( " nodes " )
. and_then ( | v | v . get ( node_id ) )
. and_then ( | n | n . get ( " scripts " ) )
. and_then ( | v | v . get ( " rhai " ) )
. and_then ( | v | v . as_str ( ) )
. map ( | s | s . to_string ( ) )
}
/// 读取节点配置里的 inline 脚本
fn read_node_inline_script ( ctx : & Value , node_id : & str ) -> Option < String > {
ctx . get ( " nodes " )
. and_then ( | nodes | nodes . get ( node_id ) )
. and_then ( | n | n . get ( " script " ) . or_else ( | | n . get ( " expr " ) ) )
. and_then ( | v | match v {
Value ::String ( s ) = > Some ( s . clone ( ) ) ,
Value ::Object ( m ) = > m
. get ( " script " )
. or_else ( | | m . get ( " expr " ) )
. and_then ( | x | x . as_str ( ) )
. map ( | s | s . to_string ( ) ) ,
_ = > None ,
} )
. or_else ( | | ctx . get ( " script " ) . and_then ( | v | v . as_str ( ) ) . map ( | s | s . to_string ( ) ) )
. or_else ( | | ctx . get ( " expr " ) . and_then ( | v | v . as_str ( ) ) . map ( | s | s . to_string ( ) ) )
}
#[ async_trait ]
impl Executor for ScriptRhaiTask {
async fn execute ( & self , node_id : & NodeId , _node : & NodeDef , ctx : & mut Value ) -> anyhow ::Result < ( ) > {
let start = Instant ::now ( ) ;
// 1) 文件脚本优先: nodes.<id>.scripts.rhai -> 直接执行文件
// 1) 优先执行文件脚本: nodes.<id>.scripts.rhai
if let Some ( path ) = read_node_script_file ( ctx , & node_id . 0 ) {
return exec_rhai_file ( node_id , & path , ctx ) ;
let code = fs ::read_to_string ( & path ) . map_err ( | e | {
info! ( target = " udmin.flow " , node = % node_id . 0 , err = % e . to_string ( ) , path , " script_rhai task: failed to read Rhai file " ) ;
anyhow! ( " failed to read Rhai file: {} " , e )
} ) ? ;
return exec_rhai_code ( node_id , & code , ctx , " file " ) ;
}
// 2) inline 脚本(支持 S tring 或 { script | expr })
let cfg : Option < String > = ctx . get ( " nodes " )
. and_then ( | nodes | nodes . get ( & node_id . 0 ) )
. and_then ( | n | n . get ( " script " ) . or_else ( | | n . get ( " expr " ) ) )
. and_then ( | v | match v { Value ::String ( s ) = > Some ( s . clone ( ) ) , Value ::Object ( m ) = > m . get ( " script " ) . or_else ( | | m . get ( " expr " ) ) . and_then ( | x | x . as_str ( ) ) . map ( | s | s . to_string ( ) ) , _ = > None } )
. or_else ( | | ctx . get ( " script " ) . and_then ( | v | v . as_str ( ) ) . map ( | s | s . to_string ( ) ) )
. or_else ( | | ctx . get ( " expr " ) . and_then ( | v | v . as_str ( ) ) . map ( | s | s . to_string ( ) ) ) ;
if let Some ( script ) = cfg {
if script . trim ( ) . is_empty ( ) {
info! ( target = " udmin.flow " , node = % node_id . 0 , " script_rhai task: empty inline script, skip " ) ;
return Ok ( ( ) ) ;
}
let script_preview = truncate_str ( & script , 200 ) ;
debug! ( target = " udmin.flow " , node = % node_id . 0 , preview = % script_preview , " script_rhai task: will execute Rhai inline script " ) ;
let before_ctx = ctx . clone ( ) ;
let wrapped = format! ( " {{ {} ; ctx }} " , script ) ;
let res = eval_rhai_expr_json ( & wrapped , ctx ) ;
let dur_ms = start . elapsed ( ) . as_millis ( ) ;
match res {
Ok ( new_ctx ) = > {
let ( added , removed , modified ) = shallow_diff ( & before_ctx , & new_ctx ) ;
* ctx = new_ctx ;
info! ( target = " udmin.flow " , node = % node_id . 0 , ms = % dur_ms , added = % added . len ( ) , removed = % removed . len ( ) , modified = % modified . len ( ) , " script_rhai task: inline executed and ctx updated " ) ;
if ! ( added . is_empty ( ) & & removed . is_empty ( ) & & modified . is_empty ( ) ) {
debug! ( target = " udmin.flow " , node = % node_id . 0 , ? added , ? removed , ? modified , " script_rhai task: ctx shallow diff " ) ;
}
}
Err ( err ) = > {
info! ( target = " udmin.flow " , node = % node_id . 0 , ms = % dur_ms , preview = % script_preview , err = % err . to_string ( ) , " script_rhai task: inline execution failed, ctx unchanged " ) ;
return Err ( anyhow! ( " Rhai inline execution failed: {} " , err ) ) ;
}
}
return Ok ( ( ) ) ;
// 2) 其次执行 inline 脚本(支持 s tring 或 {script| expr})
if let Some ( script ) = read_node_inline_script ( ctx , & node_id . 0 ) {
return exec_rhai_code ( node_id , & script , ctx , " inline " ) ;
}
// 3) 没有脚本 → 跳过
info! ( target = " udmin.flow " , node = % node_id . 0 , " script_rhai task: no script found, skip " ) ;
Ok ( ( ) )
}