muse2/input/process/
flow.rs

1//! Code for reading process flows file
2use super::super::*;
3use crate::commodity::{CommodityID, CommodityMap};
4use crate::process::{FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap};
5use crate::region::{parse_region_str, RegionID};
6use crate::units::{FlowPerActivity, MoneyPerFlow};
7use crate::year::parse_year_str;
8use anyhow::{ensure, Context, Result};
9use itertools::iproduct;
10use serde::Deserialize;
11use std::collections::HashMap;
12use std::path::Path;
13use std::rc::Rc;
14
/// Name of the process flows CSV file, looked up inside the model directory.
const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";

/// Key for one set of flows: process, region and year.
type PrimaryOutputsKeys = (ProcessID, RegionID, u32);
/// For every flow stored under a key: its commodity and the raw `is_primary_output` value, if any.
type PrimaryOutputsValues = Vec<(CommodityID, Option<bool>)>;
/// Raw `is_primary_output` values gathered while reading, prior to validation and inference.
type PrimaryOutputsMap = HashMap<PrimaryOutputsKeys, PrimaryOutputsValues>;
20
21#[derive(PartialEq, Debug, Deserialize)]
22struct ProcessFlowRaw {
23    process_id: String,
24    commodity_id: String,
25    years: String,
26    regions: String,
27    coeff: FlowPerActivity,
28    #[serde(default)]
29    #[serde(rename = "type")]
30    kind: FlowType,
31    cost: Option<MoneyPerFlow>,
32    is_primary_output: Option<bool>,
33}
34
35impl ProcessFlowRaw {
36    fn validate(&self) -> Result<()> {
37        // Check that flow is not infinity, nan, 0 etc.
38        ensure!(
39            self.coeff.is_normal(),
40            "Invalid value for coeff ({})",
41            self.coeff
42        );
43
44        // **TODO**: https://github.com/EnergySystemsModellingLab/MUSE_2.0/issues/300
45        ensure!(
46            self.kind == FlowType::Fixed,
47            "Commodity flexible assets are not currently supported"
48        );
49
50        // Check that flow cost is non-negative
51        if let Some(cost) = self.cost {
52            ensure!(
53                (0.0..f64::INFINITY).contains(&cost.value()),
54                "Invalid value for flow cost ({cost}). Must be >=0."
55            )
56        }
57
58        Ok(())
59    }
60}
61
62/// Read process flows from a CSV file
63pub fn read_process_flows(
64    model_dir: &Path,
65    processes: &ProcessMap,
66    commodities: &CommodityMap,
67) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
68    let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
69    let process_flow_csv = read_csv(&file_path)?;
70    read_process_flows_from_iter(process_flow_csv, processes, commodities)
71        .with_context(|| input_err_msg(&file_path))
72}
73
74/// Read 'ProcessFlowRaw' records from an iterator and convert them into 'ProcessFlow' records.
75fn read_process_flows_from_iter<I>(
76    iter: I,
77    processes: &ProcessMap,
78    commodities: &CommodityMap,
79) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
80where
81    I: Iterator<Item = ProcessFlowRaw>,
82{
83    let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
84    let mut primary_outputs = PrimaryOutputsMap::new();
85    for record in iter {
86        record.validate()?;
87
88        // Get process
89        let (id, process) = processes
90            .get_key_value(record.process_id.as_str())
91            .with_context(|| format!("Process {} not found", record.process_id))?;
92
93        // Get regions
94        let process_regions = &process.regions;
95        let record_regions =
96            parse_region_str(&record.regions, process_regions).with_context(|| {
97                format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
98            })?;
99
100        // Get years
101        let process_years = &process.years;
102        let record_years = parse_year_str(&record.years, process_years).with_context(|| {
103            format!("Invalid year for process {id}. Valid years are {process_years:?}")
104        })?;
105
106        // Get commodity
107        let commodity = commodities
108            .get(record.commodity_id.as_str())
109            .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;
110
111        // Create ProcessFlow object
112        let process_flow = ProcessFlow {
113            commodity: Rc::clone(commodity),
114            coeff: record.coeff,
115            kind: FlowType::Fixed,
116            cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
117            is_primary_output: false, // set to false for now and we'll fix up later
118        };
119
120        // Insert flow into the map
121        let region_year_map = flows_map.entry(id.clone()).or_default();
122        for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
123            let flows_map = region_year_map
124                .entry((region_id.clone(), year))
125                .or_default();
126            let existing = flows_map
127                .insert(commodity.id.clone(), process_flow.clone())
128                .is_some();
129            ensure!(
130                !existing,
131                "Duplicate process flow entry for region {}, year {} and commodity {}",
132                region_id,
133                year,
134                commodity.id
135            );
136
137            primary_outputs
138                .entry((id.clone(), region_id.clone(), year))
139                .or_insert_with(|| Vec::with_capacity(1))
140                .push((commodity.id.clone(), record.is_primary_output))
141        }
142    }
143
144    validate_flows_and_update_primary_output(processes, &mut flows_map, &primary_outputs)?;
145
146    Ok(flows_map)
147}
148
149fn validate_flows_and_update_primary_output(
150    processes: &ProcessMap,
151    flows_map: &mut HashMap<ProcessID, ProcessFlowsMap>,
152    primary_outputs: &PrimaryOutputsMap,
153) -> Result<()> {
154    for (process_id, map) in flows_map.iter_mut() {
155        let process = processes.get(process_id).unwrap();
156        for (&year, region_id) in iproduct!(process.years.iter(), process.regions.iter()) {
157            // Check that the process has flows for this region/year
158            let Some(flows) = map.get_mut(&(region_id.clone(), year)) else {
159                bail!("Missing entry for process {process_id} in {region_id}/{year}");
160            };
161
162            let primary_outputs = primary_outputs
163                .get(&(process_id.clone(), region_id.clone(), year))
164                .unwrap();
165
166            let inferred_primary_output = validate_or_infer_primary_output(flows, primary_outputs)
167                .with_context(|| {
168                    format!(
169                    "Invalid primary output configuration for process {} (region: {}, year: {})",
170                    process_id, region_id, year
171                )
172                })?;
173
174            // The primary output was inferred (i.e. there was one output flow which wasn't assigned
175            // a value for is_primary_output). Update map.
176            if let Some(primary_output) = inferred_primary_output {
177                flows.get_mut(&primary_output).unwrap().is_primary_output = true;
178            }
179        }
180    }
181
182    Ok(())
183}
184
185fn validate_or_infer_primary_output(
186    flows_map: &IndexMap<CommodityID, ProcessFlow>,
187    primary_outputs: &PrimaryOutputsValues,
188) -> Result<Option<CommodityID>> {
189    let mut has_primary = false;
190    let mut output_flow = None;
191    let mut outputs_count = 0;
192    for (commodity_id, is_primary_output) in primary_outputs.iter() {
193        let is_output = flows_map.get(commodity_id).unwrap().is_output();
194        if is_output {
195            outputs_count += 1;
196        }
197        match *is_primary_output {
198            Some(true) => {
199                ensure!(
200                    is_output,
201                    "Commodity {commodity_id} cannot be the primary output as it is an input flow"
202                );
203                ensure!(
204                    !has_primary,
205                    "Multiple commodities designated as primary outputs"
206                );
207                has_primary = true;
208            }
209            None if is_output => {
210                output_flow = Some(commodity_id.clone());
211            }
212            _ => {}
213        }
214    }
215
216    // If all flows are inputs or user has designated a primary output explicitly, we're done
217    if has_primary || outputs_count == 0 {
218        return Ok(None);
219    }
220
221    ensure!(
222        output_flow.is_some(),
223        "There are one or more output flows, but is_primary_output is explicitly set to false for these");
224
225    ensure!(
226        outputs_count == 1,
227        "There is more than one output flow, so one must be explicitly designated as the primary output");
228
229    // We can infer that the one output flow is the primary output
230    Ok(output_flow)
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commodity::Commodity;
    use crate::fixture::svd_commodity;

    use rstest::rstest;
    use std::rc::Rc;

    /// Build a fixed, zero-cost flow for `commodity` which is not flagged as primary output.
    fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
        ProcessFlow {
            commodity,
            coeff: FlowPerActivity(coeff),
            kind: FlowType::Fixed,
            cost: MoneyPerFlow(0.0),
            is_primary_output: false,
        }
    }

    /// Build a raw record with placeholder IDs, year and region and the given numeric fields.
    fn create_process_flow_raw(
        coeff: FlowPerActivity,
        cost: Option<MoneyPerFlow>,
        is_primary_output: Option<bool>,
    ) -> ProcessFlowRaw {
        ProcessFlowRaw {
            process_id: "process".into(),
            commodity_id: "commodity".into(),
            years: "2020".into(),
            regions: "region".into(),
            coeff,
            kind: FlowType::Fixed,
            cost,
            is_primary_output,
        }
    }

    /// Build a flows map with two entries, "c1" and "c2", using the given coefficients.
    fn two_flows(
        commodity1: Commodity,
        coeff1: f64,
        commodity2: Commodity,
        coeff2: f64,
    ) -> IndexMap<CommodityID, ProcessFlow> {
        let mut flows = IndexMap::new();
        for (id, commodity, coeff) in [("c1", commodity1, coeff1), ("c2", commodity2, coeff2)] {
            let commodity = Rc::new(Commodity {
                id: id.into(),
                ..commodity
            });
            flows.insert(id.into(), flow(commodity, coeff));
        }
        flows
    }

    #[test]
    fn test_validate_flow_raw() {
        let zero_cost = Some(MoneyPerFlow(0.0));

        // Valid
        let valid = create_process_flow_raw(FlowPerActivity(1.0), zero_cost, Some(false));
        assert!(valid.validate().is_ok());

        // Invalid: Bad flow value
        for coeff in [0.0, f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
            let invalid = create_process_flow_raw(FlowPerActivity(coeff), zero_cost, Some(false));
            assert!(invalid.validate().is_err());
        }

        // Invalid: Bad flow cost value
        for cost in [f64::NAN, f64::NEG_INFINITY, f64::INFINITY] {
            let invalid = create_process_flow_raw(
                FlowPerActivity(1.0),
                Some(MoneyPerFlow(cost)),
                Some(false),
            );
            assert!(invalid.validate().is_err());
        }
    }

    #[rstest]
    fn single_output_explicit_primary(#[from(svd_commodity)] commodity: Commodity) {
        let mut flows = IndexMap::new();
        flows.insert("commodity1".into(), flow(Rc::new(commodity), 1.0));
        let primary_outputs = vec![("commodity1".into(), Some(true))];
        let res = validate_or_infer_primary_output(&flows, &primary_outputs).unwrap();
        assert_eq!(res, None);
    }

    #[rstest]
    fn multiple_outputs_one_explicit_primary(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, 1.0, commodity2, 2.0);
        let primary_outputs = vec![("c1".into(), Some(true)), ("c2".into(), None)];
        let res = validate_or_infer_primary_output(&flows, &primary_outputs).unwrap();
        assert_eq!(res, None);
    }

    #[rstest]
    fn multiple_outputs_none_explicit_should_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, 1.0, commodity2, 2.0);
        let primary_outputs = vec![("c1".into(), None), ("c2".into(), None)];
        let res = validate_or_infer_primary_output(&flows, &primary_outputs);
        assert!(res.is_err());
    }

    #[rstest]
    fn multiple_outputs_all_explicit_false_should_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, 1.0, commodity2, 2.0);
        let primary_outputs = vec![("c1".into(), Some(false)), ("c2".into(), Some(false))];
        let res = validate_or_infer_primary_output(&flows, &primary_outputs);
        assert!(res.is_err());
    }

    #[rstest]
    fn all_inputs(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, -1.0, commodity2, -2.0);
        let primary_outputs = vec![("c1".into(), None), ("c2".into(), None)];
        let res = validate_or_infer_primary_output(&flows, &primary_outputs).unwrap();
        assert_eq!(res, None);
    }

    #[rstest]
    fn multiple_outputs_multiple_explicit_primaries_should_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, 1.0, commodity2, 2.0);
        let primary_outputs = vec![("c1".into(), Some(true)), ("c2".into(), Some(true))];
        let res = validate_or_infer_primary_output(&flows, &primary_outputs);
        assert!(res.is_err());
    }

    #[rstest]
    fn explicit_primary_on_input_should_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, -1.0, commodity2, 2.0);
        let primary_outputs = vec![("c1".into(), Some(true)), ("c2".into(), None)];
        let res = validate_or_infer_primary_output(&flows, &primary_outputs);
        assert!(res.is_err());
    }
}
455}