muse2/input/process/
flow.rs

1//! Code for reading process flows file
2use super::super::{input_err_msg, read_csv};
3use crate::commodity::{CommodityID, CommodityMap};
4use crate::process::{FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap};
5use crate::region::parse_region_str;
6use crate::units::{FlowPerActivity, MoneyPerFlow};
7use crate::year::parse_year_str;
8use anyhow::{Context, Result, ensure};
9use indexmap::IndexMap;
10use itertools::iproduct;
11use serde::Deserialize;
12use std::collections::HashMap;
13use std::path::Path;
14use std::rc::Rc;
15
16const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";
17
18#[derive(PartialEq, Debug, Deserialize)]
19struct ProcessFlowRaw {
20    process_id: String,
21    commodity_id: String,
22    years: String,
23    regions: String,
24    coeff: FlowPerActivity,
25    #[serde(default)]
26    #[serde(rename = "type")]
27    kind: FlowType,
28    cost: Option<MoneyPerFlow>,
29}
30
31impl ProcessFlowRaw {
32    fn validate(&self) -> Result<()> {
33        // Check that flow is not infinity, nan, 0 etc.
34        ensure!(
35            self.coeff.is_normal(),
36            "Invalid value for coeff ({})",
37            self.coeff
38        );
39
40        // **TODO**: https://github.com/EnergySystemsModellingLab/MUSE2/issues/300
41        ensure!(
42            self.kind == FlowType::Fixed,
43            "Commodity flexible assets are not currently supported"
44        );
45
46        // Check that flow cost is non-negative
47        if let Some(cost) = self.cost {
48            ensure!(
49                (0.0..f64::INFINITY).contains(&cost.value()),
50                "Invalid value for flow cost ({cost}). Must be >=0."
51            );
52        }
53
54        Ok(())
55    }
56}
57
58/// Read process flows from a CSV file
59pub fn read_process_flows(
60    model_dir: &Path,
61    processes: &mut ProcessMap,
62    commodities: &CommodityMap,
63) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
64    let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
65    let process_flow_csv = read_csv(&file_path)?;
66    read_process_flows_from_iter(process_flow_csv, processes, commodities)
67        .with_context(|| input_err_msg(&file_path))
68}
69
70/// Read '`ProcessFlowRaw`' records from an iterator and convert them into '`ProcessFlow`' records.
71fn read_process_flows_from_iter<I>(
72    iter: I,
73    processes: &mut ProcessMap,
74    commodities: &CommodityMap,
75) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
76where
77    I: Iterator<Item = ProcessFlowRaw>,
78{
79    let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
80    for record in iter {
81        record.validate()?;
82
83        // Get process
84        let (id, process) = processes
85            .get_key_value(record.process_id.as_str())
86            .with_context(|| format!("Process {} not found", record.process_id))?;
87
88        // Get regions
89        let process_regions = &process.regions;
90        let record_regions =
91            parse_region_str(&record.regions, process_regions).with_context(|| {
92                format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
93            })?;
94
95        // Get years
96        let process_years = &process.years;
97        let record_years = parse_year_str(&record.years, process_years).with_context(|| {
98            format!("Invalid year for process {id}. Valid years are {process_years:?}")
99        })?;
100
101        // Get commodity
102        let commodity = commodities
103            .get(record.commodity_id.as_str())
104            .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;
105
106        // Create ProcessFlow object
107        let process_flow = ProcessFlow {
108            commodity: Rc::clone(commodity),
109            coeff: record.coeff,
110            kind: FlowType::Fixed,
111            cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
112        };
113
114        // Insert flow into the map
115        let region_year_map = flows_map.entry(id.clone()).or_default();
116        for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
117            let flows_map = region_year_map
118                .entry((region_id.clone(), year))
119                .or_default();
120            let existing = Rc::get_mut(flows_map)
121                .unwrap() // safe: there will only be one copy
122                .insert(commodity.id.clone(), process_flow.clone())
123                .is_some();
124            ensure!(
125                !existing,
126                "Duplicate process flow entry for region {}, year {} and commodity {}",
127                region_id,
128                year,
129                commodity.id
130            );
131        }
132    }
133
134    validate_flows_and_update_primary_output(processes, &flows_map)?;
135
136    Ok(flows_map)
137}
138
139fn validate_flows_and_update_primary_output(
140    processes: &mut ProcessMap,
141    flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
142) -> Result<()> {
143    for (process_id, process) in processes.iter_mut() {
144        let map = flows_map
145            .get(process_id)
146            .with_context(|| format!("Missing flows map for process {process_id}"))?;
147
148        ensure!(
149            map.len() == process.years.len() * process.regions.len(),
150            "Flows map for process {process_id} does not cover all regions and years"
151        );
152
153        let mut iter = iproduct!(process.years.iter(), process.regions.iter());
154
155        let primary_output = if let Some(primary_output) = &process.primary_output {
156            Some(primary_output.clone())
157        } else {
158            let (year, region_id) = iter.next().unwrap();
159            infer_primary_output(&map[&(region_id.clone(), *year)]).with_context(|| {
160                format!("Could not infer primary_output for process {process_id}")
161            })?
162        };
163
164        for (&year, region_id) in iter {
165            let flows = &map[&(region_id.clone(), year)];
166
167            // Check that the process has flows for this region/year
168            check_flows_primary_output(flows, primary_output.as_ref()).with_context(|| {
169                format!(
170                    "Invalid primary output configuration for process {process_id} \
171                    (region: {region_id}, year: {year})"
172                )
173            })?;
174        }
175
176        // Update primary output if needed
177        if process.primary_output != primary_output {
178            // Safe: There should only be one ref to process
179            Rc::get_mut(process).unwrap().primary_output = primary_output;
180        }
181    }
182
183    Ok(())
184}
185
186/// Infer the primary output.
187///
188/// This is only possible if there is only one output flow for the process.
189fn infer_primary_output(map: &IndexMap<CommodityID, ProcessFlow>) -> Result<Option<CommodityID>> {
190    let mut iter = map
191        .iter()
192        .filter_map(|(commodity_id, flow)| flow.is_output().then_some(commodity_id));
193
194    let Some(first_output) = iter.next() else {
195        // If there are only input flows, then the primary output should be None
196        return Ok(None);
197    };
198
199    ensure!(
200        iter.next().is_none(),
201        "Need to specify primary_output explicitly if there are multiple output flows"
202    );
203
204    Ok(Some(first_output.clone()))
205}
206
207/// Check the flows are correct for the specified primary output (or lack thereof)
208fn check_flows_primary_output(
209    flows_map: &IndexMap<CommodityID, ProcessFlow>,
210    primary_output: Option<&CommodityID>,
211) -> Result<()> {
212    if let Some(primary_output) = primary_output {
213        let flow = flows_map.get(primary_output).with_context(|| {
214            format!("Primary output commodity '{primary_output}' isn't a process flow")
215        })?;
216
217        ensure!(
218            flow.is_output(),
219            "Primary output commodity '{primary_output}' isn't an output flow",
220        );
221    } else {
222        ensure!(
223            flows_map.values().all(ProcessFlow::is_input),
224            "First year is only inputs, but subsequent years have outputs, although no primary \
225            output is specified"
226        );
227    }
228
229    Ok(())
230}
231
232#[cfg(test)]
233mod tests {
234    use super::*;
235    use crate::commodity::Commodity;
236    use crate::fixture::{assert_error, process, sed_commodity, svd_commodity};
237    use crate::process::{FlowType, Process, ProcessFlow, ProcessMap};
238    use crate::units::{FlowPerActivity, MoneyPerFlow};
239    use indexmap::IndexMap;
240    use itertools::Itertools;
241    use map_macro::hash_map;
242    use rstest::rstest;
243    use std::iter;
244    use std::rc::Rc;
245
246    fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
247        ProcessFlow {
248            commodity,
249            coeff: FlowPerActivity(coeff),
250            kind: FlowType::Fixed,
251            cost: MoneyPerFlow(0.0),
252        }
253    }
254
255    fn build_maps<I>(
256        process: Process,
257        flows: I,
258    ) -> (ProcessMap, HashMap<ProcessID, ProcessFlowsMap>)
259    where
260        I: Clone + Iterator<Item = (CommodityID, ProcessFlow)>,
261    {
262        let map: Rc<IndexMap<_, _>> = Rc::new(flows.clone().collect());
263        let flows_inner = iproduct!(&process.regions, &process.years)
264            .map(|(region_id, year)| ((region_id.clone(), *year), map.clone()))
265            .collect();
266        let flows = hash_map! {process.id.clone() => flows_inner};
267        let processes = iter::once((process.id.clone(), process.into())).collect();
268
269        (processes, flows)
270    }
271
272    #[rstest]
273    fn single_output_infer_primary(#[from(svd_commodity)] commodity: Commodity, process: Process) {
274        let commodity = Rc::new(commodity);
275        let (mut processes, flows_map) = build_maps(
276            process,
277            std::iter::once((commodity.id.clone(), flow(commodity.clone(), 1.0))),
278        );
279        assert!(validate_flows_and_update_primary_output(&mut processes, &flows_map).is_ok());
280        assert_eq!(
281            processes.values().exactly_one().unwrap().primary_output,
282            Some(commodity.id.clone())
283        );
284    }
285
286    #[rstest]
287    fn multiple_outputs_error(
288        #[from(svd_commodity)] commodity1: Commodity,
289        #[from(sed_commodity)] commodity2: Commodity,
290        process: Process,
291    ) {
292        let commodity1 = Rc::new(commodity1);
293        let commodity2 = Rc::new(commodity2);
294        let (mut processes, flows_map) = build_maps(
295            process,
296            [
297                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
298                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
299            ]
300            .into_iter(),
301        );
302        let res = validate_flows_and_update_primary_output(&mut processes, &flows_map);
303        assert_error!(res, "Could not infer primary_output for process process1");
304    }
305
306    #[rstest]
307    fn explicit_primary_output(
308        #[from(svd_commodity)] commodity1: Commodity,
309        #[from(sed_commodity)] commodity2: Commodity,
310        process: Process,
311    ) {
312        let commodity1 = Rc::new(commodity1);
313        let commodity2 = Rc::new(commodity2);
314        let mut process = process;
315        process.primary_output = Some(commodity2.id.clone());
316        let (mut processes, flows_map) = build_maps(
317            process,
318            [
319                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
320                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
321            ]
322            .into_iter(),
323        );
324        assert!(validate_flows_and_update_primary_output(&mut processes, &flows_map).is_ok());
325        assert_eq!(
326            processes.values().exactly_one().unwrap().primary_output,
327            Some(commodity2.id.clone())
328        );
329    }
330
331    #[rstest]
332    fn all_inputs_no_primary(
333        #[from(svd_commodity)] commodity1: Commodity,
334        #[from(sed_commodity)] commodity2: Commodity,
335        process: Process,
336    ) {
337        let commodity1 = Rc::new(commodity1);
338        let commodity2 = Rc::new(commodity2);
339        let (mut processes, flows_map) = build_maps(
340            process,
341            [
342                (commodity1.id.clone(), flow(commodity1.clone(), -1.0)),
343                (commodity2.id.clone(), flow(commodity2.clone(), -2.0)),
344            ]
345            .into_iter(),
346        );
347        assert!(validate_flows_and_update_primary_output(&mut processes, &flows_map).is_ok());
348        assert_eq!(
349            processes.values().exactly_one().unwrap().primary_output,
350            None
351        );
352    }
353}