muse2/input/process/
flow.rs

1//! Code for reading process flows file
2use super::super::{input_err_msg, read_csv};
3use crate::commodity::{CommodityID, CommodityMap};
4use crate::process::{
5    FlowDirection, FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap,
6};
7use crate::region::{RegionID, parse_region_str};
8use crate::units::{FlowPerActivity, MoneyPerFlow};
9use crate::year::parse_year_str;
10use anyhow::{Context, Result, ensure};
11use indexmap::IndexMap;
12use itertools::iproduct;
13use serde::Deserialize;
14use std::collections::HashMap;
15use std::path::Path;
16use std::rc::Rc;
17
18const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";
19
20#[derive(PartialEq, Debug, Deserialize)]
21struct ProcessFlowRaw {
22    process_id: String,
23    commodity_id: String,
24    commission_years: String,
25    regions: String,
26    coeff: FlowPerActivity,
27    #[serde(default)]
28    #[serde(rename = "type")]
29    kind: FlowType,
30    cost: Option<MoneyPerFlow>,
31}
32
33impl ProcessFlowRaw {
34    fn validate(&self) -> Result<()> {
35        // Check that flow is not infinity or nan.
36        ensure!(
37            self.coeff.is_finite(),
38            "Invalid value for coeff ({})",
39            self.coeff
40        );
41
42        // **TODO**: https://github.com/EnergySystemsModellingLab/MUSE2/issues/300
43        ensure!(
44            self.kind == FlowType::Fixed,
45            "Commodity flexible assets are not currently supported"
46        );
47
48        // Check that flow cost is non-negative
49        if let Some(cost) = self.cost {
50            ensure!(
51                (cost.value() >= 0.0),
52                "Invalid value for flow cost ({cost}). Must be >=0."
53            );
54        }
55
56        Ok(())
57    }
58}
59
60/// Read process flows from a CSV file
61pub fn read_process_flows(
62    model_dir: &Path,
63    processes: &mut ProcessMap,
64    commodities: &CommodityMap,
65    milestone_years: &[u32],
66) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
67    let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
68    let process_flow_csv = read_csv(&file_path)?;
69    read_process_flows_from_iter(process_flow_csv, processes, commodities, milestone_years)
70        .with_context(|| input_err_msg(&file_path))
71}
72
73/// Read '`ProcessFlowRaw`' records from an iterator and convert them into '`ProcessFlow`' records.
74fn read_process_flows_from_iter<I>(
75    iter: I,
76    processes: &mut ProcessMap,
77    commodities: &CommodityMap,
78    milestone_years: &[u32],
79) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
80where
81    I: Iterator<Item = ProcessFlowRaw>,
82{
83    let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
84    for record in iter {
85        record.validate()?;
86
87        // Get process
88        let (id, process) = processes
89            .get_key_value(record.process_id.as_str())
90            .with_context(|| format!("Process {} not found", record.process_id))?;
91
92        // Get regions
93        let process_regions = &process.regions;
94        let record_regions =
95            parse_region_str(&record.regions, process_regions).with_context(|| {
96                format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
97            })?;
98
99        // Get years
100        let process_years: Vec<u32> = process.years.clone().collect();
101        let record_years =
102            parse_year_str(&record.commission_years, &process_years).with_context(|| {
103                format!("Invalid year for process {id}. Valid years are {process_years:?}")
104            })?;
105
106        // Get commodity
107        let commodity = commodities
108            .get(record.commodity_id.as_str())
109            .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;
110
111        // Create ProcessFlow object
112        let process_flow = ProcessFlow {
113            commodity: Rc::clone(commodity),
114            coeff: record.coeff,
115            kind: FlowType::Fixed,
116            cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
117        };
118
119        // Insert flow into the map
120        let region_year_map = flows_map.entry(id.clone()).or_default();
121        for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
122            let flows_map = region_year_map
123                .entry((region_id.clone(), year))
124                .or_default();
125            let existing = Rc::get_mut(flows_map)
126                .unwrap() // safe: there will only be one copy
127                .insert(commodity.id.clone(), process_flow.clone())
128                .is_some();
129            ensure!(
130                !existing,
131                "Duplicate process flow entry for region {}, year {} and commodity {}",
132                region_id,
133                year,
134                commodity.id
135            );
136        }
137    }
138
139    validate_flows_and_update_primary_output(processes, &flows_map, milestone_years)?;
140    validate_secondary_flows(processes, &flows_map, milestone_years)?;
141
142    Ok(flows_map)
143}
144
145fn validate_flows_and_update_primary_output(
146    processes: &mut ProcessMap,
147    flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
148    milestone_years: &[u32],
149) -> Result<()> {
150    for (process_id, process) in processes.iter_mut() {
151        let map = flows_map
152            .get(process_id)
153            .with_context(|| format!("Missing flows map for process {process_id}"))?;
154
155        // Flows are required for all milestone years within the process years of activity
156        let required_years = milestone_years
157            .iter()
158            .filter(|&y| process.years.contains(y));
159        let region_year: Vec<(&RegionID, &u32)> =
160            iproduct!(process.regions.iter(), required_years).collect();
161
162        ensure!(
163            region_year
164                .iter()
165                .all(|(region_id, year)| map.contains_key(&((*region_id).clone(), **year))),
166            "Flows map for process {process_id} does not cover all regions and required years"
167        );
168
169        let primary_output = if let Some(primary_output) = &process.primary_output {
170            Some(primary_output.clone())
171        } else {
172            let (region_id, year) = region_year[0];
173            infer_primary_output(&map[&(region_id.clone(), *year)]).with_context(|| {
174                format!("Could not infer primary_output for process {process_id}")
175            })?
176        };
177
178        for (region_id, &year) in region_year {
179            let flows = &map[&(region_id.clone(), year)];
180
181            // Check that the process has flows for this region/year
182            check_flows_primary_output(flows, primary_output.as_ref()).with_context(|| {
183                format!(
184                    "Invalid primary output configuration for process {process_id} \
185                    (region: {region_id}, year: {year})"
186                )
187            })?;
188        }
189
190        // Update primary output if needed
191        if process.primary_output != primary_output {
192            // Safe: There should only be one ref to process
193            Rc::get_mut(process).unwrap().primary_output = primary_output;
194        }
195    }
196
197    Ok(())
198}
199
200/// Infer the primary output.
201///
202/// This is only possible if there is only one output flow for the process.
203fn infer_primary_output(map: &IndexMap<CommodityID, ProcessFlow>) -> Result<Option<CommodityID>> {
204    let mut iter = map.iter().filter_map(|(commodity_id, flow)| {
205        (flow.direction() == FlowDirection::Output).then_some(commodity_id)
206    });
207
208    let Some(first_output) = iter.next() else {
209        // If there are only input flows, then the primary output should be None
210        return Ok(None);
211    };
212
213    ensure!(
214        iter.next().is_none(),
215        "Need to specify primary_output explicitly if there are multiple output flows"
216    );
217
218    Ok(Some(first_output.clone()))
219}
220
221/// Check the flows are correct for the specified primary output (or lack thereof)
222fn check_flows_primary_output(
223    flows_map: &IndexMap<CommodityID, ProcessFlow>,
224    primary_output: Option<&CommodityID>,
225) -> Result<()> {
226    if let Some(primary_output) = primary_output {
227        let flow = flows_map.get(primary_output).with_context(|| {
228            format!("Primary output commodity '{primary_output}' isn't a process flow")
229        })?;
230
231        ensure!(
232            flow.direction() == FlowDirection::Output,
233            "Primary output commodity '{primary_output}' isn't an output flow",
234        );
235    } else {
236        ensure!(
237            flows_map
238                .values()
239                .all(|x| x.direction() == FlowDirection::Input
240                    || x.direction() == FlowDirection::Zero),
241            "First year is only inputs, but subsequent years have outputs, although no primary \
242            output is specified"
243        );
244    }
245
246    Ok(())
247}
248
249/// Checks that non-primary io are defined for all milestone years, at least, (within a region) and
250/// that they are only inputs or only outputs in all years.
251fn validate_secondary_flows(
252    processes: &mut ProcessMap,
253    flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
254    milestone_years: &[u32],
255) -> Result<()> {
256    for (process_id, process) in processes.iter() {
257        // Get the flows for this process - there should be no error, as was checked already
258        let map = flows_map
259            .get(process_id)
260            .with_context(|| format!("Missing flows map for process {process_id}"))?;
261
262        // Flows are required for all milestone years within the process years of activity
263        let required_years: Vec<&u32> = milestone_years
264            .iter()
265            .filter(|&y| process.years.contains(y))
266            .collect();
267
268        // Get the non-primary io flows for all years, if any, arranged by (commodity, region)
269        let iter = iproduct!(process.years.clone(), process.regions.iter());
270        let mut flows: HashMap<(CommodityID, RegionID), Vec<&ProcessFlow>> = HashMap::new();
271        let mut number_of_years: HashMap<(CommodityID, RegionID), u32> = HashMap::new();
272        for (year, region_id) in iter {
273            if let Some(commodity_map) = map.get(&(region_id.clone(), year)) {
274                let flow = commodity_map.iter().filter_map(|(commodity_id, flow)| {
275                    (Some(commodity_id) != process.primary_output.as_ref())
276                        .then_some(((commodity_id.clone(), region_id.clone()), flow))
277                });
278
279                for (key, value) in flow {
280                    flows.entry(key.clone()).or_default().push(value);
281                    if required_years.contains(&&year) {
282                        *number_of_years.entry(key).or_default() += 1;
283                    }
284                }
285            }
286        }
287
288        // Finally we check that the flows for a given commodity and region are defined for all
289        // milestone years and that they are all inputs or all outputs. This later check is done
290        // for all years in the process range, required or not.
291        for ((commodity_id, region_id), value) in &flows {
292            ensure!(
293                number_of_years[&(commodity_id.clone(), region_id.clone())]
294                    == required_years.len().try_into().unwrap(),
295                "Flow of commodity {commodity_id} in region {region_id} for process {process_id} \
296                does not cover all milestone years within the process range of activity."
297            );
298            let input_or_zero = value
299                .iter()
300                .all(|&x| [FlowDirection::Input, FlowDirection::Zero].contains(&x.direction()));
301            let output_or_zero = value
302                .iter()
303                .all(|&x| [FlowDirection::Output, FlowDirection::Zero].contains(&x.direction()));
304            ensure!(
305                input_or_zero || output_or_zero,
306                "Flow of commodity {commodity_id} in region {region_id} for process {process_id} \
307                behaves as input or output in different years."
308            );
309        }
310    }
311
312    Ok(())
313}
314
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commodity::Commodity;
    use crate::fixture::{assert_error, process, sed_commodity, svd_commodity};
    use crate::process::{FlowType, Process, ProcessFlow, ProcessMap};
    use crate::units::{FlowPerActivity, MoneyPerFlow};
    use indexmap::IndexMap;
    use itertools::Itertools;
    use map_macro::hash_map;
    use rstest::rstest;
    use std::iter;
    use std::rc::Rc;

    /// Create a fixed, zero-cost flow of `commodity` with the given coefficient
    fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
        ProcessFlow {
            commodity,
            coeff: FlowPerActivity(coeff),
            kind: FlowType::Fixed,
            cost: MoneyPerFlow(0.0),
        }
    }

    /// Create keyed flows for two commodities, each with its own coefficient
    fn flow_pair(
        (first, first_coeff): (&Rc<Commodity>, f64),
        (second, second_coeff): (&Rc<Commodity>, f64),
    ) -> std::array::IntoIter<(CommodityID, ProcessFlow), 2> {
        [
            (first.id.clone(), flow(Rc::clone(first), first_coeff)),
            (second.id.clone(), flow(Rc::clone(second), second_coeff)),
        ]
        .into_iter()
    }

    /// Build a process map holding just `process`, plus a flows map in which the same `flows`
    /// are used for every region of the process and every year in `years` (or every year of the
    /// process, if `years` is `None`)
    fn build_maps<I>(
        process: Process,
        flows: I,
        years: Option<Vec<u32>>,
    ) -> (ProcessMap, HashMap<ProcessID, ProcessFlowsMap>)
    where
        I: Clone + Iterator<Item = (CommodityID, ProcessFlow)>,
    {
        let years = years.unwrap_or_else(|| process.years.clone().collect());
        let shared_flows: Rc<IndexMap<_, _>> = Rc::new(flows.collect());
        let flows_inner = iproduct!(&process.regions, years)
            .map(|(region_id, year)| ((region_id.clone(), year), Rc::clone(&shared_flows)))
            .collect();
        let flows_map = hash_map! {process.id.clone() => flows_inner};
        let processes = iter::once((process.id.clone(), Rc::new(process))).collect();

        (processes, flows_map)
    }

    /// Get the primary output of the only process in the map
    fn sole_primary_output(processes: &ProcessMap) -> Option<CommodityID> {
        processes
            .values()
            .exactly_one()
            .unwrap()
            .primary_output
            .clone()
    }

    #[rstest]
    fn single_output_infer_primary(#[from(svd_commodity)] commodity: Commodity, process: Process) {
        let milestone_years: Vec<u32> = vec![2010, 2020];
        let commodity = Rc::new(commodity);
        let only_flow = (commodity.id.clone(), flow(Rc::clone(&commodity), 1.0));
        let (mut processes, flows_map) = build_maps(process, iter::once(only_flow), None);

        let result =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert!(result.is_ok());
        assert_eq!(sole_primary_output(&processes), Some(commodity.id.clone()));
    }

    #[rstest]
    fn multiple_outputs_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years: Vec<u32> = vec![2010, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let (mut processes, flows_map) = build_maps(
            process,
            flow_pair((&commodity1, 1.0), (&commodity2, 2.0)),
            None,
        );

        let res =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert_error!(res, "Could not infer primary_output for process process1");
    }

    #[rstest]
    fn explicit_primary_output(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years: Vec<u32> = vec![2010, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);

        // Two outputs are acceptable as long as the primary one is named explicitly
        let mut process = process;
        process.primary_output = Some(commodity2.id.clone());
        let (mut processes, flows_map) = build_maps(
            process,
            flow_pair((&commodity1, 1.0), (&commodity2, 2.0)),
            None,
        );

        let result =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert!(result.is_ok());
        assert_eq!(
            sole_primary_output(&processes),
            Some(commodity2.id.clone())
        );
    }

    #[rstest]
    fn all_inputs_no_primary(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years: Vec<u32> = vec![2010, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let (mut processes, flows_map) = build_maps(
            process,
            flow_pair((&commodity1, -1.0), (&commodity2, -2.0)),
            None,
        );

        let result =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert!(result.is_ok());
        assert_eq!(sole_primary_output(&processes), None);
    }

    #[rstest]
    fn flows_not_in_all_milestone_years(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        // Flows are missing for the 2015 milestone year
        let milestone_years: Vec<u32> = vec![2010, 2015, 2020];
        let flow_years: Vec<u32> = vec![2010, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let (mut processes, flows_map) = build_maps(
            process,
            flow_pair((&commodity1, 1.0), (&commodity2, 2.0)),
            Some(flow_years),
        );

        let res =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert_error!(
            res,
            "Flows map for process process1 does not cover all regions and required years"
        );
    }

    #[rstest]
    fn flows_only_milestone_years(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        // Flows defined for the milestone years alone are sufficient
        let milestone_years: Vec<u32> = vec![2010, 2015, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let (mut processes, flows_map) = build_maps(
            process,
            flow_pair((&commodity1, 1.0), (&commodity2, -2.0)),
            Some(milestone_years.clone()),
        );

        let result =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert!(result.is_ok());
    }
}
505        );
506    }
507}