muse2/input/process/
flow.rs

1//! Code for reading process flows file
2use super::super::*;
3use crate::commodity::{CommodityID, CommodityMap};
4use crate::process::{FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap};
5use crate::region::{parse_region_str, RegionID};
6use crate::units::{FlowPerActivity, MoneyPerFlow};
7use crate::year::parse_year_str;
8use anyhow::{ensure, Context, Result};
9use itertools::iproduct;
10use serde::Deserialize;
11use std::collections::HashMap;
12use std::path::Path;
13use std::rc::Rc;
14
15const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";
16
17type PrimaryOutputsKeys = (ProcessID, RegionID, u32);
18type PrimaryOutputsValues = Vec<(CommodityID, Option<bool>)>;
19type PrimaryOutputsMap = HashMap<PrimaryOutputsKeys, PrimaryOutputsValues>;
20
21#[derive(PartialEq, Debug, Deserialize)]
22struct ProcessFlowRaw {
23    process_id: String,
24    commodity_id: String,
25    years: String,
26    regions: String,
27    coeff: FlowPerActivity,
28    #[serde(default)]
29    #[serde(rename = "type")]
30    kind: FlowType,
31    cost: Option<MoneyPerFlow>,
32    is_primary_output: Option<bool>,
33}
34
35impl ProcessFlowRaw {
36    fn validate(&self) -> Result<()> {
37        // Check that flow is not infinity, nan, 0 etc.
38        ensure!(
39            self.coeff.is_normal(),
40            "Invalid value for coeff ({})",
41            self.coeff
42        );
43
44        // **TODO**: https://github.com/EnergySystemsModellingLab/MUSE_2.0/issues/300
45        ensure!(
46            self.kind == FlowType::Fixed,
47            "Commodity flexible assets are not currently supported"
48        );
49
50        // Check that flow cost is non-negative
51        if let Some(cost) = self.cost {
52            ensure!(
53                (0.0..f64::INFINITY).contains(&cost.value()),
54                "Invalid value for flow cost ({cost}). Must be >=0."
55            )
56        }
57
58        Ok(())
59    }
60}
61
62/// Read process flows from a CSV file
63pub fn read_process_flows(
64    model_dir: &Path,
65    processes: &ProcessMap,
66    commodities: &CommodityMap,
67) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
68    let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
69    let process_flow_csv = read_csv(&file_path)?;
70    read_process_flows_from_iter(process_flow_csv, processes, commodities)
71        .with_context(|| input_err_msg(&file_path))
72}
73
74/// Read 'ProcessFlowRaw' records from an iterator and convert them into 'ProcessFlow' records.
75fn read_process_flows_from_iter<I>(
76    iter: I,
77    processes: &ProcessMap,
78    commodities: &CommodityMap,
79) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
80where
81    I: Iterator<Item = ProcessFlowRaw>,
82{
83    let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
84    let mut primary_outputs = PrimaryOutputsMap::new();
85    for record in iter {
86        record.validate()?;
87
88        // Get process
89        let (id, process) = processes
90            .get_key_value(record.process_id.as_str())
91            .with_context(|| format!("Process {} not found", record.process_id))?;
92
93        // Get regions
94        let process_regions = &process.regions;
95        let record_regions =
96            parse_region_str(&record.regions, process_regions).with_context(|| {
97                format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
98            })?;
99
100        // Get years
101        let process_years = &process.years;
102        let record_years = parse_year_str(&record.years, process_years).with_context(|| {
103            format!("Invalid year for process {id}. Valid years are {process_years:?}")
104        })?;
105
106        // Get commodity
107        let commodity = commodities
108            .get(record.commodity_id.as_str())
109            .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;
110
111        // Create ProcessFlow object
112        let process_flow = ProcessFlow {
113            commodity: Rc::clone(commodity),
114            coeff: record.coeff,
115            kind: FlowType::Fixed,
116            cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
117            is_primary_output: false, // set to false for now and we'll fix up later
118        };
119
120        // Insert flow into the map
121        let region_year_map = flows_map.entry(id.clone()).or_default();
122        for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
123            let flows_map = region_year_map
124                .entry((region_id.clone(), year))
125                .or_default();
126            let existing = flows_map
127                .insert(commodity.id.clone(), process_flow.clone())
128                .is_some();
129            ensure!(
130                !existing,
131                "Duplicate process flow entry for region {}, year {} and commodity {}",
132                region_id,
133                year,
134                commodity.id
135            );
136
137            primary_outputs
138                .entry((id.clone(), region_id.clone(), year))
139                .or_insert_with(|| Vec::with_capacity(1))
140                .push((commodity.id.clone(), record.is_primary_output))
141        }
142    }
143
144    validate_flows_and_update_primary_output(processes, &mut flows_map, &primary_outputs)?;
145
146    Ok(flows_map)
147}
148
149fn validate_flows_and_update_primary_output(
150    processes: &ProcessMap,
151    flows_map: &mut HashMap<ProcessID, ProcessFlowsMap>,
152    primary_outputs: &PrimaryOutputsMap,
153) -> Result<()> {
154    for (process_id, map) in flows_map.iter_mut() {
155        let process = processes.get(process_id).unwrap();
156        for (&year, region_id) in iproduct!(process.years.iter(), process.regions.iter()) {
157            // Check that the process has flows for this region/year
158            let Some(flows) = map.get_mut(&(region_id.clone(), year)) else {
159                bail!("Missing entry for process {process_id} in {region_id}/{year}");
160            };
161
162            let primary_outputs = primary_outputs
163                .get(&(process_id.clone(), region_id.clone(), year))
164                .unwrap();
165
166            let inferred_primary_output = validate_or_infer_primary_output(flows, primary_outputs)
167                .with_context(|| {
168                    format!(
169                    "Invalid primary output configuration for process {process_id} (region: {region_id}, year: {year})"
170                )
171                })?;
172
173            // The primary output was inferred (i.e. there was one output flow which wasn't assigned
174            // a value for is_primary_output). Update map.
175            if let Some(primary_output) = inferred_primary_output {
176                flows.get_mut(&primary_output).unwrap().is_primary_output = true;
177            }
178        }
179    }
180
181    Ok(())
182}
183
184fn validate_or_infer_primary_output(
185    flows_map: &IndexMap<CommodityID, ProcessFlow>,
186    primary_outputs: &PrimaryOutputsValues,
187) -> Result<Option<CommodityID>> {
188    let mut has_primary = false;
189    let mut output_flow = None;
190    let mut outputs_count = 0;
191    for (commodity_id, is_primary_output) in primary_outputs.iter() {
192        let is_output = flows_map.get(commodity_id).unwrap().is_output();
193        if is_output {
194            outputs_count += 1;
195        }
196        match *is_primary_output {
197            Some(true) => {
198                ensure!(
199                    is_output,
200                    "Commodity {commodity_id} cannot be the primary output as it is an input flow"
201                );
202                ensure!(
203                    !has_primary,
204                    "Multiple commodities designated as primary outputs"
205                );
206                has_primary = true;
207            }
208            None if is_output => {
209                output_flow = Some(commodity_id.clone());
210            }
211            _ => {}
212        }
213    }
214
215    // If all flows are inputs or user has designated a primary output explicitly, we're done
216    if has_primary || outputs_count == 0 {
217        return Ok(None);
218    }
219
220    ensure!(
221        output_flow.is_some(),
222        "There are one or more output flows, but is_primary_output is explicitly set to false for these");
223
224    ensure!(
225        outputs_count == 1,
226        "There is more than one output flow, so one must be explicitly designated as the primary output");
227
228    // We can infer that the one output flow is the primary output
229    Ok(output_flow)
230}
231
232#[cfg(test)]
233mod tests {
234    use super::*;
235    use crate::commodity::Commodity;
236    use crate::fixture::svd_commodity;
237
238    use rstest::rstest;
239    use std::rc::Rc;
240
241    fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
242        ProcessFlow {
243            commodity,
244            coeff: FlowPerActivity(coeff),
245            kind: FlowType::Fixed,
246            cost: MoneyPerFlow(0.0),
247            is_primary_output: false,
248        }
249    }
250
251    fn create_process_flow_raw(
252        coeff: FlowPerActivity,
253        cost: Option<MoneyPerFlow>,
254        is_primary_output: Option<bool>,
255    ) -> ProcessFlowRaw {
256        ProcessFlowRaw {
257            process_id: "process".into(),
258            commodity_id: "commodity".into(),
259            years: "2020".into(),
260            regions: "region".into(),
261            coeff,
262            kind: FlowType::Fixed,
263            cost,
264            is_primary_output,
265        }
266    }
267
268    #[test]
269    fn test_validate_flow_raw() {
270        // Valid
271        let valid =
272            create_process_flow_raw(FlowPerActivity(1.0), Some(MoneyPerFlow(0.0)), Some(false));
273        assert!(valid.validate().is_ok());
274
275        // Invalid: Bad flow value
276        let invalid =
277            create_process_flow_raw(FlowPerActivity(0.0), Some(MoneyPerFlow(0.0)), Some(false));
278        assert!(invalid.validate().is_err());
279        let invalid = create_process_flow_raw(
280            FlowPerActivity(f64::NAN),
281            Some(MoneyPerFlow(0.0)),
282            Some(false),
283        );
284        assert!(invalid.validate().is_err());
285        let invalid = create_process_flow_raw(
286            FlowPerActivity(f64::INFINITY),
287            Some(MoneyPerFlow(0.0)),
288            Some(false),
289        );
290        assert!(invalid.validate().is_err());
291        let invalid = create_process_flow_raw(
292            FlowPerActivity(f64::NEG_INFINITY),
293            Some(MoneyPerFlow(0.0)),
294            Some(false),
295        );
296        assert!(invalid.validate().is_err());
297
298        // Invalid: Bad flow cost value
299        let invalid = create_process_flow_raw(
300            FlowPerActivity(1.0),
301            Some(MoneyPerFlow(f64::NAN)),
302            Some(false),
303        );
304        assert!(invalid.validate().is_err());
305        let invalid = create_process_flow_raw(
306            FlowPerActivity(1.0),
307            Some(MoneyPerFlow(f64::NEG_INFINITY)),
308            Some(false),
309        );
310        assert!(invalid.validate().is_err());
311        let invalid = create_process_flow_raw(
312            FlowPerActivity(1.0),
313            Some(MoneyPerFlow(f64::INFINITY)),
314            Some(false),
315        );
316        assert!(invalid.validate().is_err());
317    }
318
319    #[rstest]
320    fn single_output_explicit_primary(#[from(svd_commodity)] commodity: Commodity) {
321        let c1 = Rc::new(commodity);
322        let mut flows = IndexMap::new();
323        flows.insert("commodity1".into(), flow(Rc::clone(&c1), 1.0));
324        let primary_outputs = vec![("commodity1".into(), Some(true))];
325        let res = validate_or_infer_primary_output(&flows, &primary_outputs).unwrap();
326        assert_eq!(res, None);
327    }
328
329    #[rstest]
330    fn multiple_outputs_one_explicit_primary(
331        #[from(svd_commodity)] commodity1: Commodity,
332        #[from(svd_commodity)] commodity2: Commodity,
333    ) {
334        let c1 = Rc::new(Commodity {
335            id: "c1".into(),
336            ..commodity1
337        });
338        let c2 = Rc::new(Commodity {
339            id: "c2".into(),
340            ..commodity2
341        });
342        let mut flows = IndexMap::new();
343        flows.insert("c1".into(), flow(Rc::clone(&c1), 1.0));
344        flows.insert("c2".into(), flow(Rc::clone(&c2), 2.0));
345        let primary_outputs = vec![("c1".into(), Some(true)), ("c2".into(), None)];
346        let res = validate_or_infer_primary_output(&flows, &primary_outputs).unwrap();
347        assert_eq!(res, None);
348    }
349
350    #[rstest]
351    fn multiple_outputs_none_explicit_should_error(
352        #[from(svd_commodity)] commodity1: Commodity,
353        #[from(svd_commodity)] commodity2: Commodity,
354    ) {
355        let c1 = Rc::new(Commodity {
356            id: "c1".into(),
357            ..commodity1
358        });
359        let c2 = Rc::new(Commodity {
360            id: "c2".into(),
361            ..commodity2
362        });
363        let mut flows = IndexMap::new();
364        flows.insert("c1".into(), flow(Rc::clone(&c1), 1.0));
365        flows.insert("c2".into(), flow(Rc::clone(&c2), 2.0));
366        let primary_outputs = vec![("c1".into(), None), ("c2".into(), None)];
367        let res = validate_or_infer_primary_output(&flows, &primary_outputs);
368        assert!(res.is_err());
369    }
370
371    #[rstest]
372    fn multiple_outputs_all_explicit_false_should_error(
373        #[from(svd_commodity)] commodity1: Commodity,
374        #[from(svd_commodity)] commodity2: Commodity,
375    ) {
376        let c1 = Rc::new(Commodity {
377            id: "c1".into(),
378            ..commodity1
379        });
380        let c2 = Rc::new(Commodity {
381            id: "c2".into(),
382            ..commodity2
383        });
384        let mut flows = IndexMap::new();
385        flows.insert("c1".into(), flow(Rc::clone(&c1), 1.0));
386        flows.insert("c2".into(), flow(Rc::clone(&c2), 2.0));
387        let primary_outputs = vec![("c1".into(), Some(false)), ("c2".into(), Some(false))];
388        let res = validate_or_infer_primary_output(&flows, &primary_outputs);
389        assert!(res.is_err());
390    }
391
392    #[rstest]
393    fn all_inputs(
394        #[from(svd_commodity)] commodity1: Commodity,
395        #[from(svd_commodity)] commodity2: Commodity,
396    ) {
397        let c1 = Rc::new(Commodity {
398            id: "c1".into(),
399            ..commodity1
400        });
401        let c2 = Rc::new(Commodity {
402            id: "c2".into(),
403            ..commodity2
404        });
405        let mut flows = IndexMap::new();
406        flows.insert("c1".into(), flow(Rc::clone(&c1), -1.0));
407        flows.insert("c2".into(), flow(Rc::clone(&c2), -2.0));
408        let primary_outputs = vec![("c1".into(), None), ("c2".into(), None)];
409        let res = validate_or_infer_primary_output(&flows, &primary_outputs).unwrap();
410        assert_eq!(res, None);
411    }
412
413    #[rstest]
414    fn multiple_outputs_multiple_explicit_primaries_should_error(
415        #[from(svd_commodity)] commodity1: Commodity,
416        #[from(svd_commodity)] commodity2: Commodity,
417    ) {
418        let c1 = Rc::new(Commodity {
419            id: "c1".into(),
420            ..commodity1
421        });
422        let c2 = Rc::new(Commodity {
423            id: "c2".into(),
424            ..commodity2
425        });
426        let mut flows = IndexMap::new();
427        flows.insert("c1".into(), flow(Rc::clone(&c1), 1.0));
428        flows.insert("c2".into(), flow(Rc::clone(&c2), 2.0));
429        let primary_outputs = vec![("c1".into(), Some(true)), ("c2".into(), Some(true))];
430        let res = validate_or_infer_primary_output(&flows, &primary_outputs);
431        assert!(res.is_err());
432    }
433
434    #[rstest]
435    fn explicit_primary_on_input_should_error(
436        #[from(svd_commodity)] commodity1: Commodity,
437        #[from(svd_commodity)] commodity2: Commodity,
438    ) {
439        let c1 = Rc::new(Commodity {
440            id: "c1".into(),
441            ..commodity1
442        });
443        let c2 = Rc::new(Commodity {
444            id: "c2".into(),
445            ..commodity2
446        });
447        let mut flows = IndexMap::new();
448        flows.insert("c1".into(), flow(Rc::clone(&c1), -1.0));
449        flows.insert("c2".into(), flow(Rc::clone(&c2), 2.0));
450        let primary_outputs = vec![("c1".into(), Some(true)), ("c2".into(), None)];
451        let res = validate_or_infer_primary_output(&flows, &primary_outputs);
452        assert!(res.is_err());
453    }
454}