1use super::super::{input_err_msg, read_csv};
3use crate::commodity::{CommodityID, CommodityMap};
4use crate::process::{
5 FlowDirection, FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap,
6};
7use crate::region::{RegionID, parse_region_str};
8use crate::units::{FlowPerActivity, MoneyPerFlow};
9use crate::year::parse_year_str;
10use anyhow::{Context, Result, ensure};
11use indexmap::IndexMap;
12use itertools::iproduct;
13use serde::Deserialize;
14use std::collections::HashMap;
15use std::path::Path;
16use std::rc::Rc;
17
/// Name of the CSV file, looked up inside the model directory, that holds the process flows.
const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";
19
20#[derive(PartialEq, Debug, Deserialize)]
21struct ProcessFlowRaw {
22 process_id: String,
23 commodity_id: String,
24 commission_years: String,
25 regions: String,
26 coeff: FlowPerActivity,
27 #[serde(default)]
28 #[serde(rename = "type")]
29 kind: FlowType,
30 cost: Option<MoneyPerFlow>,
31}
32
33impl ProcessFlowRaw {
34 fn validate(&self) -> Result<()> {
35 ensure!(
37 self.coeff.is_finite(),
38 "Invalid value for coeff ({})",
39 self.coeff
40 );
41
42 ensure!(
44 self.kind == FlowType::Fixed,
45 "Commodity flexible assets are not currently supported"
46 );
47
48 if let Some(cost) = self.cost {
50 ensure!(
51 (cost.value() >= 0.0),
52 "Invalid value for flow cost ({cost}). Must be >=0."
53 );
54 }
55
56 Ok(())
57 }
58}
59
60pub fn read_process_flows(
62 model_dir: &Path,
63 processes: &mut ProcessMap,
64 commodities: &CommodityMap,
65 milestone_years: &[u32],
66) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
67 let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
68 let process_flow_csv = read_csv(&file_path)?;
69 read_process_flows_from_iter(process_flow_csv, processes, commodities, milestone_years)
70 .with_context(|| input_err_msg(&file_path))
71}
72
73fn read_process_flows_from_iter<I>(
75 iter: I,
76 processes: &mut ProcessMap,
77 commodities: &CommodityMap,
78 milestone_years: &[u32],
79) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
80where
81 I: Iterator<Item = ProcessFlowRaw>,
82{
83 let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
84 for record in iter {
85 record.validate()?;
86
87 let (id, process) = processes
89 .get_key_value(record.process_id.as_str())
90 .with_context(|| format!("Process {} not found", record.process_id))?;
91
92 let process_regions = &process.regions;
94 let record_regions =
95 parse_region_str(&record.regions, process_regions).with_context(|| {
96 format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
97 })?;
98
99 let process_years: Vec<u32> = process.years.clone().collect();
101 let record_years =
102 parse_year_str(&record.commission_years, &process_years).with_context(|| {
103 format!("Invalid year for process {id}. Valid years are {process_years:?}")
104 })?;
105
106 let commodity = commodities
108 .get(record.commodity_id.as_str())
109 .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;
110
111 let process_flow = ProcessFlow {
113 commodity: Rc::clone(commodity),
114 coeff: record.coeff,
115 kind: FlowType::Fixed,
116 cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
117 };
118
119 let region_year_map = flows_map.entry(id.clone()).or_default();
121 for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
122 let flows_map = region_year_map
123 .entry((region_id.clone(), year))
124 .or_default();
125 let existing = Rc::get_mut(flows_map)
126 .unwrap() .insert(commodity.id.clone(), process_flow.clone())
128 .is_some();
129 ensure!(
130 !existing,
131 "Duplicate process flow entry for region {}, year {} and commodity {}",
132 region_id,
133 year,
134 commodity.id
135 );
136 }
137 }
138
139 validate_flows_and_update_primary_output(processes, &flows_map, milestone_years)?;
140 validate_secondary_flows(processes, &flows_map, milestone_years)?;
141
142 Ok(flows_map)
143}
144
145fn validate_flows_and_update_primary_output(
146 processes: &mut ProcessMap,
147 flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
148 milestone_years: &[u32],
149) -> Result<()> {
150 for (process_id, process) in processes.iter_mut() {
151 let map = flows_map
152 .get(process_id)
153 .with_context(|| format!("Missing flows map for process {process_id}"))?;
154
155 let required_years = milestone_years
157 .iter()
158 .filter(|&y| process.years.contains(y));
159 let region_year: Vec<(&RegionID, &u32)> =
160 iproduct!(process.regions.iter(), required_years).collect();
161
162 ensure!(
163 region_year
164 .iter()
165 .all(|(region_id, year)| map.contains_key(&((*region_id).clone(), **year))),
166 "Flows map for process {process_id} does not cover all regions and required years"
167 );
168
169 let primary_output = if let Some(primary_output) = &process.primary_output {
170 Some(primary_output.clone())
171 } else {
172 let (region_id, year) = region_year[0];
173 infer_primary_output(&map[&(region_id.clone(), *year)]).with_context(|| {
174 format!("Could not infer primary_output for process {process_id}")
175 })?
176 };
177
178 for (region_id, &year) in region_year {
179 let flows = &map[&(region_id.clone(), year)];
180
181 check_flows_primary_output(flows, primary_output.as_ref()).with_context(|| {
183 format!(
184 "Invalid primary output configuration for process {process_id} \
185 (region: {region_id}, year: {year})"
186 )
187 })?;
188 }
189
190 if process.primary_output != primary_output {
192 Rc::get_mut(process).unwrap().primary_output = primary_output;
194 }
195 }
196
197 Ok(())
198}
199
200fn infer_primary_output(map: &IndexMap<CommodityID, ProcessFlow>) -> Result<Option<CommodityID>> {
204 let mut iter = map.iter().filter_map(|(commodity_id, flow)| {
205 (flow.direction() == FlowDirection::Output).then_some(commodity_id)
206 });
207
208 let Some(first_output) = iter.next() else {
209 return Ok(None);
211 };
212
213 ensure!(
214 iter.next().is_none(),
215 "Need to specify primary_output explicitly if there are multiple output flows"
216 );
217
218 Ok(Some(first_output.clone()))
219}
220
221fn check_flows_primary_output(
223 flows_map: &IndexMap<CommodityID, ProcessFlow>,
224 primary_output: Option<&CommodityID>,
225) -> Result<()> {
226 if let Some(primary_output) = primary_output {
227 let flow = flows_map.get(primary_output).with_context(|| {
228 format!("Primary output commodity '{primary_output}' isn't a process flow")
229 })?;
230
231 ensure!(
232 flow.direction() == FlowDirection::Output,
233 "Primary output commodity '{primary_output}' isn't an output flow",
234 );
235 } else {
236 ensure!(
237 flows_map
238 .values()
239 .all(|x| x.direction() == FlowDirection::Input
240 || x.direction() == FlowDirection::Zero),
241 "First year is only inputs, but subsequent years have outputs, although no primary \
242 output is specified"
243 );
244 }
245
246 Ok(())
247}
248
249fn validate_secondary_flows(
252 processes: &mut ProcessMap,
253 flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
254 milestone_years: &[u32],
255) -> Result<()> {
256 for (process_id, process) in processes.iter() {
257 let map = flows_map
259 .get(process_id)
260 .with_context(|| format!("Missing flows map for process {process_id}"))?;
261
262 let required_years: Vec<&u32> = milestone_years
264 .iter()
265 .filter(|&y| process.years.contains(y))
266 .collect();
267
268 let iter = iproduct!(process.years.clone(), process.regions.iter());
270 let mut flows: HashMap<(CommodityID, RegionID), Vec<&ProcessFlow>> = HashMap::new();
271 let mut number_of_years: HashMap<(CommodityID, RegionID), u32> = HashMap::new();
272 for (year, region_id) in iter {
273 if let Some(commodity_map) = map.get(&(region_id.clone(), year)) {
274 let flow = commodity_map.iter().filter_map(|(commodity_id, flow)| {
275 (Some(commodity_id) != process.primary_output.as_ref())
276 .then_some(((commodity_id.clone(), region_id.clone()), flow))
277 });
278
279 for (key, value) in flow {
280 flows.entry(key.clone()).or_default().push(value);
281 if required_years.contains(&&year) {
282 *number_of_years.entry(key).or_default() += 1;
283 }
284 }
285 }
286 }
287
288 for ((commodity_id, region_id), value) in &flows {
292 ensure!(
293 number_of_years[&(commodity_id.clone(), region_id.clone())]
294 == required_years.len().try_into().unwrap(),
295 "Flow of commodity {commodity_id} in region {region_id} for process {process_id} \
296 does not cover all milestone years within the process range of activity."
297 );
298 let input_or_zero = value
299 .iter()
300 .all(|&x| [FlowDirection::Input, FlowDirection::Zero].contains(&x.direction()));
301 let output_or_zero = value
302 .iter()
303 .all(|&x| [FlowDirection::Output, FlowDirection::Zero].contains(&x.direction()));
304 ensure!(
305 input_or_zero || output_or_zero,
306 "Flow of commodity {commodity_id} in region {region_id} for process {process_id} \
307 behaves as input or output in different years."
308 );
309 }
310 }
311
312 Ok(())
313}
314
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commodity::Commodity;
    use crate::fixture::{assert_error, process, sed_commodity, svd_commodity};
    use crate::process::{FlowType, Process, ProcessFlow, ProcessMap};
    use crate::units::{FlowPerActivity, MoneyPerFlow};
    use indexmap::IndexMap;
    use itertools::Itertools;
    use map_macro::hash_map;
    use rstest::rstest;
    use std::iter;
    use std::rc::Rc;

    /// Create a fixed, zero-cost flow of `commodity` with the given coefficient, paired with the
    /// commodity's ID so that it can be collected straight into a flows map.
    fn flow_entry(commodity: &Rc<Commodity>, coeff: f64) -> (CommodityID, ProcessFlow) {
        let flow = ProcessFlow {
            commodity: Rc::clone(commodity),
            coeff: FlowPerActivity(coeff),
            kind: FlowType::Fixed,
            cost: MoneyPerFlow(0.0),
        };

        (commodity.id.clone(), flow)
    }

    /// Build a process map holding only `process` and a flows map in which the same `flows`
    /// are used for every region of the process and every year in `years` (all of the process's
    /// years if `years` is `None`).
    fn build_maps<I>(
        process: Process,
        flows: I,
        years: Option<Vec<u32>>,
    ) -> (ProcessMap, HashMap<ProcessID, ProcessFlowsMap>)
    where
        I: Clone + Iterator<Item = (CommodityID, ProcessFlow)>,
    {
        let years = years.unwrap_or_else(|| process.years.clone().collect());
        let shared_flows: Rc<IndexMap<_, _>> = Rc::new(flows.collect());
        let per_region_year = iproduct!(&process.regions, years)
            .map(|(region_id, year)| ((region_id.clone(), year), Rc::clone(&shared_flows)))
            .collect();
        let flows_map = hash_map! {process.id.clone() => per_region_year};
        let processes = iter::once((process.id.clone(), process.into())).collect();

        (processes, flows_map)
    }

    /// Primary output of the only process in `processes`.
    fn sole_primary_output(processes: &ProcessMap) -> Option<CommodityID> {
        processes
            .values()
            .exactly_one()
            .unwrap()
            .primary_output
            .clone()
    }

    /// A single output flow is picked up as the primary output.
    #[rstest]
    fn single_output_infer_primary(#[from(svd_commodity)] commodity: Commodity, process: Process) {
        let milestone_years: [u32; 2] = [2010, 2020];
        let commodity = Rc::new(commodity);
        let (mut processes, flows_map) =
            build_maps(process, iter::once(flow_entry(&commodity, 1.0)), None);

        let result =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert!(result.is_ok());
        assert_eq!(
            sole_primary_output(&processes),
            Some(commodity.id.clone())
        );
    }

    /// Several output flows without an explicit primary output are rejected.
    #[rstest]
    fn multiple_outputs_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years: [u32; 2] = [2010, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let entries = [flow_entry(&commodity1, 1.0), flow_entry(&commodity2, 2.0)];
        let (mut processes, flows_map) = build_maps(process, entries.into_iter(), None);

        let res =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert_error!(res, "Could not infer primary_output for process process1");
    }

    /// An explicitly given primary output is kept when there are several output flows.
    #[rstest]
    fn explicit_primary_output(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years: [u32; 2] = [2010, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let mut process = process;
        process.primary_output = Some(commodity2.id.clone());
        let entries = [flow_entry(&commodity1, 1.0), flow_entry(&commodity2, 2.0)];
        let (mut processes, flows_map) = build_maps(process, entries.into_iter(), None);

        let result =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert!(result.is_ok());
        assert_eq!(
            sole_primary_output(&processes),
            Some(commodity2.id.clone())
        );
    }

    /// A process with only input flows ends up without a primary output.
    #[rstest]
    fn all_inputs_no_primary(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years: [u32; 2] = [2010, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let entries = [flow_entry(&commodity1, -1.0), flow_entry(&commodity2, -2.0)];
        let (mut processes, flows_map) = build_maps(process, entries.into_iter(), None);

        let result =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert!(result.is_ok());
        assert_eq!(sole_primary_output(&processes), None);
    }

    /// Flows that skip a milestone year are rejected.
    #[rstest]
    fn flows_not_in_all_milestone_years(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years: [u32; 3] = [2010, 2015, 2020];
        let flow_years: Vec<u32> = vec![2010, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let entries = [flow_entry(&commodity1, 1.0), flow_entry(&commodity2, 2.0)];
        let (mut processes, flows_map) =
            build_maps(process, entries.into_iter(), Some(flow_years));

        let res =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert_error!(
            res,
            "Flows map for process process1 does not cover all regions and required years"
        );
    }

    /// Flows given for the milestone years only are accepted.
    #[rstest]
    fn flows_only_milestone_years(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let milestone_years: [u32; 3] = [2010, 2015, 2020];
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let entries = [flow_entry(&commodity1, 1.0), flow_entry(&commodity2, -2.0)];
        let (mut processes, flows_map) = build_maps(
            process,
            entries.into_iter(),
            Some(milestone_years.to_vec()),
        );

        let result =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years);
        assert!(result.is_ok());
    }
}