muse2/input/process/
flow.rs

1//! Code for reading process flows file
2use super::super::*;
3use crate::commodity::{CommodityID, CommodityMap};
4use crate::process::{FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap};
5use crate::region::parse_region_str;
6use crate::units::{FlowPerActivity, MoneyPerFlow};
7use crate::year::parse_year_str;
8use anyhow::{Context, Result, ensure};
9use itertools::iproduct;
10use serde::Deserialize;
11use std::collections::HashMap;
12use std::path::Path;
13use std::rc::Rc;
14
15const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";
16
17#[derive(PartialEq, Debug, Deserialize)]
18struct ProcessFlowRaw {
19    process_id: String,
20    commodity_id: String,
21    years: String,
22    regions: String,
23    coeff: FlowPerActivity,
24    #[serde(default)]
25    #[serde(rename = "type")]
26    kind: FlowType,
27    cost: Option<MoneyPerFlow>,
28}
29
30impl ProcessFlowRaw {
31    fn validate(&self) -> Result<()> {
32        // Check that flow is not infinity, nan, 0 etc.
33        ensure!(
34            self.coeff.is_normal(),
35            "Invalid value for coeff ({})",
36            self.coeff
37        );
38
39        // **TODO**: https://github.com/EnergySystemsModellingLab/MUSE_2.0/issues/300
40        ensure!(
41            self.kind == FlowType::Fixed,
42            "Commodity flexible assets are not currently supported"
43        );
44
45        // Check that flow cost is non-negative
46        if let Some(cost) = self.cost {
47            ensure!(
48                (0.0..f64::INFINITY).contains(&cost.value()),
49                "Invalid value for flow cost ({cost}). Must be >=0."
50            )
51        }
52
53        Ok(())
54    }
55}
56
57/// Read process flows from a CSV file
58pub fn read_process_flows(
59    model_dir: &Path,
60    processes: &mut ProcessMap,
61    commodities: &CommodityMap,
62) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
63    let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
64    let process_flow_csv = read_csv(&file_path)?;
65    read_process_flows_from_iter(process_flow_csv, processes, commodities)
66        .with_context(|| input_err_msg(&file_path))
67}
68
69/// Read 'ProcessFlowRaw' records from an iterator and convert them into 'ProcessFlow' records.
70fn read_process_flows_from_iter<I>(
71    iter: I,
72    processes: &mut ProcessMap,
73    commodities: &CommodityMap,
74) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
75where
76    I: Iterator<Item = ProcessFlowRaw>,
77{
78    let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
79    for record in iter {
80        record.validate()?;
81
82        // Get process
83        let (id, process) = processes
84            .get_key_value(record.process_id.as_str())
85            .with_context(|| format!("Process {} not found", record.process_id))?;
86
87        // Get regions
88        let process_regions = &process.regions;
89        let record_regions =
90            parse_region_str(&record.regions, process_regions).with_context(|| {
91                format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
92            })?;
93
94        // Get years
95        let process_years = &process.years;
96        let record_years = parse_year_str(&record.years, process_years).with_context(|| {
97            format!("Invalid year for process {id}. Valid years are {process_years:?}")
98        })?;
99
100        // Get commodity
101        let commodity = commodities
102            .get(record.commodity_id.as_str())
103            .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;
104
105        // Create ProcessFlow object
106        let process_flow = ProcessFlow {
107            commodity: Rc::clone(commodity),
108            coeff: record.coeff,
109            kind: FlowType::Fixed,
110            cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
111        };
112
113        // Insert flow into the map
114        let region_year_map = flows_map.entry(id.clone()).or_default();
115        for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
116            let flows_map = region_year_map
117                .entry((region_id.clone(), year))
118                .or_default();
119            let existing = flows_map
120                .insert(commodity.id.clone(), process_flow.clone())
121                .is_some();
122            ensure!(
123                !existing,
124                "Duplicate process flow entry for region {}, year {} and commodity {}",
125                region_id,
126                year,
127                commodity.id
128            );
129        }
130    }
131
132    validate_flows_and_update_primary_output(processes, &flows_map)?;
133
134    Ok(flows_map)
135}
136
137fn validate_flows_and_update_primary_output(
138    processes: &mut ProcessMap,
139    flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
140) -> Result<()> {
141    for (process_id, process) in processes.iter_mut() {
142        let map = flows_map
143            .get(process_id)
144            .with_context(|| format!("Missing flows map for process {process_id}"))?;
145
146        ensure!(
147            map.len() == process.years.len() * process.regions.len(),
148            "Flows map for process {process_id} does not cover all regions and years"
149        );
150
151        let mut iter = iproduct!(process.years.iter(), process.regions.iter());
152
153        let primary_output = match &process.primary_output {
154            Some(primary_output) => Some(primary_output.clone()),
155            None => {
156                let (year, region_id) = iter.next().unwrap();
157                infer_primary_output(&map[&(region_id.clone(), *year)]).with_context(|| {
158                    format!("Could not infer primary_output for process {process_id}")
159                })?
160            }
161        };
162
163        for (&year, region_id) in iter {
164            let flows = &map[&(region_id.clone(), year)];
165
166            // Check that the process has flows for this region/year
167            check_flows_primary_output(flows, &primary_output).with_context(|| {
168                format!(
169                    "Invalid primary output configuration for process {process_id} \
170                    (region: {region_id}, year: {year})"
171                )
172            })?;
173        }
174
175        // Update primary output if needed
176        if process.primary_output != primary_output {
177            // Safe: There should only be one ref to process
178            Rc::get_mut(process).unwrap().primary_output = primary_output;
179        }
180    }
181
182    Ok(())
183}
184
185/// Infer the primary output.
186///
187/// This is only possible if there is only one output flow for the process.
188fn infer_primary_output(map: &IndexMap<CommodityID, ProcessFlow>) -> Result<Option<CommodityID>> {
189    let mut iter = map
190        .iter()
191        .filter_map(|(commodity_id, flow)| flow.is_output().then_some(commodity_id));
192
193    let Some(first_output) = iter.next() else {
194        // If there are only input flows, then the primary output should be None
195        return Ok(None);
196    };
197
198    ensure!(
199        iter.next().is_none(),
200        "Need to specify primary_output explicitly if there are multiple output flows"
201    );
202
203    Ok(Some(first_output.clone()))
204}
205
206/// Check the flows are correct for the specified primary output (or lack thereof)
207fn check_flows_primary_output(
208    flows_map: &IndexMap<CommodityID, ProcessFlow>,
209    primary_output: &Option<CommodityID>,
210) -> Result<()> {
211    if let Some(primary_output) = primary_output {
212        let flow = flows_map.get(primary_output).with_context(|| {
213            format!("Primary output commodity '{primary_output}' isn't a process flow",)
214        })?;
215
216        ensure!(
217            flow.is_output(),
218            "Primary output commodity '{primary_output}' isn't an output flow",
219        );
220    } else {
221        ensure!(
222            flows_map.values().all(|flow| flow.is_input()),
223            "First year is only inputs, but subsequent years have outputs, although no primary \
224            output is specified"
225        );
226    }
227
228    Ok(())
229}
230
231#[cfg(test)]
232mod tests {
233    use super::*;
234    use crate::commodity::Commodity;
235    use crate::fixture::{assert_error, process, sed_commodity, svd_commodity};
236    use crate::process::{FlowType, Process, ProcessFlow, ProcessMap};
237    use crate::units::{FlowPerActivity, MoneyPerFlow};
238    use indexmap::IndexMap;
239    use map_macro::hash_map;
240    use rstest::rstest;
241    use std::iter;
242    use std::rc::Rc;
243
244    fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
245        ProcessFlow {
246            commodity,
247            coeff: FlowPerActivity(coeff),
248            kind: FlowType::Fixed,
249            cost: MoneyPerFlow(0.0),
250        }
251    }
252
253    fn build_maps<I>(
254        process: Process,
255        flows: I,
256    ) -> (ProcessMap, HashMap<ProcessID, ProcessFlowsMap>)
257    where
258        I: Clone + Iterator<Item = (CommodityID, ProcessFlow)>,
259    {
260        let map: IndexMap<CommodityID, ProcessFlow> = flows.clone().collect();
261        let flows_inner = iproduct!(&process.regions, &process.years)
262            .map(|(region_id, year)| ((region_id.clone(), *year), map.clone()))
263            .collect();
264        let flows = hash_map! {process.id.clone() => flows_inner};
265        let processes = iter::once((process.id.clone(), process.into())).collect();
266
267        (processes, flows)
268    }
269
270    #[rstest]
271    fn single_output_infer_primary(#[from(svd_commodity)] commodity: Commodity, process: Process) {
272        let commodity = Rc::new(commodity);
273        let (mut processes, flows_map) = build_maps(
274            process,
275            std::iter::once((commodity.id.clone(), flow(commodity.clone(), 1.0))),
276        );
277        assert!(validate_flows_and_update_primary_output(&mut processes, &flows_map).is_ok());
278        assert_eq!(
279            processes.values().exactly_one().unwrap().primary_output,
280            Some(commodity.id.clone())
281        );
282    }
283
284    #[rstest]
285    fn multiple_outputs_error(
286        #[from(svd_commodity)] commodity1: Commodity,
287        #[from(sed_commodity)] commodity2: Commodity,
288        process: Process,
289    ) {
290        let commodity1 = Rc::new(commodity1);
291        let commodity2 = Rc::new(commodity2);
292        let (mut processes, flows_map) = build_maps(
293            process,
294            [
295                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
296                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
297            ]
298            .into_iter(),
299        );
300        let res = validate_flows_and_update_primary_output(&mut processes, &flows_map);
301        assert_error!(res, "Could not infer primary_output for process process1");
302    }
303
304    #[rstest]
305    fn explicit_primary_output(
306        #[from(svd_commodity)] commodity1: Commodity,
307        #[from(sed_commodity)] commodity2: Commodity,
308        process: Process,
309    ) {
310        let commodity1 = Rc::new(commodity1);
311        let commodity2 = Rc::new(commodity2);
312        let mut process = process;
313        process.primary_output = Some(commodity2.id.clone());
314        let (mut processes, flows_map) = build_maps(
315            process,
316            [
317                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
318                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
319            ]
320            .into_iter(),
321        );
322        assert!(validate_flows_and_update_primary_output(&mut processes, &flows_map).is_ok());
323        assert_eq!(
324            processes.values().exactly_one().unwrap().primary_output,
325            Some(commodity2.id.clone())
326        );
327    }
328
329    #[rstest]
330    fn all_inputs_no_primary(
331        #[from(svd_commodity)] commodity1: Commodity,
332        #[from(sed_commodity)] commodity2: Commodity,
333        process: Process,
334    ) {
335        let commodity1 = Rc::new(commodity1);
336        let commodity2 = Rc::new(commodity2);
337        let (mut processes, flows_map) = build_maps(
338            process,
339            [
340                (commodity1.id.clone(), flow(commodity1.clone(), -1.0)),
341                (commodity2.id.clone(), flow(commodity2.clone(), -2.0)),
342            ]
343            .into_iter(),
344        );
345        assert!(validate_flows_and_update_primary_output(&mut processes, &flows_map).is_ok());
346        assert_eq!(
347            processes.values().exactly_one().unwrap().primary_output,
348            None
349        );
350    }
351}