Skip to main content

muse2/input/process/
flow.rs

1//! Code for reading process flows from a CSV file.
2use super::super::{input_err_msg, read_csv};
3use crate::commodity::{CommodityID, CommodityMap, CommodityType};
4use crate::id::GetIDValue;
5use crate::input::parse_year_str;
6use crate::process::{
7    FlowDirection, FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap,
8};
9use crate::region::{RegionID, parse_region_str};
10use crate::units::{FlowPerActivity, MoneyPerFlow};
11use anyhow::{Context, Result, bail, ensure};
12use indexmap::{IndexMap, IndexSet};
13use itertools::iproduct;
14use serde::Deserialize;
15use std::collections::HashMap;
16use std::path::Path;
17use std::rc::Rc;
18
19const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";
20
21#[derive(PartialEq, Debug, Deserialize)]
22struct ProcessFlowRaw {
23    process_id: String,
24    commodity_id: String,
25    commission_years: String,
26    regions: String,
27    coeff: FlowPerActivity,
28    #[serde(default)]
29    #[serde(rename = "type")]
30    kind: FlowType,
31    cost: Option<MoneyPerFlow>,
32}
33
34impl ProcessFlowRaw {
35    fn validate(&self) -> Result<()> {
36        // Check that flow is not infinity or nan.
37        ensure!(
38            self.coeff.is_finite(),
39            "Invalid value for coeff ({})",
40            self.coeff
41        );
42
43        // **TODO**: https://github.com/EnergySystemsModellingLab/MUSE2/issues/300
44        ensure!(
45            self.kind == FlowType::Fixed,
46            "Commodity flexible assets are not currently supported"
47        );
48
49        // Check that flow cost is non-negative
50        if let Some(cost) = self.cost {
51            ensure!(
52                (cost.value() >= 0.0),
53                "Invalid value for flow cost ({cost}). Must be >=0."
54            );
55        }
56
57        Ok(())
58    }
59}
60
61/// Read process flows from a CSV file.
62///
63/// # Arguments
64///
65/// * `model_dir` - Folder containing model configuration files
66/// * `processes` - Mutable map of known processes (may be updated)
67/// * `commodities` - Map of known commodities
68/// * `milestone_years` - Milestone years used by the model
69///
70/// # Returns
71///
72/// A `HashMap<ProcessID, ProcessFlowsMap>` mapping process IDs to their flows.
73pub fn read_process_flows(
74    model_dir: &Path,
75    processes: &mut ProcessMap,
76    commodities: &CommodityMap,
77    milestone_years: &[u32],
78) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
79    let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
80    let process_flow_csv = read_csv(&file_path)?;
81    read_process_flows_from_iter(process_flow_csv, processes, commodities, milestone_years)
82        .with_context(|| input_err_msg(&file_path))
83}
84
85/// Validates that all SED/SVD outputs from each process have consistent units.
86///
87/// For processes with multiple SED/SVD outputs, the annual fixed costs are distributed
88/// proportionally based on flow coefficients. This only makes sense if all outputs share
89/// the same units.
90fn validate_output_flows_units(flows_map: &HashMap<ProcessID, ProcessFlowsMap>) -> Result<()> {
91    // Collect all validation errors so that the error reported is deterministic,
92    // this is needed because ProcessFlows are stored in a HashMap.
93    let mut errors: Vec<(ProcessID, RegionID, u32, Vec<&str>)> = Vec::new();
94
95    for (process_id, process_flows) in flows_map {
96        for ((region_id, year), flows) in process_flows {
97            let sed_svd_output_units: IndexSet<&str> = flows
98                .values()
99                .filter_map(|flow| {
100                    let commodity = &flow.commodity;
101                    (flow.coeff.value() > 0.0
102                        && matches!(
103                            commodity.kind,
104                            CommodityType::ServiceDemand | CommodityType::SupplyEqualsDemand
105                        ))
106                    .then_some(commodity.units.as_str())
107                })
108                .collect();
109
110            // Record error if validation fails
111            if sed_svd_output_units.len() > 1 {
112                errors.push((
113                    process_id.clone(),
114                    region_id.clone(),
115                    *year,
116                    sed_svd_output_units.into_iter().collect(),
117                ));
118            }
119        }
120    }
121
122    // Sort errors for deterministic ordering
123    errors.sort_by_key(|(process_id, region_id, year, _)| {
124        (process_id.clone(), region_id.clone(), *year)
125    });
126
127    // Return first error if any exist
128    if let Some((process_id, region_id, year, units)) = errors.first() {
129        bail!(
130            "Process {process_id} has SED/SVD outputs with different units: [{}] \
131             in region: {region_id} and year: {year}",
132            units.join(", ")
133        );
134    }
135
136    Ok(())
137}
138
139/// Read `ProcessFlowRaw` records from an iterator and convert them into `ProcessFlow` records.
140///
141/// # Arguments
142///
143/// * `iter` - Iterator over `ProcessFlowRaw` records
144/// * `processes` - Mutable map of known processes used for validation and updates
145/// * `commodities` - Map of known commodities
146/// * `milestone_years` - Milestone years used by the model
147///
148/// # Returns
149///
150/// A `HashMap<ProcessID, ProcessFlowsMap>` mapping process IDs to their flows.
151fn read_process_flows_from_iter<I>(
152    iter: I,
153    processes: &mut ProcessMap,
154    commodities: &CommodityMap,
155    milestone_years: &[u32],
156) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
157where
158    I: Iterator<Item = ProcessFlowRaw>,
159{
160    let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
161    for record in iter {
162        record.validate()?;
163
164        // Get process
165        let (id, process) = processes.get_id_value(&record.process_id)?;
166
167        // Get regions
168        let process_regions = &process.regions;
169        let record_regions =
170            parse_region_str(&record.regions, process_regions).with_context(|| {
171                format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
172            })?;
173
174        // Get years
175        let process_years: Vec<u32> = process.years.clone().collect();
176        let record_years =
177            parse_year_str(&record.commission_years, &process_years).with_context(|| {
178                format!("Invalid year for process {id}. Valid years are {process_years:?}")
179            })?;
180
181        // Get commodity
182        let (_, commodity) = commodities.get_id_value(&record.commodity_id)?;
183
184        // Create ProcessFlow object
185        let process_flow = ProcessFlow {
186            commodity: Rc::clone(commodity),
187            coeff: record.coeff,
188            kind: FlowType::Fixed,
189            cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
190        };
191
192        // Insert flow into the map
193        let region_year_map = flows_map.entry(id.clone()).or_default();
194        for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
195            let flows_map = region_year_map
196                .entry((region_id.clone(), year))
197                .or_default();
198            let existing = Rc::get_mut(flows_map)
199                .unwrap() // safe: there will only be one copy
200                .insert(commodity.id.clone(), process_flow.clone())
201                .is_some();
202            ensure!(
203                !existing,
204                "Duplicate process flow entry for region {}, year {} and commodity {}",
205                region_id,
206                year,
207                commodity.id
208            );
209        }
210    }
211
212    validate_flows_and_update_primary_output(processes, &flows_map, milestone_years)?;
213    validate_secondary_flows(processes, &flows_map, milestone_years)?;
214    validate_output_flows_units(&flows_map)?;
215
216    Ok(flows_map)
217}
218
219fn validate_flows_and_update_primary_output(
220    processes: &mut ProcessMap,
221    flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
222    milestone_years: &[u32],
223) -> Result<()> {
224    for (process_id, process) in processes.iter_mut() {
225        let map = flows_map
226            .get(process_id)
227            .with_context(|| format!("Missing flows map for process {process_id}"))?;
228
229        // Flows are required for all milestone years within the process years of activity
230        let required_years = milestone_years
231            .iter()
232            .filter(|&y| process.years.contains(y));
233        let region_year: Vec<(&RegionID, &u32)> =
234            iproduct!(process.regions.iter(), required_years).collect();
235
236        ensure!(
237            region_year
238                .iter()
239                .all(|(region_id, year)| map.contains_key(&((*region_id).clone(), **year))),
240            "Flows map for process {process_id} does not cover all regions and required years"
241        );
242
243        let primary_output = if let Some(primary_output) = &process.primary_output {
244            Some(primary_output.clone())
245        } else {
246            let (region_id, year) = region_year[0];
247            infer_primary_output(&map[&(region_id.clone(), *year)]).with_context(|| {
248                format!("Could not infer primary_output for process {process_id}")
249            })?
250        };
251
252        for (region_id, &year) in region_year {
253            let flows = &map[&(region_id.clone(), year)];
254
255            // Check that the process has flows for this region/year
256            check_flows_primary_output(flows, primary_output.as_ref()).with_context(|| {
257                format!(
258                    "Invalid primary output configuration for process {process_id} \
259                    (region: {region_id}, year: {year})"
260                )
261            })?;
262        }
263
264        // Update primary output if needed
265        if process.primary_output != primary_output {
266            // Safe: There should only be one ref to process
267            Rc::get_mut(process).unwrap().primary_output = primary_output;
268        }
269    }
270
271    Ok(())
272}
273
274/// Infer the primary output.
275///
276/// This is only possible if there is only one output flow for the process.
277fn infer_primary_output(map: &IndexMap<CommodityID, ProcessFlow>) -> Result<Option<CommodityID>> {
278    let mut iter = map.iter().filter_map(|(commodity_id, flow)| {
279        (flow.direction() == FlowDirection::Output).then_some(commodity_id)
280    });
281
282    let Some(first_output) = iter.next() else {
283        // If there are only input flows, then the primary output should be None
284        return Ok(None);
285    };
286
287    ensure!(
288        iter.next().is_none(),
289        "Need to specify primary_output explicitly if there are multiple output flows"
290    );
291
292    Ok(Some(first_output.clone()))
293}
294
295/// Check the flows are correct for the specified primary output (or lack thereof)
296fn check_flows_primary_output(
297    flows_map: &IndexMap<CommodityID, ProcessFlow>,
298    primary_output: Option<&CommodityID>,
299) -> Result<()> {
300    if let Some(primary_output) = primary_output {
301        let flow = flows_map.get(primary_output).with_context(|| {
302            format!("Primary output commodity '{primary_output}' isn't a process flow")
303        })?;
304
305        ensure!(
306            flow.direction() == FlowDirection::Output,
307            "Primary output commodity '{primary_output}' isn't an output flow",
308        );
309    } else {
310        ensure!(
311            flows_map
312                .values()
313                .all(|x| x.direction() == FlowDirection::Input
314                    || x.direction() == FlowDirection::Zero),
315            "First year is only inputs, but subsequent years have outputs, although no primary \
316            output is specified"
317        );
318    }
319
320    Ok(())
321}
322
323/// Checks that non-primary io are defined for all milestone years, at least, (within a region) and
324/// that they are only inputs or only outputs in all years.
325fn validate_secondary_flows(
326    processes: &mut ProcessMap,
327    flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
328    milestone_years: &[u32],
329) -> Result<()> {
330    for (process_id, process) in processes.iter() {
331        // Get the flows for this process - there should be no error, as was checked already
332        let map = flows_map
333            .get(process_id)
334            .with_context(|| format!("Missing flows map for process {process_id}"))?;
335
336        // Flows are required for all milestone years within the process years of activity
337        let required_years: Vec<&u32> = milestone_years
338            .iter()
339            .filter(|&y| process.years.contains(y))
340            .collect();
341
342        // Get the non-primary io flows for all years, if any, arranged by (commodity, region)
343        let iter = iproduct!(process.years.clone(), process.regions.iter());
344        let mut flows: HashMap<(CommodityID, RegionID), Vec<&ProcessFlow>> = HashMap::new();
345        let mut number_of_years: HashMap<(CommodityID, RegionID), u32> = HashMap::new();
346        for (year, region_id) in iter {
347            if let Some(commodity_map) = map.get(&(region_id.clone(), year)) {
348                let flow = commodity_map.iter().filter_map(|(commodity_id, flow)| {
349                    (Some(commodity_id) != process.primary_output.as_ref())
350                        .then_some(((commodity_id.clone(), region_id.clone()), flow))
351                });
352
353                for (key, value) in flow {
354                    flows.entry(key.clone()).or_default().push(value);
355                    if required_years.contains(&&year) {
356                        *number_of_years.entry(key).or_default() += 1;
357                    }
358                }
359            }
360        }
361
362        // Finally we check that the flows for a given commodity and region are defined for all
363        // milestone years and that they are all inputs or all outputs. This later check is done
364        // for all years in the process range, required or not.
365        for ((commodity_id, region_id), value) in &flows {
366            ensure!(
367                number_of_years[&(commodity_id.clone(), region_id.clone())]
368                    == required_years.len().try_into().unwrap(),
369                "Flow of commodity {commodity_id} in region {region_id} for process {process_id} \
370                does not cover all milestone years within the process range of activity."
371            );
372            let input_or_zero = value
373                .iter()
374                .all(|&x| [FlowDirection::Input, FlowDirection::Zero].contains(&x.direction()));
375            let output_or_zero = value
376                .iter()
377                .all(|&x| [FlowDirection::Output, FlowDirection::Zero].contains(&x.direction()));
378            ensure!(
379                input_or_zero || output_or_zero,
380                "Flow of commodity {commodity_id} in region {region_id} for process {process_id} \
381                behaves as input or output in different years."
382            );
383        }
384    }
385
386    Ok(())
387}
388
389#[cfg(test)]
390mod tests {
391    use super::*;
392    use crate::commodity::Commodity;
393    use crate::commodity::{CommodityLevyMap, DemandMap, PricingStrategy};
394    use crate::fixture::{
395        assert_error, assert_validate_fails_with_simple, assert_validate_ok_simple,
396        other_commodity, process, sed_commodity, svd_commodity,
397    };
398    use crate::patch::FilePatch;
399    use crate::process::{FlowType, Process, ProcessFlow, ProcessMap};
400    use crate::time_slice::TimeSliceLevel;
401    use crate::units::{FlowPerActivity, MoneyPerFlow};
402    use indexmap::IndexMap;
403    use itertools::Itertools;
404    use map_macro::hash_map;
405    use rstest::{fixture, rstest};
406    use std::iter;
407    use std::rc::Rc;
408
409    fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
410        ProcessFlow {
411            commodity,
412            coeff: FlowPerActivity(coeff),
413            kind: FlowType::Fixed,
414            cost: MoneyPerFlow(0.0),
415        }
416    }
417
418    fn build_maps<I>(
419        process: Process,
420        flows: I,
421        years: Option<Vec<u32>>,
422    ) -> (ProcessMap, HashMap<ProcessID, ProcessFlowsMap>)
423    where
424        I: Clone + Iterator<Item = (CommodityID, ProcessFlow)>,
425    {
426        let years = years.unwrap_or(process.years.clone().collect());
427        let map: Rc<IndexMap<_, _>> = Rc::new(flows.collect());
428        let flows_inner = iproduct!(&process.regions, years)
429            .map(|(region_id, year)| ((region_id.clone(), year), map.clone()))
430            .collect();
431        let flows = hash_map! {process.id.clone() => flows_inner};
432        let processes = iter::once((process.id.clone(), process.into())).collect();
433
434        (processes, flows)
435    }
436
437    #[fixture]
438    pub fn sed_commodity_pj() -> Commodity {
439        Commodity {
440            id: "sed_pj".into(),
441            description: "Test SED commodity (PJ)".into(),
442            kind: CommodityType::SupplyEqualsDemand,
443            time_slice_level: TimeSliceLevel::DayNight,
444            pricing_strategy: PricingStrategy::Shadow,
445            levies_prod: CommodityLevyMap::new(),
446            levies_cons: CommodityLevyMap::new(),
447            demand: DemandMap::new(),
448            units: "PJ".into(),
449        }
450    }
451
452    #[fixture]
453    pub fn sed_commodity_tonnes() -> Commodity {
454        Commodity {
455            id: "sed_tonnes".into(),
456            description: "Test SED commodity (tonnes)".into(),
457            kind: CommodityType::SupplyEqualsDemand,
458            time_slice_level: TimeSliceLevel::DayNight,
459            pricing_strategy: PricingStrategy::Shadow,
460            levies_prod: CommodityLevyMap::new(),
461            levies_cons: CommodityLevyMap::new(),
462            demand: DemandMap::new(),
463            units: "tonnes".into(),
464        }
465    }
466
467    #[rstest]
468    fn output_flows_matching_units(
469        svd_commodity: Commodity,
470        sed_commodity: Commodity,
471        process: Process,
472    ) {
473        // Both commodities have the same units
474        assert_eq!(svd_commodity.units, sed_commodity.units);
475
476        let commodity1 = Rc::new(svd_commodity);
477        let commodity2 = Rc::new(sed_commodity);
478        let (_, flows_map) = build_maps(
479            process,
480            [
481                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
482                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
483            ]
484            .into_iter(),
485            None,
486        );
487
488        // Validation should pass since the units are the same
489        validate_output_flows_units(&flows_map).unwrap();
490    }
491
492    #[rstest]
493    fn output_flows_mismatched_units(
494        sed_commodity_pj: Commodity,
495        sed_commodity_tonnes: Commodity,
496        process: Process,
497    ) {
498        // Ensure the two commodities have different units
499        assert_ne!(sed_commodity_pj.units, sed_commodity_tonnes.units);
500
501        let commodity1 = Rc::new(sed_commodity_pj);
502        let commodity2 = Rc::new(sed_commodity_tonnes);
503        let (_, flows_map) = build_maps(
504            process,
505            [
506                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
507                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
508            ]
509            .into_iter(),
510            None,
511        );
512
513        // Different units should cause validation to fail
514        let result = validate_output_flows_units(&flows_map);
515        // Note that the error contents are sorted so we should deterministically get
516        // this exact error message.
517        assert_error!(
518            result,
519            "Process process1 has SED/SVD outputs with different units: [PJ, tonnes] in region: GBR and year: 2010"
520        );
521    }
522
523    #[rstest]
524    fn output_flows_other_commodity_ignored(
525        sed_commodity_pj: Commodity,
526        other_commodity: Commodity,
527        process: Process,
528    ) {
529        // Modify OTH commodity to have different units
530        let mut other_commodity = other_commodity;
531        other_commodity.units = "tonnes".into();
532        assert_ne!(sed_commodity_pj.units, other_commodity.units);
533
534        let sed_commodity = Rc::new(sed_commodity_pj);
535        let oth_commodity = Rc::new(other_commodity);
536
537        let (_, flows_map) = build_maps(
538            process,
539            [
540                (sed_commodity.id.clone(), flow(sed_commodity.clone(), 1.0)),
541                (oth_commodity.id.clone(), flow(oth_commodity.clone(), 2.0)),
542            ]
543            .into_iter(),
544            None,
545        );
546
547        // OTH commodity should be ignored, validation should pass
548        validate_output_flows_units(&flows_map).unwrap();
549    }
550
551    #[rstest]
552    fn single_sed_svd_output(svd_commodity: Commodity, process: Process) {
553        let commodity = Rc::new(svd_commodity);
554        let (_, flows_map) = build_maps(
555            process,
556            std::iter::once((commodity.id.clone(), flow(commodity.clone(), 1.0))),
557            None,
558        );
559
560        // Single output should always pass validation
561        validate_output_flows_units(&flows_map).unwrap();
562    }
563
564    #[rstest]
565    fn no_sed_svd_outputs(other_commodity: Commodity, process: Process) {
566        let oth_commodity_1 = Rc::new(other_commodity.clone());
567        let oth_commodity_2 = Rc::new(other_commodity.clone());
568        let (_, flows_map) = build_maps(
569            process,
570            [
571                (CommodityID("oth1".into()), flow(oth_commodity_1, 1.0)),
572                (CommodityID("oth2".into()), flow(oth_commodity_2, 2.0)),
573            ]
574            .into_iter(),
575            None,
576        );
577
578        // Processes with only OTH outputs should pass validation
579        validate_output_flows_units(&flows_map).unwrap();
580    }
581
582    #[rstest]
583    fn sed_svd_inputs_different_units_ignored(
584        sed_commodity_pj: Commodity,
585        sed_commodity_tonnes: Commodity,
586        svd_commodity: Commodity,
587        process: Process,
588    ) {
589        // Ensure input commodities have different units
590        assert_ne!(sed_commodity_pj.units, sed_commodity_tonnes.units);
591
592        // Output commodity shares units with one input
593        assert_eq!(svd_commodity.units, sed_commodity_pj.units);
594
595        let input1 = Rc::new(sed_commodity_pj);
596        let input2 = Rc::new(sed_commodity_tonnes);
597        let output = Rc::new(svd_commodity);
598
599        let (_, flows_map) = build_maps(
600            process,
601            [
602                // Two inputs with different units (negative coefficients)
603                (input1.id.clone(), flow(input1.clone(), -1.0)),
604                (input2.id.clone(), flow(input2.clone(), -2.0)),
605                // Single output (positive coefficient)
606                (output.id.clone(), flow(output.clone(), 3.0)),
607            ]
608            .into_iter(),
609            None,
610        );
611
612        // Validation should pass because only outputs are checked
613        validate_output_flows_units(&flows_map).unwrap();
614    }
615
616    #[rstest]
617    fn single_output_infer_primary(#[from(svd_commodity)] commodity: Commodity, process: Process) {
618        let milestone_years = vec![2010, 2020];
619        let commodity = Rc::new(commodity);
620        let (mut processes, flows_map) = build_maps(
621            process,
622            std::iter::once((commodity.id.clone(), flow(commodity.clone(), 1.0))),
623            None,
624        );
625        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
626            .unwrap();
627        assert_eq!(
628            processes.values().exactly_one().unwrap().primary_output,
629            Some(commodity.id.clone())
630        );
631    }
632
633    #[rstest]
634    fn multiple_outputs_error(
635        #[from(svd_commodity)] commodity1: Commodity,
636        #[from(sed_commodity)] commodity2: Commodity,
637        process: Process,
638    ) {
639        let milestone_years: Vec<u32> = vec![2010, 2020];
640        let commodity1 = Rc::new(commodity1);
641        let commodity2 = Rc::new(commodity2);
642        let (mut processes, flows_map) = build_maps(
643            process,
644            [
645                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
646                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
647            ]
648            .into_iter(),
649            None,
650        );
651        let res =
652            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
653        assert_error!(res, "Could not infer primary_output for process process1");
654    }
655
656    #[rstest]
657    fn explicit_primary_output(
658        #[from(svd_commodity)] commodity1: Commodity,
659        #[from(sed_commodity)] commodity2: Commodity,
660        process: Process,
661    ) {
662        let milestone_years = vec![2010, 2020];
663        let commodity1 = Rc::new(commodity1);
664        let commodity2 = Rc::new(commodity2);
665        let mut process = process;
666        process.primary_output = Some(commodity2.id.clone());
667        let (mut processes, flows_map) = build_maps(
668            process,
669            [
670                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
671                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
672            ]
673            .into_iter(),
674            None,
675        );
676        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
677            .unwrap();
678        assert_eq!(
679            processes.values().exactly_one().unwrap().primary_output,
680            Some(commodity2.id.clone())
681        );
682    }
683
684    #[rstest]
685    fn all_inputs_no_primary(
686        #[from(svd_commodity)] commodity1: Commodity,
687        #[from(sed_commodity)] commodity2: Commodity,
688        process: Process,
689    ) {
690        let milestone_years = vec![2010, 2020];
691        let commodity1 = Rc::new(commodity1);
692        let commodity2 = Rc::new(commodity2);
693        let (mut processes, flows_map) = build_maps(
694            process,
695            [
696                (commodity1.id.clone(), flow(commodity1.clone(), -1.0)),
697                (commodity2.id.clone(), flow(commodity2.clone(), -2.0)),
698            ]
699            .into_iter(),
700            None,
701        );
702        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
703            .unwrap();
704        assert_eq!(
705            processes.values().exactly_one().unwrap().primary_output,
706            None
707        );
708    }
709
710    #[rstest]
711    fn flows_not_in_all_milestone_years(
712        #[from(svd_commodity)] commodity1: Commodity,
713        #[from(sed_commodity)] commodity2: Commodity,
714        process: Process,
715    ) {
716        let milestone_years = vec![2010, 2015, 2020];
717        let flow_years = vec![2010, 2020];
718        let commodity1 = Rc::new(commodity1);
719        let commodity2 = Rc::new(commodity2);
720        let (mut processes, flows_map) = build_maps(
721            process,
722            [
723                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
724                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
725            ]
726            .into_iter(),
727            Some(flow_years),
728        );
729        let res =
730            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
731        assert_error!(
732            res,
733            "Flows map for process process1 does not cover all regions and required years"
734        );
735    }
736
737    #[rstest]
738    fn flows_only_milestone_years(
739        #[from(svd_commodity)] commodity1: Commodity,
740        #[from(sed_commodity)] commodity2: Commodity,
741        process: Process,
742    ) {
743        let milestone_years = vec![2010, 2015, 2020];
744        let commodity1 = Rc::new(commodity1);
745        let commodity2 = Rc::new(commodity2);
746        let (mut processes, flows_map) = build_maps(
747            process,
748            [
749                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
750                (commodity2.id.clone(), flow(commodity2.clone(), -2.0)),
751            ]
752            .into_iter(),
753            Some(milestone_years.clone()),
754        );
755        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
756            .unwrap();
757    }
758
759    #[test]
760    fn flows_different_direction_different_years() {
761        let patch = FilePatch::new("process_flows.csv")
762            .with_deletion("GASPRC,GASPRD,all,all,-1.05,fixed,")
763            .with_addition("GASPRC,GASPRD,all,2020;2030,-1.05,fixed,")
764            .with_addition("GASPRC,GASPRD,all,2040,1.05,fixed,");
765        assert_validate_fails_with_simple!(
766            vec![patch],
767            "Flow of commodity GASPRD in region GBR for process GASPRC behaves as input or output in different years."
768        );
769    }
770
771    #[test]
772    fn missing_flow() {
773        let patch = FilePatch::new("process_flows.csv")
774            .with_deletion("GASPRC,GASPRD,all,all,-1.05,fixed,")
775            .with_addition("GASPRC,GASPRD,all,2020;2030,-1.05,fixed,");
776        assert_validate_fails_with_simple!(
777            vec![patch],
778            "Flow of commodity GASPRD in region GBR for process GASPRC does not cover all milestone years within the process range of activity."
779        );
780    }
781
782    #[test]
783    fn coeff_zero() {
784        let patch = FilePatch::new("process_flows.csv")
785            .with_deletion("GASPRC,GASPRD,all,all,-1.05,fixed,")
786            .with_addition("GASPRC,GASPRD,all,2020;2030,-1.05,fixed,")
787            .with_addition("GASPRC,GASPRD,all,2040,0,fixed,");
788        assert_validate_ok_simple!(vec![patch]);
789    }
790
791    #[test]
792    fn flows_not_needed_before_time_horizon() {
793        // NB: Time horizon starts at 2020 for simple example
794        //
795        // Flows are only needed for milestone years. Check that users can omit them for
796        // non-milestone years.
797        let patches = vec![
798            FilePatch::new("processes.csv")
799                .with_deletion("GASDRV,Dry gas extraction,all,GASPRD,2020,2040,1.0,")
800                .with_addition("GASDRV,Dry gas extraction,all,GASPRD,1980,2040,1.0,"),
801            FilePatch::new("process_flows.csv")
802                .with_deletion("GASPRC,GASPRD,all,all,-1.05,fixed,")
803                .with_addition("GASPRC,GASPRD,all,2020;2030;2040,-1.05,fixed,"),
804        ];
805        assert_validate_ok_simple!(patches);
806    }
807}