use fathom_function::{forms::TableCellValue, tracing}; use pipeline_application::{ application::{ Application, AsF64, ContinuousScoreMatcher, IliComparisonOutput, SurfaceLocationCriteria as SLC, TwoSidedContinuousScorer, }, serialization::{serialize_meter, serialize_orientation_min}, units::{frequency::percent_per_year, velocity::millimeter_per_year}, }; use uom::si::f64::{Angle, Frequency, Length, Velocity}; use uuid::Uuid; #[fathom_function::function] async fn ili_compare_all(input: Input) -> Result { let mut output = Output::new(input.org_id, &input.project_id); let app = Application::new_from_compile_env(input.org_id, &input.project_id).unwrap(); for pipeline_id in input.pipeline_id { let result = app .ili_compare_all(pipeline_id, ContinuousScoreMatcher::from(&input.criteria)) .await .map_err(|err| { tracing::error!(%pipeline_id, ?err, "Error running comparison algorithm"); format!("{err:?}") })?; output.push_result(pipeline_id, result); } Ok(output) } #[derive(Debug, serde::Serialize, Default)] struct Output { // TODO: remove org_id, project_id, unique_pipeline_ids once function outputs from previous // nodes are all visible in downstream nodes (FTHM-11016). org_id: Uuid, unique_pipeline_ids: Vec, project_id: String, pipeline_ids: Vec, matched_ids: Vec, unmatched_ids: Vec, summary_ids: Vec, } impl Output { fn new(org_id: Uuid, project_id: impl ToString) -> Self { Self { org_id, project_id: project_id.to_string(), ..Default::default() } } fn push_result(&mut self, pipeline_id: Uuid, ili_comparison_result: Vec) { self.unique_pipeline_ids.push(pipeline_id); ili_comparison_result.into_iter().for_each(|res| { self.pipeline_ids.push(pipeline_id); self.matched_ids.push(res.matched_id); self.unmatched_ids.push(res.unmatched_id); self.summary_ids.push(res.summary_id); }); } } #[derive(Debug, serde::Deserialize)] struct Input { org_id: Uuid, project_id: String, pipeline_id: Vec, #[serde(flatten)] criteria: Criteria, } #[derive(Debug, serde::Deserialize)] struct Criteria { #[serde(with = "serialize_meter")] weld_location_threshold: Length, #[serde(with = "serialize_meter")] feature_location_threshold: Length, #[serde(with = "serialize_meter")] upstream_girth_threshold: Length, #[serde(with = "serialize_orientation_min")] orientation_threshold: Angle, anomaly_size: AnomalySizeCriteria, surface_location_criteria: SurfaceLocationCriteria, } #[derive(Debug, serde::Deserialize)] struct AnomalySizeCriteria { depth: SizeCriteria, length: SizeCriteria, } #[derive(Debug, serde::Deserialize)] struct SizeCriteria { left_width: T, center: T, right_width: T, } impl SizeCriteria { fn transform(&self, f: impl Fn(f64) -> T) -> Option> { Some(SizeCriteria { left_width: Option::::try_from(&self.left_width).ok()?.map(&f)?, center: Option::::try_from(&self.center).ok()?.map(&f)?, right_width: Option::::try_from(&self.right_width).ok()?.map(&f)?, }) } } impl From> for TwoSidedContinuousScorer where T: AsF64 + Default, { fn from(value: SizeCriteria) -> Self { TwoSidedContinuousScorer::default() .with_left_width(value.left_width) .with_center(value.center) .with_right_width(value.right_width) } } #[derive(Debug, Clone, Copy, serde::Deserialize)] #[serde(rename_all = "snake_case")] enum SurfaceLocationCriteria { Matching, Any, } impl From for SLC { fn from(value: SurfaceLocationCriteria) -> Self { match value { SurfaceLocationCriteria::Matching => Self::Matching, SurfaceLocationCriteria::Any => Self::Any, } } } impl From<&Criteria> for ContinuousScoreMatcher { fn from(value: &Criteria) -> Self { let length = value .anomaly_size .length .transform(Velocity::new::) .map(TwoSidedContinuousScorer::from); let depth = value .anomaly_size .depth .transform(Frequency::new::) .map(TwoSidedContinuousScorer::from); ContinuousScoreMatcher::default() .with_weld_location_threshold(value.weld_location_threshold) .with_feature_location_threshold(value.feature_location_threshold) .with_upstream_girth_threshold(value.upstream_girth_threshold) .with_orientation_threshold(value.orientation_threshold) .with_depth_scorer(depth) .with_length_scorer(length) .with_surface_location_criteria(value.surface_location_criteria) } }