reactive_graph_std_connector/behaviour/relation/complex_connector/
function.rs

1use std::sync::Arc;
2use std::sync::LazyLock;
3use std::thread;
4use std::time::Duration;
5
6use reactive_graph_behaviour_model_impl::relation::RelationBehaviourFactoryCreator;
7use reactive_graph_behaviour_model_impl::relation::function::RelationBehaviourFunctions;
8use reactive_graph_behaviour_model_impl::relation::function::RelationBehaviourFunctionsStorage;
9use reactive_graph_graph::PropertyInstanceGetter;
10use reactive_graph_graph::PropertyInstanceSetter;
11use reactive_graph_reactive_model_impl::ReactiveRelation;
12use serde_json::Value;
13use serde_json::json;
14
15use reactive_graph_std_connector_model::BufferProperties::BUFFER;
16use reactive_graph_std_connector_model::BufferProperties::BUFFER_SIZE;
17use reactive_graph_std_connector_model::NAMESPACE_CONNECTOR;
18
19use crate::behaviour::relation::complex_connector::ComplexConnectorFactory;
20
21pub type ComplexConnectorFunction = fn(Value, String, ReactiveRelation);
22
23pub const FN_DELAY_CONNECTOR: ComplexConnectorFunction = |new_value, inbound_property_name, relation_instance| {
24    let delay_in_millis = relation_instance.get("delay").and_then(|v| v.as_u64()).unwrap_or(10);
25    thread::sleep(Duration::from_millis(delay_in_millis));
26    relation_instance.inbound.set(inbound_property_name, new_value);
27};
28
29pub const FN_DEBOUNCE_CONNECTOR: ComplexConnectorFunction = |new_value, inbound_property_name, relation_instance| {
30    if let Some(old_value) = relation_instance.inbound.get(inbound_property_name.clone()) {
31        if old_value != new_value {
32            // Only update inbound if inbound value not equals the outbound value
33            relation_instance.inbound.set(inbound_property_name, new_value);
34        }
35    }
36};
37
38pub const FN_BUFFERED_FIFO_CONNECTOR: ComplexConnectorFunction = |new_value, inbound_property_name, relation_instance| {
39    let buffer_size = relation_instance
40        .get(BUFFER_SIZE)
41        .and_then(|v| v.as_u64())
42        .and_then(|v| usize::try_from(v).ok())
43        .unwrap_or(10);
44    let mut buffer = relation_instance.get(BUFFER).and_then(|v| v.as_array().cloned()).unwrap_or_default();
45    buffer.insert(0, new_value);
46    // Only update inbound if FIFO is full
47    while buffer.len() > buffer_size {
48        if let Some(v) = buffer.pop() {
49            relation_instance.inbound.set(&inbound_property_name, v);
50        }
51    }
52    relation_instance.set(BUFFER, json!(buffer));
53};
54
55pub const FN_NUMERIC_INTERPOLATION_CONNECTOR: ComplexConnectorFunction = |new_value, inbound_property_name, relation_instance| {
56    let buffer_size = relation_instance
57        .get(BUFFER_SIZE)
58        .and_then(|v| v.as_u64())
59        .and_then(|v| usize::try_from(v).ok())
60        .unwrap_or(10);
61    let mut buffer = relation_instance.get(BUFFER).and_then(|v| v.as_array().cloned()).unwrap_or_default();
62    buffer.insert(0, new_value);
63    while buffer.len() > buffer_size {
64        buffer.pop();
65    }
66    relation_instance.set(BUFFER, json!(buffer));
67    // Calculate average
68    let average = buffer.iter().filter_map(|v| v.as_f64()).sum::<f64>() / buffer.len() as f64;
69    relation_instance.inbound.set(&inbound_property_name, json!(average));
70};
71
72pub const FN_THREADED_CONNECTOR: ComplexConnectorFunction = |new_value, inbound_property_name, relation_instance| {
73    let relation_instance_2 = relation_instance.clone();
74    thread::spawn(move || {
75        relation_instance_2.inbound.set(inbound_property_name, new_value);
76    });
77};
78
79pub const FN_INCREMENT_BY_CONNECTOR: ComplexConnectorFunction = |increment_by, inbound_property_name, relation_instance| {
80    if let Some(old_value) = relation_instance.inbound.get(&inbound_property_name).and_then(|v| v.as_i64()) {
81        if let Some(increment_by) = increment_by.as_i64() {
82            let new_value = old_value + increment_by;
83            relation_instance.inbound.set(&inbound_property_name, json!(new_value));
84        }
85    }
86};
87
88pub const FN_DECREMENT_BY_CONNECTOR: ComplexConnectorFunction = |decrement_by, inbound_property_name, relation_instance| {
89    if let Some(old_value) = relation_instance.inbound.get(&inbound_property_name).and_then(|v| v.as_i64()) {
90        if let Some(decrement_by) = decrement_by.as_i64() {
91            let new_value = old_value - decrement_by;
92            relation_instance.inbound.set(&inbound_property_name, json!(new_value));
93        }
94    }
95};
96
97const FACTORY_CREATOR: RelationBehaviourFactoryCreator<ComplexConnectorFunction> = |ty, f| Arc::new(ComplexConnectorFactory::new(ty.clone(), f));
98
99pub static COMPLEX_CONNECTOR_BEHAVIOURS: RelationBehaviourFunctionsStorage<ComplexConnectorFunction> = LazyLock::new(|| {
100    RelationBehaviourFunctions::<ComplexConnectorFunction>::with_namespace(NAMESPACE_CONNECTOR, FACTORY_CREATOR)
101        .behaviour("delay_connector", FN_DELAY_CONNECTOR)
102        .behaviour("debounce_connector", FN_DEBOUNCE_CONNECTOR)
103        .behaviour("buffered_fifo_connector", FN_BUFFERED_FIFO_CONNECTOR)
104        .behaviour("numeric_interpolation_connector", FN_NUMERIC_INTERPOLATION_CONNECTOR)
105        .behaviour("threaded_connector", FN_THREADED_CONNECTOR)
106        .behaviour("increment_by_connector", FN_INCREMENT_BY_CONNECTOR)
107        .behaviour("decrement_by_connector", FN_DECREMENT_BY_CONNECTOR)
108        .get()
109});