muse2/input/process/flow.rs

//! Code for reading process flows from a CSV file.
use super::super::{input_err_msg, read_csv};
use crate::commodity::{CommodityID, CommodityMap};
use crate::process::{
    FlowDirection, FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap,
};
use crate::region::{RegionID, parse_region_str};
use crate::units::{FlowPerActivity, MoneyPerFlow};
use crate::year::parse_year_str;
use anyhow::{Context, Result, ensure};
use indexmap::IndexMap;
use itertools::iproduct;
use serde::Deserialize;
use std::collections::HashMap;
use std::path::Path;
use std::rc::Rc;

const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";

/// A raw record from the process flows CSV file.
#[derive(PartialEq, Debug, Deserialize)]
struct ProcessFlowRaw {
    process_id: String,
    commodity_id: String,
    commission_years: String,
    regions: String,
    coeff: FlowPerActivity,
    #[serde(default)]
    #[serde(rename = "type")]
    kind: FlowType,
    cost: Option<MoneyPerFlow>,
}
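// A sketch of the CSV layout that `ProcessFlowRaw` is deserialized from. Header names follow the
// field names above (with `kind` renamed to `type`); the column order shown here is only inferred
// from the example rows used in the tests below:
//
//   process_id,commodity_id,regions,commission_years,coeff,type,cost
//   GASPRC,GASPRD,all,all,-1.05,fixed,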

impl ProcessFlowRaw {
    fn validate(&self) -> Result<()> {
        // Check that the flow coefficient is not infinity or NaN.
        ensure!(
            self.coeff.is_finite(),
            "Invalid value for coeff ({})",
            self.coeff
        );

        // **TODO**: https://github.com/EnergySystemsModellingLab/MUSE2/issues/300
        ensure!(
            self.kind == FlowType::Fixed,
            "Commodity flexible assets are not currently supported"
        );

        // Check that the flow cost is non-negative
        if let Some(cost) = self.cost {
            ensure!(
                cost.value() >= 0.0,
                "Invalid value for flow cost ({cost}). Must be >=0."
            );
        }

        Ok(())
    }
}
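// Illustrative examples of records that `validate` rejects (the values are made up): a record
// with `coeff: FlowPerActivity(f64::INFINITY)`, one whose `kind` is not `FlowType::Fixed`
// (flexible flows are not yet supported; see the issue linked above) and one with
// `cost: Some(MoneyPerFlow(-1.0))`.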

/// Read process flows from a CSV file.
///
/// # Arguments
///
/// * `model_dir` - Folder containing model configuration files
/// * `processes` - Mutable map of known processes (may be updated)
/// * `commodities` - Map of known commodities
/// * `milestone_years` - Milestone years used by the model
///
/// # Returns
///
/// A `HashMap<ProcessID, ProcessFlowsMap>` mapping process IDs to their flows.
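///
/// # Example
///
/// A minimal sketch of a call site (the surrounding variables are illustrative, not defined in
/// this module):
///
/// ```ignore
/// let flows = read_process_flows(model_dir, &mut processes, &commodities, &milestone_years)?;
/// ```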
pub fn read_process_flows(
    model_dir: &Path,
    processes: &mut ProcessMap,
    commodities: &CommodityMap,
    milestone_years: &[u32],
) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
    let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
    let process_flow_csv = read_csv(&file_path)?;
    read_process_flows_from_iter(process_flow_csv, processes, commodities, milestone_years)
        .with_context(|| input_err_msg(&file_path))
}

/// Read `ProcessFlowRaw` records from an iterator and convert them into `ProcessFlow` records.
///
/// # Arguments
///
/// * `iter` - Iterator over `ProcessFlowRaw` records
/// * `processes` - Mutable map of known processes used for validation and updates
/// * `commodities` - Map of known commodities
/// * `milestone_years` - Milestone years used by the model
///
/// # Returns
///
/// A `HashMap<ProcessID, ProcessFlowsMap>` mapping process IDs to their flows.
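///
/// Each `ProcessFlowsMap` is keyed by `(RegionID, year)`, so a single raw record that lists
/// several regions and commission years fans out into one entry per (region, year) pair.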
fn read_process_flows_from_iter<I>(
    iter: I,
    processes: &mut ProcessMap,
    commodities: &CommodityMap,
    milestone_years: &[u32],
) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
where
    I: Iterator<Item = ProcessFlowRaw>,
{
    let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
    for record in iter {
        record.validate()?;

        // Get process
        let (id, process) = processes
            .get_key_value(record.process_id.as_str())
            .with_context(|| format!("Process {} not found", record.process_id))?;

        // Get regions
        let process_regions = &process.regions;
        let record_regions =
            parse_region_str(&record.regions, process_regions).with_context(|| {
                format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
            })?;

        // Get years
        let process_years: Vec<u32> = process.years.clone().collect();
        let record_years =
            parse_year_str(&record.commission_years, &process_years).with_context(|| {
                format!("Invalid year for process {id}. Valid years are {process_years:?}")
            })?;

        // Get commodity
        let commodity = commodities
            .get(record.commodity_id.as_str())
            .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;

        // Create ProcessFlow object
        let process_flow = ProcessFlow {
            commodity: Rc::clone(commodity),
            coeff: record.coeff,
            kind: FlowType::Fixed,
            cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
        };

        // Insert flow into the map
        let region_year_map = flows_map.entry(id.clone()).or_default();
        for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
            let flows_map = region_year_map
                .entry((region_id.clone(), year))
                .or_default();
            let existing = Rc::get_mut(flows_map)
                .unwrap() // safe: there will only be one copy
                .insert(commodity.id.clone(), process_flow.clone())
                .is_some();
            ensure!(
                !existing,
                "Duplicate process flow entry for region {}, year {} and commodity {}",
                region_id,
                year,
                commodity.id
            );
        }
    }

    validate_flows_and_update_primary_output(processes, &flows_map, milestone_years)?;
    validate_secondary_flows(processes, &flows_map, milestone_years)?;

    Ok(flows_map)
}

/// Check that each process has flows for all of its regions and required milestone years, and
/// determine its primary output, updating the process if it was not specified explicitly.
fn validate_flows_and_update_primary_output(
    processes: &mut ProcessMap,
    flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
    milestone_years: &[u32],
) -> Result<()> {
    for (process_id, process) in processes.iter_mut() {
        let map = flows_map
            .get(process_id)
            .with_context(|| format!("Missing flows map for process {process_id}"))?;

        // Flows are required for all milestone years within the process years of activity
        let required_years = milestone_years
            .iter()
            .filter(|&y| process.years.contains(y));
        let region_year: Vec<(&RegionID, &u32)> =
            iproduct!(process.regions.iter(), required_years).collect();

        ensure!(
            region_year
                .iter()
                .all(|(region_id, year)| map.contains_key(&((*region_id).clone(), **year))),
            "Flows map for process {process_id} does not cover all regions and required years"
        );

        let primary_output = if let Some(primary_output) = &process.primary_output {
            Some(primary_output.clone())
        } else {
            let (region_id, year) = region_year[0];
            infer_primary_output(&map[&(region_id.clone(), *year)]).with_context(|| {
                format!("Could not infer primary_output for process {process_id}")
            })?
        };

        for (region_id, &year) in region_year {
            let flows = &map[&(region_id.clone(), year)];

            // Check that the flows are consistent with the chosen primary output (or lack thereof)
            check_flows_primary_output(flows, primary_output.as_ref()).with_context(|| {
                format!(
                    "Invalid primary output configuration for process {process_id} \
                    (region: {region_id}, year: {year})"
                )
            })?;
        }

        // Update primary output if needed
        if process.primary_output != primary_output {
            // Safe: There should only be one ref to process
            Rc::get_mut(process).unwrap().primary_output = primary_output;
        }
    }

    Ok(())
}

/// Infer the primary output.
///
/// This is only possible if the process has at most one output flow: with exactly one output
/// flow, its commodity is the primary output; with none, the primary output is `None`.
fn infer_primary_output(map: &IndexMap<CommodityID, ProcessFlow>) -> Result<Option<CommodityID>> {
    let mut iter = map.iter().filter_map(|(commodity_id, flow)| {
        (flow.direction() == FlowDirection::Output).then_some(commodity_id)
    });

    let Some(first_output) = iter.next() else {
        // If there are only input flows, then the primary output should be None
        return Ok(None);
    };

    ensure!(
        iter.next().is_none(),
        "Need to specify primary_output explicitly if there are multiple output flows"
    );

    Ok(Some(first_output.clone()))
}
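// For illustration (the commodity names are made up, and a positive coefficient is taken to mean
// an output flow, as in the tests below): flows {ELC: 1.0, GAS: -1.0} infer ELC as the primary
// output; {GAS: -1.0, COAL: -1.0} infer `None`; and two output flows produce the error above
// asking for an explicit primary_output.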

/// Check that the flows are consistent with the specified primary output (or lack thereof)
fn check_flows_primary_output(
    flows_map: &IndexMap<CommodityID, ProcessFlow>,
    primary_output: Option<&CommodityID>,
) -> Result<()> {
    if let Some(primary_output) = primary_output {
        let flow = flows_map.get(primary_output).with_context(|| {
            format!("Primary output commodity '{primary_output}' isn't a process flow")
        })?;

        ensure!(
            flow.direction() == FlowDirection::Output,
            "Primary output commodity '{primary_output}' isn't an output flow",
        );
    } else {
        ensure!(
            flows_map
                .values()
                .all(|x| x.direction() == FlowDirection::Input
                    || x.direction() == FlowDirection::Zero),
            "First year is only inputs, but subsequent years have outputs, although no primary \
            output is specified"
        );
    }

    Ok(())
}
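// In other words: with an explicit primary output, that commodity must be present as an output
// flow; with no primary output, every flow must be an input or a zero flow.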

/// Check that non-primary flows are defined (within a region) for at least all milestone years
/// and that they are only inputs or only outputs across all years.
fn validate_secondary_flows(
    processes: &mut ProcessMap,
    flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
    milestone_years: &[u32],
) -> Result<()> {
    for (process_id, process) in processes.iter() {
        // Get the flows for this process; this should not fail, as it was checked already
        let map = flows_map
            .get(process_id)
            .with_context(|| format!("Missing flows map for process {process_id}"))?;

        // Flows are required for all milestone years within the process years of activity
        let required_years: Vec<&u32> = milestone_years
            .iter()
            .filter(|&y| process.years.contains(y))
            .collect();

        // Get the non-primary flows for all years, if any, arranged by (commodity, region)
        let iter = iproduct!(process.years.clone(), process.regions.iter());
        let mut flows: HashMap<(CommodityID, RegionID), Vec<&ProcessFlow>> = HashMap::new();
        let mut number_of_years: HashMap<(CommodityID, RegionID), u32> = HashMap::new();
        for (year, region_id) in iter {
            if let Some(commodity_map) = map.get(&(region_id.clone(), year)) {
                let flow = commodity_map.iter().filter_map(|(commodity_id, flow)| {
                    (Some(commodity_id) != process.primary_output.as_ref())
                        .then_some(((commodity_id.clone(), region_id.clone()), flow))
                });

                for (key, value) in flow {
                    flows.entry(key.clone()).or_default().push(value);
                    if required_years.contains(&&year) {
                        *number_of_years.entry(key).or_default() += 1;
                    }
                }
            }
        }

        // Finally we check that the flows for a given commodity and region are defined for all
        // milestone years and that they are all inputs or all outputs. The latter check is done
        // for all years in the process range, required or not.
        for ((commodity_id, region_id), value) in &flows {
            ensure!(
                number_of_years[&(commodity_id.clone(), region_id.clone())]
                    == required_years.len().try_into().unwrap(),
                "Flow of commodity {commodity_id} in region {region_id} for process {process_id} \
                does not cover all milestone years within the process range of activity."
            );
            let input_or_zero = value
                .iter()
                .all(|&x| [FlowDirection::Input, FlowDirection::Zero].contains(&x.direction()));
            let output_or_zero = value
                .iter()
                .all(|&x| [FlowDirection::Output, FlowDirection::Zero].contains(&x.direction()));
            ensure!(
                input_or_zero || output_or_zero,
                "Flow of commodity {commodity_id} in region {region_id} for process {process_id} \
                behaves as input or output in different years."
            );
        }
    }

    Ok(())
}
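// The `flows_different_direction_different_years` and `missing_flow` tests below exercise both
// failure modes against the simple example model, using the GASPRD flows of the GASPRC process.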

#[cfg(test)]
mod tests {
    use super::*;
    use crate::commodity::Commodity;
    use crate::fixture::{
        assert_error, assert_validate_fails_with_simple, assert_validate_ok_simple, process,
        sed_commodity, svd_commodity,
    };
    use crate::patch::FilePatch;
    use crate::process::{FlowType, Process, ProcessFlow, ProcessMap};
    use crate::units::{FlowPerActivity, MoneyPerFlow};
    use indexmap::IndexMap;
    use itertools::Itertools;
    use map_macro::hash_map;
    use rstest::rstest;
    use std::iter;
    use std::rc::Rc;

    fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
        ProcessFlow {
            commodity,
            coeff: FlowPerActivity(coeff),
            kind: FlowType::Fixed,
            cost: MoneyPerFlow(0.0),
        }
    }

    fn build_maps<I>(
        process: Process,
        flows: I,
        years: Option<Vec<u32>>,
    ) -> (ProcessMap, HashMap<ProcessID, ProcessFlowsMap>)
    where
        I: Clone + Iterator<Item = (CommodityID, ProcessFlow)>,
    {
        let years = years.unwrap_or(process.years.clone().collect());
        let map: Rc<IndexMap<_, _>> = Rc::new(flows.collect());
        let flows_inner = iproduct!(&process.regions, years)
            .map(|(region_id, year)| ((region_id.clone(), year), map.clone()))
            .collect();
        let flows = hash_map! {process.id.clone() => flows_inner};
        let processes = iter::once((process.id.clone(), process.into())).collect();

        (processes, flows)
    }

    #[rstest]
    fn single_output_infer_primary(#[from(svd_commodity)] commodity: Commodity, process: Process) {
        let milestone_years = vec![2010, 2020];
        let commodity = Rc::new(commodity);
        let (mut processes, flows_map) = build_maps(
            process,
            iter::once((commodity.id.clone(), flow(commodity.clone(), 1.0))),
            None,
        );
        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
            .unwrap();
        assert_eq!(
            processes.values().exactly_one().unwrap().primary_output,
            Some(commodity.id.clone())
        );
    }

    #[rstest]
    fn multiple_outputs_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years: Vec<u32> = vec![2010, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let (mut processes, flows_map) = build_maps(
            process,
            [
                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
            ]
            .into_iter(),
            None,
        );
        let res =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert_error!(res, "Could not infer primary_output for process process1");
    }

    #[rstest]
    fn explicit_primary_output(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years = vec![2010, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let mut process = process;
        process.primary_output = Some(commodity2.id.clone());
        let (mut processes, flows_map) = build_maps(
            process,
            [
                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
            ]
            .into_iter(),
            None,
        );
        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
            .unwrap();
        assert_eq!(
            processes.values().exactly_one().unwrap().primary_output,
            Some(commodity2.id.clone())
        );
    }

    #[rstest]
    fn all_inputs_no_primary(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years = vec![2010, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let (mut processes, flows_map) = build_maps(
            process,
            [
                (commodity1.id.clone(), flow(commodity1.clone(), -1.0)),
                (commodity2.id.clone(), flow(commodity2.clone(), -2.0)),
            ]
            .into_iter(),
            None,
        );
        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
            .unwrap();
        assert_eq!(
            processes.values().exactly_one().unwrap().primary_output,
            None
        );
    }

    #[rstest]
    fn flows_not_in_all_milestone_years(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years = vec![2010, 2015, 2020];
        let flow_years = vec![2010, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let (mut processes, flows_map) = build_maps(
            process,
            [
                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
            ]
            .into_iter(),
            Some(flow_years),
        );
        let res =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert_error!(
            res,
            "Flows map for process process1 does not cover all regions and required years"
        );
    }

    #[rstest]
    fn flows_only_milestone_years(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years = vec![2010, 2015, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let (mut processes, flows_map) = build_maps(
            process,
            [
                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
                (commodity2.id.clone(), flow(commodity2.clone(), -2.0)),
            ]
            .into_iter(),
            Some(milestone_years.clone()),
        );
        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
            .unwrap();
    }

    #[test]
    fn flows_different_direction_different_years() {
        let patch = FilePatch::new("process_flows.csv")
            .with_deletion("GASPRC,GASPRD,all,all,-1.05,fixed,")
            .with_addition("GASPRC,GASPRD,all,2020;2030,-1.05,fixed,")
            .with_addition("GASPRC,GASPRD,all,2040,1.05,fixed,");
        assert_validate_fails_with_simple!(
            vec![patch],
            "Flow of commodity GASPRD in region GBR for process GASPRC behaves as input or output in different years."
        );
    }

    #[test]
    fn missing_flow() {
        let patch = FilePatch::new("process_flows.csv")
            .with_deletion("GASPRC,GASPRD,all,all,-1.05,fixed,")
            .with_addition("GASPRC,GASPRD,all,2020;2030,-1.05,fixed,");
        assert_validate_fails_with_simple!(
            vec![patch],
            "Flow of commodity GASPRD in region GBR for process GASPRC does not cover all milestone years within the process range of activity."
        );
    }

    #[test]
    fn coeff_zero() {
        let patch = FilePatch::new("process_flows.csv")
            .with_deletion("GASPRC,GASPRD,all,all,-1.05,fixed,")
            .with_addition("GASPRC,GASPRD,all,2020;2030,-1.05,fixed,")
            .with_addition("GASPRC,GASPRD,all,2040,0,fixed,");
        assert_validate_ok_simple!(vec![patch]);
    }

    #[test]
    fn flows_not_needed_before_time_horizon() {
        // NB: The time horizon starts at 2020 for the simple example
        //
        // Flows are only needed for milestone years. Check that users can omit them for
        // non-milestone years.
        let patches = vec![
            FilePatch::new("processes.csv")
                .with_deletion("GASDRV,Dry gas extraction,all,GASPRD,2020,2040,1.0,")
                .with_addition("GASDRV,Dry gas extraction,all,GASPRD,1980,2040,1.0,"),
            FilePatch::new("process_flows.csv")
                .with_deletion("GASPRC,GASPRD,all,all,-1.05,fixed,")
                .with_addition("GASPRC,GASPRD,all,2020;2030;2040,-1.05,fixed,"),
        ];
        assert_validate_ok_simple!(patches);
    }
}