1use super::super::{input_err_msg, read_csv};
3use crate::commodity::{CommodityID, CommodityMap};
4use crate::process::{
5 FlowDirection, FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap,
6};
7use crate::region::{RegionID, parse_region_str};
8use crate::units::{FlowPerActivity, MoneyPerFlow};
9use crate::year::parse_year_str;
10use anyhow::{Context, Result, ensure};
11use indexmap::IndexMap;
12use itertools::iproduct;
13use serde::Deserialize;
14use std::collections::HashMap;
15use std::path::Path;
16use std::rc::Rc;
17
/// Name of the CSV file, joined onto the model directory, from which process flows are read.
const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";
19
20#[derive(PartialEq, Debug, Deserialize)]
21struct ProcessFlowRaw {
22 process_id: String,
23 commodity_id: String,
24 commission_years: String,
25 regions: String,
26 coeff: FlowPerActivity,
27 #[serde(default)]
28 #[serde(rename = "type")]
29 kind: FlowType,
30 cost: Option<MoneyPerFlow>,
31}
32
33impl ProcessFlowRaw {
34 fn validate(&self) -> Result<()> {
35 ensure!(
37 self.coeff.is_finite(),
38 "Invalid value for coeff ({})",
39 self.coeff
40 );
41
42 ensure!(
44 self.kind == FlowType::Fixed,
45 "Commodity flexible assets are not currently supported"
46 );
47
48 if let Some(cost) = self.cost {
50 ensure!(
51 (cost.value() >= 0.0),
52 "Invalid value for flow cost ({cost}). Must be >=0."
53 );
54 }
55
56 Ok(())
57 }
58}
59
60pub fn read_process_flows(
73 model_dir: &Path,
74 processes: &mut ProcessMap,
75 commodities: &CommodityMap,
76 milestone_years: &[u32],
77) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
78 let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
79 let process_flow_csv = read_csv(&file_path)?;
80 read_process_flows_from_iter(process_flow_csv, processes, commodities, milestone_years)
81 .with_context(|| input_err_msg(&file_path))
82}
83
84fn read_process_flows_from_iter<I>(
97 iter: I,
98 processes: &mut ProcessMap,
99 commodities: &CommodityMap,
100 milestone_years: &[u32],
101) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
102where
103 I: Iterator<Item = ProcessFlowRaw>,
104{
105 let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
106 for record in iter {
107 record.validate()?;
108
109 let (id, process) = processes
111 .get_key_value(record.process_id.as_str())
112 .with_context(|| format!("Process {} not found", record.process_id))?;
113
114 let process_regions = &process.regions;
116 let record_regions =
117 parse_region_str(&record.regions, process_regions).with_context(|| {
118 format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
119 })?;
120
121 let process_years: Vec<u32> = process.years.clone().collect();
123 let record_years =
124 parse_year_str(&record.commission_years, &process_years).with_context(|| {
125 format!("Invalid year for process {id}. Valid years are {process_years:?}")
126 })?;
127
128 let commodity = commodities
130 .get(record.commodity_id.as_str())
131 .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;
132
133 let process_flow = ProcessFlow {
135 commodity: Rc::clone(commodity),
136 coeff: record.coeff,
137 kind: FlowType::Fixed,
138 cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
139 };
140
141 let region_year_map = flows_map.entry(id.clone()).or_default();
143 for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
144 let flows_map = region_year_map
145 .entry((region_id.clone(), year))
146 .or_default();
147 let existing = Rc::get_mut(flows_map)
148 .unwrap() .insert(commodity.id.clone(), process_flow.clone())
150 .is_some();
151 ensure!(
152 !existing,
153 "Duplicate process flow entry for region {}, year {} and commodity {}",
154 region_id,
155 year,
156 commodity.id
157 );
158 }
159 }
160
161 validate_flows_and_update_primary_output(processes, &flows_map, milestone_years)?;
162 validate_secondary_flows(processes, &flows_map, milestone_years)?;
163
164 Ok(flows_map)
165}
166
167fn validate_flows_and_update_primary_output(
168 processes: &mut ProcessMap,
169 flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
170 milestone_years: &[u32],
171) -> Result<()> {
172 for (process_id, process) in processes.iter_mut() {
173 let map = flows_map
174 .get(process_id)
175 .with_context(|| format!("Missing flows map for process {process_id}"))?;
176
177 let required_years = milestone_years
179 .iter()
180 .filter(|&y| process.years.contains(y));
181 let region_year: Vec<(&RegionID, &u32)> =
182 iproduct!(process.regions.iter(), required_years).collect();
183
184 ensure!(
185 region_year
186 .iter()
187 .all(|(region_id, year)| map.contains_key(&((*region_id).clone(), **year))),
188 "Flows map for process {process_id} does not cover all regions and required years"
189 );
190
191 let primary_output = if let Some(primary_output) = &process.primary_output {
192 Some(primary_output.clone())
193 } else {
194 let (region_id, year) = region_year[0];
195 infer_primary_output(&map[&(region_id.clone(), *year)]).with_context(|| {
196 format!("Could not infer primary_output for process {process_id}")
197 })?
198 };
199
200 for (region_id, &year) in region_year {
201 let flows = &map[&(region_id.clone(), year)];
202
203 check_flows_primary_output(flows, primary_output.as_ref()).with_context(|| {
205 format!(
206 "Invalid primary output configuration for process {process_id} \
207 (region: {region_id}, year: {year})"
208 )
209 })?;
210 }
211
212 if process.primary_output != primary_output {
214 Rc::get_mut(process).unwrap().primary_output = primary_output;
216 }
217 }
218
219 Ok(())
220}
221
222fn infer_primary_output(map: &IndexMap<CommodityID, ProcessFlow>) -> Result<Option<CommodityID>> {
226 let mut iter = map.iter().filter_map(|(commodity_id, flow)| {
227 (flow.direction() == FlowDirection::Output).then_some(commodity_id)
228 });
229
230 let Some(first_output) = iter.next() else {
231 return Ok(None);
233 };
234
235 ensure!(
236 iter.next().is_none(),
237 "Need to specify primary_output explicitly if there are multiple output flows"
238 );
239
240 Ok(Some(first_output.clone()))
241}
242
243fn check_flows_primary_output(
245 flows_map: &IndexMap<CommodityID, ProcessFlow>,
246 primary_output: Option<&CommodityID>,
247) -> Result<()> {
248 if let Some(primary_output) = primary_output {
249 let flow = flows_map.get(primary_output).with_context(|| {
250 format!("Primary output commodity '{primary_output}' isn't a process flow")
251 })?;
252
253 ensure!(
254 flow.direction() == FlowDirection::Output,
255 "Primary output commodity '{primary_output}' isn't an output flow",
256 );
257 } else {
258 ensure!(
259 flows_map
260 .values()
261 .all(|x| x.direction() == FlowDirection::Input
262 || x.direction() == FlowDirection::Zero),
263 "First year is only inputs, but subsequent years have outputs, although no primary \
264 output is specified"
265 );
266 }
267
268 Ok(())
269}
270
271fn validate_secondary_flows(
274 processes: &mut ProcessMap,
275 flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
276 milestone_years: &[u32],
277) -> Result<()> {
278 for (process_id, process) in processes.iter() {
279 let map = flows_map
281 .get(process_id)
282 .with_context(|| format!("Missing flows map for process {process_id}"))?;
283
284 let required_years: Vec<&u32> = milestone_years
286 .iter()
287 .filter(|&y| process.years.contains(y))
288 .collect();
289
290 let iter = iproduct!(process.years.clone(), process.regions.iter());
292 let mut flows: HashMap<(CommodityID, RegionID), Vec<&ProcessFlow>> = HashMap::new();
293 let mut number_of_years: HashMap<(CommodityID, RegionID), u32> = HashMap::new();
294 for (year, region_id) in iter {
295 if let Some(commodity_map) = map.get(&(region_id.clone(), year)) {
296 let flow = commodity_map.iter().filter_map(|(commodity_id, flow)| {
297 (Some(commodity_id) != process.primary_output.as_ref())
298 .then_some(((commodity_id.clone(), region_id.clone()), flow))
299 });
300
301 for (key, value) in flow {
302 flows.entry(key.clone()).or_default().push(value);
303 if required_years.contains(&&year) {
304 *number_of_years.entry(key).or_default() += 1;
305 }
306 }
307 }
308 }
309
310 for ((commodity_id, region_id), value) in &flows {
314 ensure!(
315 number_of_years[&(commodity_id.clone(), region_id.clone())]
316 == required_years.len().try_into().unwrap(),
317 "Flow of commodity {commodity_id} in region {region_id} for process {process_id} \
318 does not cover all milestone years within the process range of activity."
319 );
320 let input_or_zero = value
321 .iter()
322 .all(|&x| [FlowDirection::Input, FlowDirection::Zero].contains(&x.direction()));
323 let output_or_zero = value
324 .iter()
325 .all(|&x| [FlowDirection::Output, FlowDirection::Zero].contains(&x.direction()));
326 ensure!(
327 input_or_zero || output_or_zero,
328 "Flow of commodity {commodity_id} in region {region_id} for process {process_id} \
329 behaves as input or output in different years."
330 );
331 }
332 }
333
334 Ok(())
335}
336
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commodity::Commodity;
    use crate::fixture::{
        assert_error, assert_validate_fails_with_simple, assert_validate_ok_simple, process,
        sed_commodity, svd_commodity,
    };
    use crate::patch::FilePatch;
    use crate::process::{FlowType, Process, ProcessFlow, ProcessMap};
    use crate::units::{FlowPerActivity, MoneyPerFlow};
    use indexmap::IndexMap;
    use itertools::Itertools;
    use map_macro::hash_map;
    use rstest::rstest;
    use std::iter;
    use std::rc::Rc;

    /// File patched by the tests that validate a whole (patched) model.
    const FLOWS_FILE: &str = "process_flows.csv";

    /// The GASPRC/GASPRD flow record that the patch-based tests remove and replace.
    const GASPRD_FLOW_ALL_YEARS: &str = "GASPRC,GASPRD,all,all,-1.05,fixed,";

    /// Create a fixed, zero-cost flow of `commodity` with the given coefficient.
    fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
        ProcessFlow {
            commodity,
            coeff: FlowPerActivity(coeff),
            kind: FlowType::Fixed,
            cost: MoneyPerFlow(0.0),
        }
    }

    /// Create the (commodity ID, flow) entries for two commodities, in the order given.
    fn flow_pair(
        first: (&Rc<Commodity>, f64),
        second: (&Rc<Commodity>, f64),
    ) -> impl Clone + Iterator<Item = (CommodityID, ProcessFlow)> {
        [first, second]
            .map(|(commodity, coeff)| (commodity.id.clone(), flow(Rc::clone(commodity), coeff)))
            .into_iter()
    }

    /// Build a single-process map plus a flows map in which the same `flows` apply to every
    /// region of the process and every year in `years` (the process's own years if `None`).
    fn build_maps<I>(
        process: Process,
        flows: I,
        years: Option<Vec<u32>>,
    ) -> (ProcessMap, HashMap<ProcessID, ProcessFlowsMap>)
    where
        I: Clone + Iterator<Item = (CommodityID, ProcessFlow)>,
    {
        let years = years.unwrap_or_else(|| process.years.clone().collect());
        let shared_flows: Rc<IndexMap<_, _>> = Rc::new(flows.collect());
        let per_region_year: ProcessFlowsMap = iproduct!(&process.regions, years)
            .map(|(region_id, year)| ((region_id.clone(), year), Rc::clone(&shared_flows)))
            .collect();

        let process_id = process.id.clone();
        let flows_map = hash_map! {process_id.clone() => per_region_year};
        let processes = iter::once((process_id, process.into())).collect();

        (processes, flows_map)
    }

    #[rstest]
    fn single_output_infer_primary(#[from(svd_commodity)] commodity: Commodity, process: Process) {
        let commodity = Rc::new(commodity);
        let single_flow = iter::once((commodity.id.clone(), flow(Rc::clone(&commodity), 1.0)));
        let (mut processes, flows_map) = build_maps(process, single_flow, None);

        validate_flows_and_update_primary_output(&mut processes, &flows_map, &[2010, 2020])
            .unwrap();

        let updated = processes.values().exactly_one().unwrap();
        assert_eq!(updated.primary_output, Some(commodity.id.clone()));
    }

    #[rstest]
    fn multiple_outputs_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let svd = Rc::new(commodity1);
        let sed = Rc::new(commodity2);
        let (mut processes, flows_map) =
            build_maps(process, flow_pair((&svd, 1.0), (&sed, 2.0)), None);

        let result =
            validate_flows_and_update_primary_output(&mut processes, &flows_map, &[2010, 2020]);
        assert_error!(
            result,
            "Could not infer primary_output for process process1"
        );
    }

    #[rstest]
    fn explicit_primary_output(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let svd = Rc::new(commodity1);
        let sed = Rc::new(commodity2);

        // Two outputs would be ambiguous, so name the second one explicitly
        let mut process = process;
        process.primary_output = Some(sed.id.clone());
        let (mut processes, flows_map) =
            build_maps(process, flow_pair((&svd, 1.0), (&sed, 2.0)), None);

        validate_flows_and_update_primary_output(&mut processes, &flows_map, &[2010, 2020])
            .unwrap();

        let updated = processes.values().exactly_one().unwrap();
        assert_eq!(updated.primary_output, Some(sed.id.clone()));
    }

    #[rstest]
    fn all_inputs_no_primary(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let svd = Rc::new(commodity1);
        let sed = Rc::new(commodity2);
        let (mut processes, flows_map) =
            build_maps(process, flow_pair((&svd, -1.0), (&sed, -2.0)), None);

        validate_flows_and_update_primary_output(&mut processes, &flows_map, &[2010, 2020])
            .unwrap();

        let updated = processes.values().exactly_one().unwrap();
        assert_eq!(updated.primary_output, None);
    }

    #[rstest]
    fn flows_not_in_all_milestone_years(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let svd = Rc::new(commodity1);
        let sed = Rc::new(commodity2);

        // Flows are missing for the 2015 milestone year
        let flow_years = vec![2010, 2020];
        let (mut processes, flows_map) = build_maps(
            process,
            flow_pair((&svd, 1.0), (&sed, 2.0)),
            Some(flow_years),
        );

        let result = validate_flows_and_update_primary_output(
            &mut processes,
            &flows_map,
            &[2010, 2015, 2020],
        );
        assert_error!(
            result,
            "Flows map for process process1 does not cover all regions and required years"
        );
    }

    #[rstest]
    fn flows_only_milestone_years(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let svd = Rc::new(commodity1);
        let sed = Rc::new(commodity2);

        // Flows are defined for the milestone years and nothing else
        let milestone_years = vec![2010, 2015, 2020];
        let (mut processes, flows_map) = build_maps(
            process,
            flow_pair((&svd, 1.0), (&sed, -2.0)),
            Some(milestone_years.clone()),
        );

        validate_flows_and_update_primary_output(&mut processes, &flows_map, &milestone_years)
            .unwrap();
    }

    #[test]
    fn flows_different_direction_different_years() {
        // Input in 2020 and 2030, but output in 2040
        let patch = FilePatch::new(FLOWS_FILE)
            .with_deletion(GASPRD_FLOW_ALL_YEARS)
            .with_addition("GASPRC,GASPRD,all,2020;2030,-1.05,fixed,")
            .with_addition("GASPRC,GASPRD,all,2040,1.05,fixed,");
        assert_validate_fails_with_simple!(
            vec![patch],
            "Flow of commodity GASPRD in region GBR for process GASPRC behaves as input or output in different years."
        );
    }

    #[test]
    fn missing_flow() {
        // No flow given for 2040
        let patch = FilePatch::new(FLOWS_FILE)
            .with_deletion(GASPRD_FLOW_ALL_YEARS)
            .with_addition("GASPRC,GASPRD,all,2020;2030,-1.05,fixed,");
        assert_validate_fails_with_simple!(
            vec![patch],
            "Flow of commodity GASPRD in region GBR for process GASPRC does not cover all milestone years within the process range of activity."
        );
    }

    #[test]
    fn coeff_zero() {
        // A zero flow in 2040 is compatible with the input flows of the other years
        let patch = FilePatch::new(FLOWS_FILE)
            .with_deletion(GASPRD_FLOW_ALL_YEARS)
            .with_addition("GASPRC,GASPRD,all,2020;2030,-1.05,fixed,")
            .with_addition("GASPRC,GASPRD,all,2040,0,fixed,");
        assert_validate_ok_simple!(vec![patch]);
    }

    #[test]
    fn flows_not_needed_before_time_horizon() {
        // The process's start year is moved back to 1980, while the replaced flow record only
        // lists 2020, 2030 and 2040.
        let processes_patch = FilePatch::new("processes.csv")
            .with_deletion("GASDRV,Dry gas extraction,all,GASPRD,2020,2040,1.0,")
            .with_addition("GASDRV,Dry gas extraction,all,GASPRD,1980,2040,1.0,");
        let flows_patch = FilePatch::new(FLOWS_FILE)
            .with_deletion(GASPRD_FLOW_ALL_YEARS)
            .with_addition("GASPRC,GASPRD,all,2020;2030;2040,-1.05,fixed,");

        let patches = vec![processes_patch, flows_patch];
        assert_validate_ok_simple!(patches);
    }
}
574}