reactive_graph_std_connector/behaviour/relation/complex_connector/
function.rs1use std::sync::Arc;
2use std::sync::LazyLock;
3use std::thread;
4use std::time::Duration;
5
6use reactive_graph_behaviour_model_impl::relation::RelationBehaviourFactoryCreator;
7use reactive_graph_behaviour_model_impl::relation::function::RelationBehaviourFunctions;
8use reactive_graph_behaviour_model_impl::relation::function::RelationBehaviourFunctionsStorage;
9use reactive_graph_graph::PropertyInstanceGetter;
10use reactive_graph_graph::PropertyInstanceSetter;
11use reactive_graph_reactive_model_impl::ReactiveRelation;
12use serde_json::Value;
13use serde_json::json;
14
15use reactive_graph_std_connector_model::BufferProperties::BUFFER;
16use reactive_graph_std_connector_model::BufferProperties::BUFFER_SIZE;
17use reactive_graph_std_connector_model::NAMESPACE_CONNECTOR;
18
19use crate::behaviour::relation::complex_connector::ComplexConnectorFactory;
20
21pub type ComplexConnectorFunction = fn(Value, String, ReactiveRelation);
22
23pub const FN_DELAY_CONNECTOR: ComplexConnectorFunction = |new_value, inbound_property_name, relation_instance| {
24 let delay_in_millis = relation_instance.get("delay").and_then(|v| v.as_u64()).unwrap_or(10);
25 thread::sleep(Duration::from_millis(delay_in_millis));
26 relation_instance.inbound.set(inbound_property_name, new_value);
27};
28
29pub const FN_DEBOUNCE_CONNECTOR: ComplexConnectorFunction = |new_value, inbound_property_name, relation_instance| {
30 if let Some(old_value) = relation_instance.inbound.get(inbound_property_name.clone()) {
31 if old_value != new_value {
32 relation_instance.inbound.set(inbound_property_name, new_value);
34 }
35 }
36};
37
38pub const FN_BUFFERED_FIFO_CONNECTOR: ComplexConnectorFunction = |new_value, inbound_property_name, relation_instance| {
39 let buffer_size = relation_instance
40 .get(BUFFER_SIZE)
41 .and_then(|v| v.as_u64())
42 .and_then(|v| usize::try_from(v).ok())
43 .unwrap_or(10);
44 let mut buffer = relation_instance.get(BUFFER).and_then(|v| v.as_array().cloned()).unwrap_or_default();
45 buffer.insert(0, new_value);
46 while buffer.len() > buffer_size {
48 if let Some(v) = buffer.pop() {
49 relation_instance.inbound.set(&inbound_property_name, v);
50 }
51 }
52 relation_instance.set(BUFFER, json!(buffer));
53};
54
55pub const FN_NUMERIC_INTERPOLATION_CONNECTOR: ComplexConnectorFunction = |new_value, inbound_property_name, relation_instance| {
56 let buffer_size = relation_instance
57 .get(BUFFER_SIZE)
58 .and_then(|v| v.as_u64())
59 .and_then(|v| usize::try_from(v).ok())
60 .unwrap_or(10);
61 let mut buffer = relation_instance.get(BUFFER).and_then(|v| v.as_array().cloned()).unwrap_or_default();
62 buffer.insert(0, new_value);
63 while buffer.len() > buffer_size {
64 buffer.pop();
65 }
66 relation_instance.set(BUFFER, json!(buffer));
67 let average = buffer.iter().filter_map(|v| v.as_f64()).sum::<f64>() / buffer.len() as f64;
69 relation_instance.inbound.set(&inbound_property_name, json!(average));
70};
71
72pub const FN_THREADED_CONNECTOR: ComplexConnectorFunction = |new_value, inbound_property_name, relation_instance| {
73 let relation_instance_2 = relation_instance.clone();
74 thread::spawn(move || {
75 relation_instance_2.inbound.set(inbound_property_name, new_value);
76 });
77};
78
79pub const FN_INCREMENT_BY_CONNECTOR: ComplexConnectorFunction = |increment_by, inbound_property_name, relation_instance| {
80 if let Some(old_value) = relation_instance.inbound.get(&inbound_property_name).and_then(|v| v.as_i64()) {
81 if let Some(increment_by) = increment_by.as_i64() {
82 let new_value = old_value + increment_by;
83 relation_instance.inbound.set(&inbound_property_name, json!(new_value));
84 }
85 }
86};
87
88pub const FN_DECREMENT_BY_CONNECTOR: ComplexConnectorFunction = |decrement_by, inbound_property_name, relation_instance| {
89 if let Some(old_value) = relation_instance.inbound.get(&inbound_property_name).and_then(|v| v.as_i64()) {
90 if let Some(decrement_by) = decrement_by.as_i64() {
91 let new_value = old_value - decrement_by;
92 relation_instance.inbound.set(&inbound_property_name, json!(new_value));
93 }
94 }
95};
96
97const FACTORY_CREATOR: RelationBehaviourFactoryCreator<ComplexConnectorFunction> = |ty, f| Arc::new(ComplexConnectorFactory::new(ty.clone(), f));
98
99pub static COMPLEX_CONNECTOR_BEHAVIOURS: RelationBehaviourFunctionsStorage<ComplexConnectorFunction> = LazyLock::new(|| {
100 RelationBehaviourFunctions::<ComplexConnectorFunction>::with_namespace(NAMESPACE_CONNECTOR, FACTORY_CREATOR)
101 .behaviour("delay_connector", FN_DELAY_CONNECTOR)
102 .behaviour("debounce_connector", FN_DEBOUNCE_CONNECTOR)
103 .behaviour("buffered_fifo_connector", FN_BUFFERED_FIFO_CONNECTOR)
104 .behaviour("numeric_interpolation_connector", FN_NUMERIC_INTERPOLATION_CONNECTOR)
105 .behaviour("threaded_connector", FN_THREADED_CONNECTOR)
106 .behaviour("increment_by_connector", FN_INCREMENT_BY_CONNECTOR)
107 .behaviour("decrement_by_connector", FN_DECREMENT_BY_CONNECTOR)
108 .get()
109});