muse2/input/process/
flow.rs

1//! Code for reading process flows file
2use super::super::*;
3use crate::commodity::{CommodityID, CommodityMap};
4use crate::id::IDCollection;
5use crate::process::{FlowType, ProcessFlow, ProcessID};
6use anyhow::{ensure, Context, Result};
7use indexmap::IndexSet;
8use serde::Deserialize;
9use std::collections::{HashMap, HashSet};
10use std::path::Path;
11use std::rc::Rc;
12
13const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";
14
15#[derive(PartialEq, Debug, Deserialize)]
16struct ProcessFlowRaw {
17    process_id: String,
18    commodity_id: String,
19    flow: f64,
20    #[serde(default)]
21    flow_type: FlowType,
22    flow_cost: Option<f64>,
23    is_pac: bool,
24}
25
26/// Read process flows from a CSV file
27pub fn read_process_flows(
28    model_dir: &Path,
29    process_ids: &IndexSet<ProcessID>,
30    commodities: &CommodityMap,
31) -> Result<HashMap<ProcessID, Vec<ProcessFlow>>> {
32    let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
33    let process_flow_csv = read_csv(&file_path)?;
34    read_process_flows_from_iter(process_flow_csv, process_ids, commodities)
35        .with_context(|| input_err_msg(&file_path))
36}
37
38/// Read 'ProcessFlowRaw' records from an iterator and convert them into 'ProcessFlow' records.
39fn read_process_flows_from_iter<I>(
40    iter: I,
41    process_ids: &IndexSet<ProcessID>,
42    commodities: &CommodityMap,
43) -> Result<HashMap<ProcessID, Vec<ProcessFlow>>>
44where
45    I: Iterator<Item = ProcessFlowRaw>,
46{
47    let mut flows = HashMap::new();
48    for flow in iter {
49        let commodity = commodities
50            .get(flow.commodity_id.as_str())
51            .with_context(|| format!("{} is not a valid commodity ID", &flow.commodity_id))?;
52
53        ensure!(flow.flow != 0.0, "Flow cannot be zero");
54
55        // Check that flow is not infinity, nan, etc.
56        ensure!(
57            flow.flow.is_normal(),
58            "Invalid value for flow ({})",
59            flow.flow
60        );
61
62        // **TODO**: https://github.com/EnergySystemsModellingLab/MUSE_2.0/issues/300
63        ensure!(
64            flow.flow_type == FlowType::Fixed,
65            "Commodity flexible assets are not currently supported"
66        );
67
68        if let Some(flow_cost) = flow.flow_cost {
69            ensure!(
70                (0.0..f64::INFINITY).contains(&flow_cost),
71                "Invalid value for flow cost ({flow_cost}). Must be >=0."
72            )
73        }
74
75        // Create ProcessFlow object
76        let process_id = process_ids.get_id_by_str(&flow.process_id)?;
77        let process_flow = ProcessFlow {
78            process_id: flow.process_id,
79            commodity: Rc::clone(commodity),
80            flow: flow.flow,
81            flow_type: flow.flow_type,
82            flow_cost: flow.flow_cost.unwrap_or(0.0),
83            is_pac: flow.is_pac,
84        };
85
86        // Insert into the map
87        flows
88            .entry(process_id)
89            .or_insert_with(Vec::new)
90            .push(process_flow);
91    }
92
93    validate_flows(&flows)?;
94    validate_pac_flows(&flows)?;
95
96    Ok(flows)
97}
98
99/// Validate that no process has multiple flows for the same commodity.
100///
101/// # Arguments
102/// * `flows` - A map of process IDs to process flows
103///
104/// # Returns
105/// An `Ok(())` if the check is successful, or an error.
106fn validate_flows(flows: &HashMap<ProcessID, Vec<ProcessFlow>>) -> Result<()> {
107    for (process_id, flows) in flows.iter() {
108        let mut commodities: HashSet<CommodityID> = HashSet::new();
109
110        for flow in flows.iter() {
111            let commodity_id = &flow.commodity.id;
112            ensure!(
113                commodities.insert(commodity_id.clone()),
114                "Process {process_id} has multiple flows for commodity {commodity_id}",
115            );
116        }
117    }
118
119    Ok(())
120}
121
122/// Validate that the PACs for each process are either all inputs or all outputs.
123///
124/// # Arguments
125///
126/// * `flows` - A map of process IDs to process flows
127///
128/// # Returns
129/// An `Ok(())` if the check is successful, or an error.
130fn validate_pac_flows(flows: &HashMap<ProcessID, Vec<ProcessFlow>>) -> Result<()> {
131    for (process_id, flows) in flows.iter() {
132        let mut flow_sign: Option<bool> = None; // False for inputs, true for outputs
133
134        for flow in flows.iter().filter(|flow| flow.is_pac) {
135            // Check that flow sign is consistent
136            let current_flow_sign = flow.flow > 0.0;
137            if let Some(flow_sign) = flow_sign {
138                ensure!(
139                    current_flow_sign == flow_sign,
140                    "PACs for process {process_id} are a mix of inputs and outputs",
141                );
142            }
143            flow_sign = Some(current_flow_sign);
144        }
145
146        ensure!(
147            flow_sign.is_some(),
148            "No PACs defined for process {process_id}"
149        );
150    }
151
152    Ok(())
153}
154
155#[cfg(test)]
156mod tests {
157    use super::*;
158    use crate::commodity::{Commodity, CommodityCostMap, CommodityType, DemandMap};
159    use crate::time_slice::TimeSliceLevel;
160    use std::iter;
161
162    #[test]
163    fn test_read_process_flows_from_iter_good() {
164        let process_ids = ["id1".into(), "id2".into()].into_iter().collect();
165        let commodities: CommodityMap = ["commodity1", "commodity2"]
166            .into_iter()
167            .map(|id| {
168                let commodity = Commodity {
169                    id: id.into(),
170                    description: "Some description".into(),
171                    kind: CommodityType::InputCommodity,
172                    time_slice_level: TimeSliceLevel::Annual,
173                    costs: CommodityCostMap::new(),
174                    demand: DemandMap::new(),
175                };
176
177                (commodity.id.clone(), commodity.into())
178            })
179            .collect();
180
181        let flows_raw = [
182            ProcessFlowRaw {
183                process_id: "id1".into(),
184                commodity_id: "commodity1".into(),
185                flow: 1.0,
186                flow_type: FlowType::Fixed,
187                flow_cost: Some(1.0),
188                is_pac: true,
189            },
190            ProcessFlowRaw {
191                process_id: "id1".into(),
192                commodity_id: "commodity2".into(),
193                flow: 1.0,
194                flow_type: FlowType::Fixed,
195                flow_cost: Some(1.0),
196                is_pac: false,
197            },
198            ProcessFlowRaw {
199                process_id: "id2".into(),
200                commodity_id: "commodity1".into(),
201                flow: 1.0,
202                flow_type: FlowType::Fixed,
203                flow_cost: Some(1.0),
204                is_pac: true,
205            },
206        ];
207
208        let expected = HashMap::from([
209            (
210                "id1".into(),
211                vec![
212                    ProcessFlow {
213                        process_id: "id1".into(),
214                        commodity: commodities.get("commodity1").unwrap().clone(),
215                        flow: 1.0,
216                        flow_type: FlowType::Fixed,
217                        flow_cost: 1.0,
218                        is_pac: true,
219                    },
220                    ProcessFlow {
221                        process_id: "id1".into(),
222                        commodity: commodities.get("commodity2").unwrap().clone(),
223                        flow: 1.0,
224                        flow_type: FlowType::Fixed,
225                        flow_cost: 1.0,
226                        is_pac: false,
227                    },
228                ],
229            ),
230            (
231                "id2".into(),
232                vec![ProcessFlow {
233                    process_id: "id2".into(),
234                    commodity: commodities.get("commodity1").unwrap().clone(),
235                    flow: 1.0,
236                    flow_type: FlowType::Fixed,
237                    flow_cost: 1.0,
238                    is_pac: true,
239                }],
240            ),
241        ]);
242
243        let actual =
244            read_process_flows_from_iter(flows_raw.into_iter(), &process_ids, &commodities)
245                .unwrap();
246        assert_eq!(expected, actual);
247    }
248
249    #[test]
250    fn test_read_process_flows_from_iter_bad_commodity_id() {
251        let process_ids = ["id1".into(), "id2".into()].into_iter().collect();
252        let commodities = ["commodity1", "commodity2"]
253            .into_iter()
254            .map(|id| {
255                let commodity = Commodity {
256                    id: id.into(),
257                    description: "Some description".into(),
258                    kind: CommodityType::InputCommodity,
259                    time_slice_level: TimeSliceLevel::Annual,
260                    costs: CommodityCostMap::new(),
261                    demand: DemandMap::new(),
262                };
263
264                (commodity.id.clone(), commodity.into())
265            })
266            .collect();
267
268        let flows_raw = [
269            ProcessFlowRaw {
270                process_id: "id1".into(),
271                commodity_id: "commodity1".into(),
272                flow: 1.0,
273                flow_type: FlowType::Fixed,
274                flow_cost: Some(1.0),
275                is_pac: true,
276            },
277            ProcessFlowRaw {
278                process_id: "id1".into(),
279                commodity_id: "commodity3".into(),
280                flow: 1.0,
281                flow_type: FlowType::Fixed,
282                flow_cost: Some(1.0),
283                is_pac: false,
284            },
285        ];
286
287        assert!(
288            read_process_flows_from_iter(flows_raw.into_iter(), &process_ids, &commodities)
289                .is_err()
290        );
291    }
292
293    #[test]
294    fn test_read_process_flows_from_iter_bad_flow() {
295        let process_ids = iter::once("id1".into()).collect();
296        let commodities = iter::once(Commodity {
297            id: "commodity1".into(),
298            description: "Some description".into(),
299            kind: CommodityType::InputCommodity,
300            time_slice_level: TimeSliceLevel::Annual,
301            costs: CommodityCostMap::new(),
302            demand: DemandMap::new(),
303        })
304        .map(|c| (c.id.clone(), Rc::new(c)))
305        .collect();
306
307        macro_rules! check_bad_flow {
308            ($flow:expr) => {
309                let flow = ProcessFlowRaw {
310                    process_id: "id1".into(),
311                    commodity_id: "commodity1".into(),
312                    flow: $flow,
313                    flow_type: FlowType::Fixed,
314                    flow_cost: Some(1.0),
315                    is_pac: true,
316                };
317                assert!(
318                    read_process_flows_from_iter(iter::once(flow), &process_ids, &commodities)
319                        .is_err()
320                );
321            };
322        }
323
324        check_bad_flow!(0.0);
325        check_bad_flow!(f64::NEG_INFINITY);
326        check_bad_flow!(f64::INFINITY);
327        check_bad_flow!(f64::NAN);
328    }
329
330    #[test]
331    fn test_read_process_flows_from_iter_bad_pacs() {
332        let process_ids = ["id1".into(), "id2".into()].into_iter().collect();
333        let commodities = ["commodity1", "commodity2"]
334            .into_iter()
335            .map(|id| {
336                let commodity = Commodity {
337                    id: id.into(),
338                    description: "Some description".into(),
339                    kind: CommodityType::InputCommodity,
340                    time_slice_level: TimeSliceLevel::Annual,
341                    costs: CommodityCostMap::new(),
342                    demand: DemandMap::new(),
343                };
344
345                (commodity.id.clone(), commodity.into())
346            })
347            .collect();
348
349        let flows_raw = [
350            ProcessFlowRaw {
351                process_id: "id1".into(),
352                commodity_id: "commodity1".into(),
353                flow: 1.0,
354                flow_type: FlowType::Fixed,
355                flow_cost: Some(1.0),
356                is_pac: true,
357            },
358            ProcessFlowRaw {
359                process_id: "id1".into(),
360                commodity_id: "commodity2".into(),
361                flow: -1.0,
362                flow_type: FlowType::Fixed,
363                flow_cost: Some(1.0),
364                is_pac: true,
365            },
366        ];
367
368        assert!(
369            read_process_flows_from_iter(flows_raw.into_iter(), &process_ids, &commodities)
370                .is_err()
371        );
372    }
373
374    #[test]
375    fn test_read_process_flows_from_iter_no_pacs() {
376        let process_ids = ["id1".into(), "id2".into()].into_iter().collect();
377        let commodities = ["commodity1", "commodity2"]
378            .into_iter()
379            .map(|id| {
380                let commodity = Commodity {
381                    id: id.into(),
382                    description: "Some description".into(),
383                    kind: CommodityType::InputCommodity,
384                    time_slice_level: TimeSliceLevel::Annual,
385                    costs: CommodityCostMap::new(),
386                    demand: DemandMap::new(),
387                };
388
389                (commodity.id.clone(), commodity.into())
390            })
391            .collect();
392
393        let flows_raw = [
394            ProcessFlowRaw {
395                process_id: "id1".into(),
396                commodity_id: "commodity1".into(),
397                flow: 1.0,
398                flow_type: FlowType::Fixed,
399                flow_cost: Some(1.0),
400                is_pac: false,
401            },
402            ProcessFlowRaw {
403                process_id: "id1".into(),
404                commodity_id: "commodity2".into(),
405                flow: 1.0,
406                flow_type: FlowType::Fixed,
407                flow_cost: Some(1.0),
408                is_pac: false,
409            },
410        ];
411
412        assert!(
413            read_process_flows_from_iter(flows_raw.into_iter(), &process_ids, &commodities)
414                .is_err()
415        );
416    }
417
418    #[test]
419    fn test_read_process_flows_from_iter_flow_cost() {
420        let process_ids = iter::once("id1".into()).collect();
421        let commodities = iter::once(Commodity {
422            id: "commodity1".into(),
423            description: "Some description".into(),
424            kind: CommodityType::InputCommodity,
425            time_slice_level: TimeSliceLevel::Annual,
426            costs: CommodityCostMap::new(),
427            demand: DemandMap::new(),
428        })
429        .map(|c| (c.id.clone(), Rc::new(c)))
430        .collect();
431
432        macro_rules! is_flow_cost_ok {
433            ($flow_cost:expr) => {{
434                let flow = ProcessFlowRaw {
435                    process_id: "id1".into(),
436                    commodity_id: "commodity1".into(),
437                    flow: 1.0,
438                    flow_type: FlowType::Fixed,
439                    flow_cost: Some($flow_cost),
440                    is_pac: true,
441                };
442
443                read_process_flows_from_iter(iter::once(flow), &process_ids, &commodities).is_ok()
444            }};
445        }
446
447        assert!(is_flow_cost_ok!(0.0));
448        assert!(is_flow_cost_ok!(1.0));
449        assert!(is_flow_cost_ok!(100.0));
450        assert!(!is_flow_cost_ok!(f64::NEG_INFINITY));
451        assert!(!is_flow_cost_ok!(f64::INFINITY));
452        assert!(!is_flow_cost_ok!(f64::NAN));
453    }
454
455    #[test]
456    fn test_read_process_flows_from_iter_duplicate_flow() {
457        let process_ids = iter::once("id1".into()).collect();
458        let commodities = ["commodity1"]
459            .into_iter()
460            .map(|id| {
461                let commodity = Commodity {
462                    id: id.into(),
463                    description: "Some description".into(),
464                    kind: CommodityType::InputCommodity,
465                    time_slice_level: TimeSliceLevel::Annual,
466                    costs: CommodityCostMap::new(),
467                    demand: DemandMap::new(),
468                };
469
470                (commodity.id.clone(), commodity.into())
471            })
472            .collect();
473
474        let flows_raw = [
475            ProcessFlowRaw {
476                process_id: "id1".into(),
477                commodity_id: "commodity1".into(),
478                flow: 1.0,
479                flow_type: FlowType::Fixed,
480                flow_cost: Some(1.0),
481                is_pac: true,
482            },
483            ProcessFlowRaw {
484                process_id: "id1".into(),
485                commodity_id: "commodity1".into(),
486                flow: 1.0,
487                flow_type: FlowType::Fixed,
488                flow_cost: Some(1.0),
489                is_pac: false,
490            },
491        ];
492
493        assert!(
494            read_process_flows_from_iter(flows_raw.into_iter(), &process_ids, &commodities)
495                .is_err()
496        );
497    }
498}