From 35fc26f96c35c19d76dd3881a97d668edc4191e9 Mon Sep 17 00:00:00 2001
From: David Cameron
Date: Mon, 6 Apr 2026 13:27:29 -0400
Subject: [PATCH] Add batch mode for processing multiple inputs via JSONL

---
 src/main.rs | 220 ++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 203 insertions(+), 17 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index cd5f41a1..06675203 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,7 @@ use wasmtime::Module;

 use std::{
     fs::File,
-    io::{stdin, BufReader, Read},
+    io::{stdin, BufRead, BufReader, Read},
     path::PathBuf,
 };

@@ -61,6 +61,14 @@ struct Opts {
     /// Path to graphql file containing Function input query; if omitted, defaults will be used to calculate limits.
     #[clap(short = 'q', long)]
     query_path: Option,
+
+    /// Enable batch mode - read multiple JSON inputs (one per line) from stdin/file
+    #[clap(short, long)]
+    batch: bool,
+
+    /// In batch mode, continue processing on individual input errors (default: false)
+    #[clap(long)]
+    batch_continue_on_error: bool,
 }

 impl Opts {
@@ -114,6 +122,31 @@ fn read_file_to_string(file_path: &PathBuf) -> Result {
 fn main() -> Result<()> {
     let opts: Opts = Opts::parse();

+    // Create engine and module once (expensive operations - amortize across all inputs)
+    let engine = function_runner::engine::new_engine()?;
+    let module = Module::from_file(&engine, &opts.function)
+        .map_err(|e| anyhow!("Couldn't load the Function {:?}: {}", &opts.function, e))?;
+
+    // Infer codec from the module based on imported modules
+    let codec = if function_runner::engine::uses_msgpack_provider(&module) {
+        Codec::Messagepack
+    } else {
+        Codec::Json
+    };
+
+    if opts.batch {
+        run_batch_mode(&opts, &engine, &module, codec)
+    } else {
+        run_single_mode(&opts, &engine, &module, codec)
+    }
+}
+
+fn run_single_mode(
+    opts: &Opts,
+    engine: &wasmtime::Engine,
+    module: &Module,
+    codec: Codec,
+) -> Result<()> {
     let mut input: Box = if let Some(ref input) = opts.input {
         Box::new(BufReader::new(File::open(input).map_err(|e| {
             anyhow!("Couldn't load input {:?}: {}", input, e)
@@ -130,20 +163,8 @@ fn main() -> Result<()> {
     input.read_to_end(&mut buffer)?;

     let schema_string = opts.read_schema_to_string().transpose()?;
-
     let query_string = opts.read_query_to_string().transpose()?;

-    let engine = function_runner::engine::new_engine()?;
-    let module = Module::from_file(&engine, &opts.function)
-        .map_err(|e| anyhow!("Couldn't load the Function {:?}: {}", &opts.function, e))?;
-
-    // Infer codec from the module based on imported modules
-    let codec = if function_runner::engine::uses_msgpack_provider(&module) {
-        Codec::Messagepack
-    } else {
-        Codec::Json
-    };
-
     let input = BytesContainer::new(BytesContainerType::Input, codec, buffer)?;
     let scale_factor = if let (Some(schema_string), Some(query_string), Some(json_value)) =
         (schema_string, query_string, input.json_value.clone())
@@ -156,19 +177,19 @@ fn main() -> Result<()> {
             &json_value,
         )?
     } else {
-        DEFAULT_SCALE_FACTOR // Use default scale factor when schema or query is missing
+        DEFAULT_SCALE_FACTOR
     };

     let profile_opts = opts.profile_opts();

     let function_run_result = run(FunctionRunParams {
-        function_path: opts.function,
+        function_path: opts.function.clone(),
         input,
         export: opts.export.as_ref(),
         profile_opts: profile_opts.as_ref(),
         scale_factor,
-        module,
-        engine,
+        module: module.clone(),
+        engine: engine.clone(),
     })?;

     if opts.json {
@@ -187,3 +208,168 @@ fn main() -> Result<()> {
         anyhow::bail!("The Function execution failed. Review the logs for more information.")
     }
 }
+
+/// Formats one failure record for batch-mode output.
+///
+/// The error text is rendered through `serde_json` instead of being spliced into the
+/// template, so quotes, backslashes and newlines are escaped and the record is always a
+/// single valid JSON line.
+fn batch_error_record(context: &str, error: &dyn std::fmt::Display) -> String {
+    // `Value::String` displays as a quoted, escaped JSON string literal.
+    let message = serde_json::Value::String(format!("{}: {}", context, error));
+    format!(r#"{{"success":false,"error":{}}}"#, message)
+}
+
+/// Runs the Function once per non-blank input line and streams the results as JSONL.
+///
+/// Input is read from `--input` when given, otherwise from piped stdin. Every result is
+/// printed to stdout as one compact JSON line; diagnostics and the final summary go to
+/// stderr. The caller's engine and module are cloned for every run.
+///
+/// # Errors
+///
+/// Returns an error when the input cannot be opened, no input source is available, or
+/// loading the schema/query fails. A line that cannot be read, parsed, analyzed or
+/// executed also aborts the batch unless `--batch-continue-on-error` is set; then it is
+/// logged to stderr, reported as a `"success":false` record on stdout, and skipped.
+fn run_batch_mode(
+    opts: &Opts,
+    engine: &wasmtime::Engine,
+    module: &Module,
+    codec: Codec,
+) -> Result<()> {
+    let input_reader: Box<dyn BufRead> = if let Some(ref input) = opts.input {
+        Box::new(BufReader::new(File::open(input).map_err(|e| {
+            anyhow!("Couldn't load input {:?}: {}", input, e)
+        })?))
+    } else if !std::io::stdin().is_terminal() {
+        Box::new(BufReader::new(stdin()))
+    } else {
+        return Err(anyhow!(
+            "You must provide input via the --input flag or piped via stdin."
+        ));
+    };
+
+    // Load the schema and query once; the scale factor is still computed per input below.
+    let schema_string = opts.read_schema_to_string().transpose()?;
+    let query_string = opts.read_query_to_string().transpose()?;
+
+    // Disable profiling in batch mode for performance
+    let profile_opts = None;
+
+    // `line_num` counts every line read, blank ones included, so diagnostics match the input.
+    let mut line_num = 0;
+    let mut success_count = 0;
+    let mut error_count = 0;
+
+    for line_result in input_reader.lines() {
+        line_num += 1;
+
+        let line = match line_result {
+            Ok(l) => l,
+            Err(e) => {
+                error_count += 1;
+                if opts.batch_continue_on_error {
+                    eprintln!("Error reading line {}: {}", line_num, e);
+                    println!("{}", batch_error_record("Error reading input", &e));
+                    continue;
+                } else {
+                    return Err(e.into());
+                }
+            }
+        };
+
+        // Skip empty lines
+        if line.trim().is_empty() {
+            continue;
+        }
+
+        // Parse input
+        let input = match BytesContainer::new(
+            BytesContainerType::Input,
+            codec,
+            line.into_bytes(),
+        ) {
+            Ok(i) => i,
+            Err(e) => {
+                error_count += 1;
+                if opts.batch_continue_on_error {
+                    eprintln!("Error parsing line {}: {}", line_num, e);
+                    println!("{}", batch_error_record("Invalid JSON input", &e));
+                    continue;
+                } else {
+                    return Err(e);
+                }
+            }
+        };
+
+        // Calculate scale factor for this input
+        let scale_factor = if let (Some(schema_string), Some(query_string), Some(json_value)) =
+            (&schema_string, &query_string, &input.json_value)
+        {
+            match BluejaySchemaAnalyzer::analyze_schema_definition(
+                schema_string,
+                opts.schema_path.as_ref().and_then(|p| p.to_str()),
+                query_string,
+                opts.query_path.as_ref().and_then(|p| p.to_str()),
+                json_value,
+            ) {
+                Ok(sf) => sf,
+                Err(e) => {
+                    error_count += 1;
+                    if opts.batch_continue_on_error {
+                        eprintln!("Error analyzing schema for line {}: {}", line_num, e);
+                        println!("{}", batch_error_record("Schema analysis failed", &e));
+                        continue;
+                    } else {
+                        return Err(e);
+                    }
+                }
+            }
+        } else {
+            DEFAULT_SCALE_FACTOR
+        };
+
+        // Run function (reusing engine/module!)
+        let result = run(FunctionRunParams {
+            function_path: opts.function.clone(),
+            input,
+            export: opts.export.as_ref(),
+            profile_opts,
+            scale_factor,
+            module: module.clone(),
+            engine: engine.clone(),
+        });
+
+        // Output result immediately (streaming JSONL - compact format for line-by-line parsing)
+        match result {
+            Ok(function_result) => {
+                success_count += 1;
+                // Use compact JSON (not pretty-printed) for JSONL format
+                let compact_json = serde_json::to_string(&function_result)
+                    .unwrap_or_else(|error| error.to_string());
+                println!("{}", compact_json);
+            }
+            Err(e) => {
+                error_count += 1;
+                if opts.batch_continue_on_error {
+                    eprintln!("Error executing line {}: {}", line_num, e);
+                    println!("{}", batch_error_record("Execution failed", &e));
+                } else {
+                    return Err(e);
+                }
+            }
+        }
+    }
+
+    // Log summary to stderr (so it doesn't interfere with JSONL output on stdout).
+    // Blank lines were skipped above, so the processed total is successes plus errors.
+    eprintln!(
+        "Batch complete: {} inputs processed, {} successful, {} errors",
+        success_count + error_count,
+        success_count,
+        error_count
+    );
+
+    Ok(())
+}