muse2/input/process/
flow.rs

1//! Code for reading process flows from a CSV file.
2use super::super::{input_err_msg, read_csv};
3use crate::commodity::{CommodityID, CommodityMap, CommodityType};
4use crate::process::{
5    FlowDirection, FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap,
6};
7use crate::region::{RegionID, parse_region_str};
8use crate::units::{FlowPerActivity, MoneyPerFlow};
9use crate::year::parse_year_str;
10use anyhow::{Context, Result, bail, ensure};
11use indexmap::{IndexMap, IndexSet};
12use itertools::iproduct;
13use serde::Deserialize;
14use std::collections::HashMap;
15use std::path::Path;
16use std::rc::Rc;
17
18const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";
19
20#[derive(PartialEq, Debug, Deserialize)]
21struct ProcessFlowRaw {
22    process_id: String,
23    commodity_id: String,
24    commission_years: String,
25    regions: String,
26    coeff: FlowPerActivity,
27    #[serde(default)]
28    #[serde(rename = "type")]
29    kind: FlowType,
30    cost: Option<MoneyPerFlow>,
31}
32
33impl ProcessFlowRaw {
34    fn validate(&self) -> Result<()> {
35        // Check that flow is not infinity or nan.
36        ensure!(
37            self.coeff.is_finite(),
38            "Invalid value for coeff ({})",
39            self.coeff
40        );
41
42        // **TODO**: https://github.com/EnergySystemsModellingLab/MUSE2/issues/300
43        ensure!(
44            self.kind == FlowType::Fixed,
45            "Commodity flexible assets are not currently supported"
46        );
47
48        // Check that flow cost is non-negative
49        if let Some(cost) = self.cost {
50            ensure!(
51                (cost.value() >= 0.0),
52                "Invalid value for flow cost ({cost}). Must be >=0."
53            );
54        }
55
56        Ok(())
57    }
58}
59
60/// Read process flows from a CSV file.
61///
62/// # Arguments
63///
64/// * `model_dir` - Folder containing model configuration files
65/// * `processes` - Mutable map of known processes (may be updated)
66/// * `commodities` - Map of known commodities
67/// * `milestone_years` - Milestone years used by the model
68///
69/// # Returns
70///
71/// A `HashMap<ProcessID, ProcessFlowsMap>` mapping process IDs to their flows.
72pub fn read_process_flows(
73    model_dir: &Path,
74    processes: &mut ProcessMap,
75    commodities: &CommodityMap,
76    milestone_years: &[u32],
77) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
78    let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
79    let process_flow_csv = read_csv(&file_path)?;
80    read_process_flows_from_iter(process_flow_csv, processes, commodities, milestone_years)
81        .with_context(|| input_err_msg(&file_path))
82}
83
84/// Validates that all SED/SVD outputs from each process have consistent units.
85///
86/// For processes with multiple SED/SVD outputs, the annual fixed costs are distributed
87/// proportionally based on flow coefficients. This only makes sense if all outputs share
88/// the same units.
89fn validate_output_flows_units(flows_map: &HashMap<ProcessID, ProcessFlowsMap>) -> Result<()> {
90    // Collect all validation errors so that the error reported is deterministic,
91    // this is needed because ProcessFlows are stored in a HashMap.
92    let mut errors: Vec<(ProcessID, RegionID, u32, Vec<&str>)> = Vec::new();
93
94    for (process_id, process_flows) in flows_map {
95        for ((region_id, year), flows) in process_flows {
96            let sed_svd_output_units: IndexSet<&str> = flows
97                .values()
98                .filter_map(|flow| {
99                    let commodity = &flow.commodity;
100                    (flow.coeff.value() > 0.0
101                        && matches!(
102                            commodity.kind,
103                            CommodityType::ServiceDemand | CommodityType::SupplyEqualsDemand
104                        ))
105                    .then_some(commodity.units.as_str())
106                })
107                .collect();
108
109            // Record error if validation fails
110            if sed_svd_output_units.len() > 1 {
111                errors.push((
112                    process_id.clone(),
113                    region_id.clone(),
114                    *year,
115                    sed_svd_output_units.into_iter().collect(),
116                ));
117            }
118        }
119    }
120
121    // Sort errors for deterministic ordering
122    errors.sort_by_key(|(process_id, region_id, year, _)| {
123        (process_id.clone(), region_id.clone(), *year)
124    });
125
126    // Return first error if any exist
127    if let Some((process_id, region_id, year, units)) = errors.first() {
128        bail!(
129            "Process {process_id} has SED/SVD outputs with different units: [{}] \
130             in region: {region_id} and year: {year}",
131            units.join(", ")
132        );
133    }
134
135    Ok(())
136}
137
138/// Read `ProcessFlowRaw` records from an iterator and convert them into `ProcessFlow` records.
139///
140/// # Arguments
141///
142/// * `iter` - Iterator over `ProcessFlowRaw` records
143/// * `processes` - Mutable map of known processes used for validation and updates
144/// * `commodities` - Map of known commodities
145/// * `milestone_years` - Milestone years used by the model
146///
147/// # Returns
148///
149/// A `HashMap<ProcessID, ProcessFlowsMap>` mapping process IDs to their flows.
150fn read_process_flows_from_iter<I>(
151    iter: I,
152    processes: &mut ProcessMap,
153    commodities: &CommodityMap,
154    milestone_years: &[u32],
155) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
156where
157    I: Iterator<Item = ProcessFlowRaw>,
158{
159    let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
160    for record in iter {
161        record.validate()?;
162
163        // Get process
164        let (id, process) = processes
165            .get_key_value(record.process_id.as_str())
166            .with_context(|| format!("Process {} not found", record.process_id))?;
167
168        // Get regions
169        let process_regions = &process.regions;
170        let record_regions =
171            parse_region_str(&record.regions, process_regions).with_context(|| {
172                format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
173            })?;
174
175        // Get years
176        let process_years: Vec<u32> = process.years.clone().collect();
177        let record_years =
178            parse_year_str(&record.commission_years, &process_years).with_context(|| {
179                format!("Invalid year for process {id}. Valid years are {process_years:?}")
180            })?;
181
182        // Get commodity
183        let commodity = commodities
184            .get(record.commodity_id.as_str())
185            .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;
186
187        // Create ProcessFlow object
188        let process_flow = ProcessFlow {
189            commodity: Rc::clone(commodity),
190            coeff: record.coeff,
191            kind: FlowType::Fixed,
192            cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
193        };
194
195        // Insert flow into the map
196        let region_year_map = flows_map.entry(id.clone()).or_default();
197        for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
198            let flows_map = region_year_map
199                .entry((region_id.clone(), year))
200                .or_default();
201            let existing = Rc::get_mut(flows_map)
202                .unwrap() // safe: there will only be one copy
203                .insert(commodity.id.clone(), process_flow.clone())
204                .is_some();
205            ensure!(
206                !existing,
207                "Duplicate process flow entry for region {}, year {} and commodity {}",
208                region_id,
209                year,
210                commodity.id
211            );
212        }
213    }
214
215    validate_flows_and_update_primary_output(processes, &flows_map, milestone_years)?;
216    validate_secondary_flows(processes, &flows_map, milestone_years)?;
217    validate_output_flows_units(&flows_map)?;
218
219    Ok(flows_map)
220}
221
222fn validate_flows_and_update_primary_output(
223    processes: &mut ProcessMap,
224    flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
225    milestone_years: &[u32],
226) -> Result<()> {
227    for (process_id, process) in processes.iter_mut() {
228        let map = flows_map
229            .get(process_id)
230            .with_context(|| format!("Missing flows map for process {process_id}"))?;
231
232        // Flows are required for all milestone years within the process years of activity
233        let required_years = milestone_years
234            .iter()
235            .filter(|&y| process.years.contains(y));
236        let region_year: Vec<(&RegionID, &u32)> =
237            iproduct!(process.regions.iter(), required_years).collect();
238
239        ensure!(
240            region_year
241                .iter()
242                .all(|(region_id, year)| map.contains_key(&((*region_id).clone(), **year))),
243            "Flows map for process {process_id} does not cover all regions and required years"
244        );
245
246        let primary_output = if let Some(primary_output) = &process.primary_output {
247            Some(primary_output.clone())
248        } else {
249            let (region_id, year) = region_year[0];
250            infer_primary_output(&map[&(region_id.clone(), *year)]).with_context(|| {
251                format!("Could not infer primary_output for process {process_id}")
252            })?
253        };
254
255        for (region_id, &year) in region_year {
256            let flows = &map[&(region_id.clone(), year)];
257
258            // Check that the process has flows for this region/year
259            check_flows_primary_output(flows, primary_output.as_ref()).with_context(|| {
260                format!(
261                    "Invalid primary output configuration for process {process_id} \
262                    (region: {region_id}, year: {year})"
263                )
264            })?;
265        }
266
267        // Update primary output if needed
268        if process.primary_output != primary_output {
269            // Safe: There should only be one ref to process
270            Rc::get_mut(process).unwrap().primary_output = primary_output;
271        }
272    }
273
274    Ok(())
275}
276
277/// Infer the primary output.
278///
279/// This is only possible if there is only one output flow for the process.
280fn infer_primary_output(map: &IndexMap<CommodityID, ProcessFlow>) -> Result<Option<CommodityID>> {
281    let mut iter = map.iter().filter_map(|(commodity_id, flow)| {
282        (flow.direction() == FlowDirection::Output).then_some(commodity_id)
283    });
284
285    let Some(first_output) = iter.next() else {
286        // If there are only input flows, then the primary output should be None
287        return Ok(None);
288    };
289
290    ensure!(
291        iter.next().is_none(),
292        "Need to specify primary_output explicitly if there are multiple output flows"
293    );
294
295    Ok(Some(first_output.clone()))
296}
297
298/// Check the flows are correct for the specified primary output (or lack thereof)
299fn check_flows_primary_output(
300    flows_map: &IndexMap<CommodityID, ProcessFlow>,
301    primary_output: Option<&CommodityID>,
302) -> Result<()> {
303    if let Some(primary_output) = primary_output {
304        let flow = flows_map.get(primary_output).with_context(|| {
305            format!("Primary output commodity '{primary_output}' isn't a process flow")
306        })?;
307
308        ensure!(
309            flow.direction() == FlowDirection::Output,
310            "Primary output commodity '{primary_output}' isn't an output flow",
311        );
312    } else {
313        ensure!(
314            flows_map
315                .values()
316                .all(|x| x.direction() == FlowDirection::Input
317                    || x.direction() == FlowDirection::Zero),
318            "First year is only inputs, but subsequent years have outputs, although no primary \
319            output is specified"
320        );
321    }
322
323    Ok(())
324}
325
326/// Checks that non-primary io are defined for all milestone years, at least, (within a region) and
327/// that they are only inputs or only outputs in all years.
328fn validate_secondary_flows(
329    processes: &mut ProcessMap,
330    flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
331    milestone_years: &[u32],
332) -> Result<()> {
333    for (process_id, process) in processes.iter() {
334        // Get the flows for this process - there should be no error, as was checked already
335        let map = flows_map
336            .get(process_id)
337            .with_context(|| format!("Missing flows map for process {process_id}"))?;
338
339        // Flows are required for all milestone years within the process years of activity
340        let required_years: Vec<&u32> = milestone_years
341            .iter()
342            .filter(|&y| process.years.contains(y))
343            .collect();
344
345        // Get the non-primary io flows for all years, if any, arranged by (commodity, region)
346        let iter = iproduct!(process.years.clone(), process.regions.iter());
347        let mut flows: HashMap<(CommodityID, RegionID), Vec<&ProcessFlow>> = HashMap::new();
348        let mut number_of_years: HashMap<(CommodityID, RegionID), u32> = HashMap::new();
349        for (year, region_id) in iter {
350            if let Some(commodity_map) = map.get(&(region_id.clone(), year)) {
351                let flow = commodity_map.iter().filter_map(|(commodity_id, flow)| {
352                    (Some(commodity_id) != process.primary_output.as_ref())
353                        .then_some(((commodity_id.clone(), region_id.clone()), flow))
354                });
355
356                for (key, value) in flow {
357                    flows.entry(key.clone()).or_default().push(value);
358                    if required_years.contains(&&year) {
359                        *number_of_years.entry(key).or_default() += 1;
360                    }
361                }
362            }
363        }
364
365        // Finally we check that the flows for a given commodity and region are defined for all
366        // milestone years and that they are all inputs or all outputs. This later check is done
367        // for all years in the process range, required or not.
368        for ((commodity_id, region_id), value) in &flows {
369            ensure!(
370                number_of_years[&(commodity_id.clone(), region_id.clone())]
371                    == required_years.len().try_into().unwrap(),
372                "Flow of commodity {commodity_id} in region {region_id} for process {process_id} \
373                does not cover all milestone years within the process range of activity."
374            );
375            let input_or_zero = value
376                .iter()
377                .all(|&x| [FlowDirection::Input, FlowDirection::Zero].contains(&x.direction()));
378            let output_or_zero = value
379                .iter()
380                .all(|&x| [FlowDirection::Output, FlowDirection::Zero].contains(&x.direction()));
381            ensure!(
382                input_or_zero || output_or_zero,
383                "Flow of commodity {commodity_id} in region {region_id} for process {process_id} \
384                behaves as input or output in different years."
385            );
386        }
387    }
388
389    Ok(())
390}
391
392#[cfg(test)]
393mod tests {
394    use super::*;
395    use crate::commodity::Commodity;
396    use crate::commodity::{CommodityLevyMap, DemandMap, PricingStrategy};
397    use crate::fixture::{
398        assert_error, assert_validate_fails_with_simple, assert_validate_ok_simple,
399        other_commodity, process, sed_commodity, svd_commodity,
400    };
401    use crate::patch::FilePatch;
402    use crate::process::{FlowType, Process, ProcessFlow, ProcessMap};
403    use crate::time_slice::TimeSliceLevel;
404    use crate::units::{FlowPerActivity, MoneyPerFlow};
405    use indexmap::IndexMap;
406    use itertools::Itertools;
407    use map_macro::hash_map;
408    use rstest::{fixture, rstest};
409    use std::iter;
410    use std::rc::Rc;
411
412    fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
413        ProcessFlow {
414            commodity,
415            coeff: FlowPerActivity(coeff),
416            kind: FlowType::Fixed,
417            cost: MoneyPerFlow(0.0),
418        }
419    }
420
421    fn build_maps<I>(
422        process: Process,
423        flows: I,
424        years: Option<Vec<u32>>,
425    ) -> (ProcessMap, HashMap<ProcessID, ProcessFlowsMap>)
426    where
427        I: Clone + Iterator<Item = (CommodityID, ProcessFlow)>,
428    {
429        let years = years.unwrap_or(process.years.clone().collect());
430        let map: Rc<IndexMap<_, _>> = Rc::new(flows.collect());
431        let flows_inner = iproduct!(&process.regions, years)
432            .map(|(region_id, year)| ((region_id.clone(), year), map.clone()))
433            .collect();
434        let flows = hash_map! {process.id.clone() => flows_inner};
435        let processes = iter::once((process.id.clone(), process.into())).collect();
436
437        (processes, flows)
438    }
439
440    #[fixture]
441    pub fn sed_commodity_pj() -> Commodity {
442        Commodity {
443            id: "sed_pj".into(),
444            description: "Test SED commodity (PJ)".into(),
445            kind: CommodityType::SupplyEqualsDemand,
446            time_slice_level: TimeSliceLevel::DayNight,
447            pricing_strategy: PricingStrategy::Shadow,
448            levies_prod: CommodityLevyMap::new(),
449            levies_cons: CommodityLevyMap::new(),
450            demand: DemandMap::new(),
451            units: "PJ".into(),
452        }
453    }
454
455    #[fixture]
456    pub fn sed_commodity_tonnes() -> Commodity {
457        Commodity {
458            id: "sed_tonnes".into(),
459            description: "Test SED commodity (tonnes)".into(),
460            kind: CommodityType::SupplyEqualsDemand,
461            time_slice_level: TimeSliceLevel::DayNight,
462            pricing_strategy: PricingStrategy::Shadow,
463            levies_prod: CommodityLevyMap::new(),
464            levies_cons: CommodityLevyMap::new(),
465            demand: DemandMap::new(),
466            units: "tonnes".into(),
467        }
468    }
469
470    #[rstest]
471    fn output_flows_matching_units(
472        svd_commodity: Commodity,
473        sed_commodity: Commodity,
474        process: Process,
475    ) {
476        // Both commodities have the same units
477        assert_eq!(svd_commodity.units, sed_commodity.units);
478
479        let commodity1 = Rc::new(svd_commodity);
480        let commodity2 = Rc::new(sed_commodity);
481        let (_, flows_map) = build_maps(
482            process,
483            [
484                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
485                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
486            ]
487            .into_iter(),
488            None,
489        );
490
491        // Validation should pass since the units are the same
492        validate_output_flows_units(&flows_map).unwrap();
493    }
494
495    #[rstest]
496    fn output_flows_mismatched_units(
497        sed_commodity_pj: Commodity,
498        sed_commodity_tonnes: Commodity,
499        process: Process,
500    ) {
501        // Ensure the two commodities have different units
502        assert_ne!(sed_commodity_pj.units, sed_commodity_tonnes.units);
503
504        let commodity1 = Rc::new(sed_commodity_pj);
505        let commodity2 = Rc::new(sed_commodity_tonnes);
506        let (_, flows_map) = build_maps(
507            process,
508            [
509                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
510                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
511            ]
512            .into_iter(),
513            None,
514        );
515
516        // Different units should cause validation to fail
517        let result = validate_output_flows_units(&flows_map);
518        // Note that the error contents are sorted so we should deterministically get
519        // this exact error message.
520        assert_error!(
521            result,
522            "Process process1 has SED/SVD outputs with different units: [PJ, tonnes] in region: GBR and year: 2010"
523        );
524    }
525
526    #[rstest]
527    fn output_flows_other_commodity_ignored(
528        sed_commodity_pj: Commodity,
529        other_commodity: Commodity,
530        process: Process,
531    ) {
532        // Modify OTH commodity to have different units
533        let mut other_commodity = other_commodity;
534        other_commodity.units = "tonnes".into();
535        assert_ne!(sed_commodity_pj.units, other_commodity.units);
536
537        let sed_commodity = Rc::new(sed_commodity_pj);
538        let oth_commodity = Rc::new(other_commodity);
539
540        let (_, flows_map) = build_maps(
541            process,
542            [
543                (sed_commodity.id.clone(), flow(sed_commodity.clone(), 1.0)),
544                (oth_commodity.id.clone(), flow(oth_commodity.clone(), 2.0)),
545            ]
546            .into_iter(),
547            None,
548        );
549
550        // OTH commodity should be ignored, validation should pass
551        validate_output_flows_units(&flows_map).unwrap();
552    }
553
554    #[rstest]
555    fn single_sed_svd_output(svd_commodity: Commodity, process: Process) {
556        let commodity = Rc::new(svd_commodity);
557        let (_, flows_map) = build_maps(
558            process,
559            std::iter::once((commodity.id.clone(), flow(commodity.clone(), 1.0))),
560            None,
561        );
562
563        // Single output should always pass validation
564        validate_output_flows_units(&flows_map).unwrap();
565    }
566
567    #[rstest]
568    fn no_sed_svd_outputs(other_commodity: Commodity, process: Process) {
569        let oth_commodity_1 = Rc::new(other_commodity.clone());
570        let oth_commodity_2 = Rc::new(other_commodity.clone());
571        let (_, flows_map) = build_maps(
572            process,
573            [
574                (CommodityID("oth1".into()), flow(oth_commodity_1, 1.0)),
575                (CommodityID("oth2".into()), flow(oth_commodity_2, 2.0)),
576            ]
577            .into_iter(),
578            None,
579        );
580
581        // Processes with only OTH outputs should pass validation
582        validate_output_flows_units(&flows_map).unwrap();
583    }
584
585    #[rstest]
586    fn sed_svd_inputs_different_units_ignored(
587        sed_commodity_pj: Commodity,
588        sed_commodity_tonnes: Commodity,
589        svd_commodity: Commodity,
590        process: Process,
591    ) {
592        // Ensure input commodities have different units
593        assert_ne!(sed_commodity_pj.units, sed_commodity_tonnes.units);
594
595        // Output commodity shares units with one input
596        assert_eq!(svd_commodity.units, sed_commodity_pj.units);
597
598        let input1 = Rc::new(sed_commodity_pj);
599        let input2 = Rc::new(sed_commodity_tonnes);
600        let output = Rc::new(svd_commodity);
601
602        let (_, flows_map) = build_maps(
603            process,
604            [
605                // Two inputs with different units (negative coefficients)
606                (input1.id.clone(), flow(input1.clone(), -1.0)),
607                (input2.id.clone(), flow(input2.clone(), -2.0)),
608                // Single output (positive coefficient)
609                (output.id.clone(), flow(output.clone(), 3.0)),
610            ]
611            .into_iter(),
612            None,
613        );
614
615        // Validation should pass because only outputs are checked
616        validate_output_flows_units(&flows_map).unwrap();
617    }
618
619    #[rstest]
620    fn single_output_infer_primary(#[from(svd_commodity)] commodity: Commodity, process: Process) {
621        let milestone_years = vec![2010, 2020];
622        let commodity = Rc::new(commodity);
623        let (mut processes, flows_map) = build_maps(
624            process,
625            std::iter::once((commodity.id.clone(), flow(commodity.clone(), 1.0))),
626            None,
627        );
628        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
629            .unwrap();
630        assert_eq!(
631            processes.values().exactly_one().unwrap().primary_output,
632            Some(commodity.id.clone())
633        );
634    }
635
636    #[rstest]
637    fn multiple_outputs_error(
638        #[from(svd_commodity)] commodity1: Commodity,
639        #[from(sed_commodity)] commodity2: Commodity,
640        process: Process,
641    ) {
642        let milestone_years: Vec<u32> = vec![2010, 2020];
643        let commodity1 = Rc::new(commodity1);
644        let commodity2 = Rc::new(commodity2);
645        let (mut processes, flows_map) = build_maps(
646            process,
647            [
648                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
649                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
650            ]
651            .into_iter(),
652            None,
653        );
654        let res =
655            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
656        assert_error!(res, "Could not infer primary_output for process process1");
657    }
658
659    #[rstest]
660    fn explicit_primary_output(
661        #[from(svd_commodity)] commodity1: Commodity,
662        #[from(sed_commodity)] commodity2: Commodity,
663        process: Process,
664    ) {
665        let milestone_years = vec![2010, 2020];
666        let commodity1 = Rc::new(commodity1);
667        let commodity2 = Rc::new(commodity2);
668        let mut process = process;
669        process.primary_output = Some(commodity2.id.clone());
670        let (mut processes, flows_map) = build_maps(
671            process,
672            [
673                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
674                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
675            ]
676            .into_iter(),
677            None,
678        );
679        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
680            .unwrap();
681        assert_eq!(
682            processes.values().exactly_one().unwrap().primary_output,
683            Some(commodity2.id.clone())
684        );
685    }
686
687    #[rstest]
688    fn all_inputs_no_primary(
689        #[from(svd_commodity)] commodity1: Commodity,
690        #[from(sed_commodity)] commodity2: Commodity,
691        process: Process,
692    ) {
693        let milestone_years = vec![2010, 2020];
694        let commodity1 = Rc::new(commodity1);
695        let commodity2 = Rc::new(commodity2);
696        let (mut processes, flows_map) = build_maps(
697            process,
698            [
699                (commodity1.id.clone(), flow(commodity1.clone(), -1.0)),
700                (commodity2.id.clone(), flow(commodity2.clone(), -2.0)),
701            ]
702            .into_iter(),
703            None,
704        );
705        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
706            .unwrap();
707        assert_eq!(
708            processes.values().exactly_one().unwrap().primary_output,
709            None
710        );
711    }
712
713    #[rstest]
714    fn flows_not_in_all_milestone_years(
715        #[from(svd_commodity)] commodity1: Commodity,
716        #[from(sed_commodity)] commodity2: Commodity,
717        process: Process,
718    ) {
719        let milestone_years = vec![2010, 2015, 2020];
720        let flow_years = vec![2010, 2020];
721        let commodity1 = Rc::new(commodity1);
722        let commodity2 = Rc::new(commodity2);
723        let (mut processes, flows_map) = build_maps(
724            process,
725            [
726                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
727                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
728            ]
729            .into_iter(),
730            Some(flow_years),
731        );
732        let res =
733            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
734        assert_error!(
735            res,
736            "Flows map for process process1 does not cover all regions and required years"
737        );
738    }
739
740    #[rstest]
741    fn flows_only_milestone_years(
742        #[from(svd_commodity)] commodity1: Commodity,
743        #[from(sed_commodity)] commodity2: Commodity,
744        process: Process,
745    ) {
746        let milestone_years = vec![2010, 2015, 2020];
747        let commodity1 = Rc::new(commodity1);
748        let commodity2 = Rc::new(commodity2);
749        let (mut processes, flows_map) = build_maps(
750            process,
751            [
752                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
753                (commodity2.id.clone(), flow(commodity2.clone(), -2.0)),
754            ]
755            .into_iter(),
756            Some(milestone_years.clone()),
757        );
758        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
759            .unwrap();
760    }
761
762    #[test]
763    fn flows_different_direction_different_years() {
764        let patch = FilePatch::new("process_flows.csv")
765            .with_deletion("GASPRC,GASPRD,all,all,-1.05,fixed,")
766            .with_addition("GASPRC,GASPRD,all,2020;2030,-1.05,fixed,")
767            .with_addition("GASPRC,GASPRD,all,2040,1.05,fixed,");
768        assert_validate_fails_with_simple!(
769            vec![patch],
770            "Flow of commodity GASPRD in region GBR for process GASPRC behaves as input or output in different years."
771        );
772    }
773
774    #[test]
775    fn missing_flow() {
776        let patch = FilePatch::new("process_flows.csv")
777            .with_deletion("GASPRC,GASPRD,all,all,-1.05,fixed,")
778            .with_addition("GASPRC,GASPRD,all,2020;2030,-1.05,fixed,");
779        assert_validate_fails_with_simple!(
780            vec![patch],
781            "Flow of commodity GASPRD in region GBR for process GASPRC does not cover all milestone years within the process range of activity."
782        );
783    }
784
785    #[test]
786    fn coeff_zero() {
787        let patch = FilePatch::new("process_flows.csv")
788            .with_deletion("GASPRC,GASPRD,all,all,-1.05,fixed,")
789            .with_addition("GASPRC,GASPRD,all,2020;2030,-1.05,fixed,")
790            .with_addition("GASPRC,GASPRD,all,2040,0,fixed,");
791        assert_validate_ok_simple!(vec![patch]);
792    }
793
794    #[test]
795    fn flows_not_needed_before_time_horizon() {
796        // NB: Time horizon starts at 2020 for simple example
797        //
798        // Flows are only needed for milestone years. Check that users can omit them for
799        // non-milestone years.
800        let patches = vec![
801            FilePatch::new("processes.csv")
802                .with_deletion("GASDRV,Dry gas extraction,all,GASPRD,2020,2040,1.0,")
803                .with_addition("GASDRV,Dry gas extraction,all,GASPRD,1980,2040,1.0,"),
804            FilePatch::new("process_flows.csv")
805                .with_deletion("GASPRC,GASPRD,all,all,-1.05,fixed,")
806                .with_addition("GASPRC,GASPRD,all,2020;2030;2040,-1.05,fixed,"),
807        ];
808        assert_validate_ok_simple!(patches);
809    }
810}