1use super::super::*;
3use crate::commodity::{CommodityID, CommodityMap};
4use crate::process::{FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap};
5use crate::region::{parse_region_str, RegionID};
6use crate::units::{FlowPerActivity, MoneyPerFlow};
7use crate::year::parse_year_str;
8use anyhow::{ensure, Context, Result};
9use itertools::iproduct;
10use serde::Deserialize;
11use std::collections::HashMap;
12use std::path::Path;
13use std::rc::Rc;
14
/// Name of the CSV file, relative to the model directory, that holds the process flows.
const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";

/// Key identifying one set of flows: (process, region, year).
type PrimaryOutputsKeys = (ProcessID, RegionID, u32);
/// For each commodity seen under a key, the `is_primary_output` value given in the input record
/// (`None` when the column was left empty).
type PrimaryOutputsValues = Vec<(CommodityID, Option<bool>)>;
/// Collects the raw `is_primary_output` values per (process, region, year) while reading flows.
type PrimaryOutputsMap = HashMap<PrimaryOutputsKeys, PrimaryOutputsValues>;
20
21#[derive(PartialEq, Debug, Deserialize)]
22struct ProcessFlowRaw {
23 process_id: String,
24 commodity_id: String,
25 years: String,
26 regions: String,
27 coeff: FlowPerActivity,
28 #[serde(default)]
29 #[serde(rename = "type")]
30 kind: FlowType,
31 cost: Option<MoneyPerFlow>,
32 is_primary_output: Option<bool>,
33}
34
35impl ProcessFlowRaw {
36 fn validate(&self) -> Result<()> {
37 ensure!(
39 self.coeff.is_normal(),
40 "Invalid value for coeff ({})",
41 self.coeff
42 );
43
44 ensure!(
46 self.kind == FlowType::Fixed,
47 "Commodity flexible assets are not currently supported"
48 );
49
50 if let Some(cost) = self.cost {
52 ensure!(
53 (0.0..f64::INFINITY).contains(&cost.value()),
54 "Invalid value for flow cost ({cost}). Must be >=0."
55 )
56 }
57
58 Ok(())
59 }
60}
61
62pub fn read_process_flows(
64 model_dir: &Path,
65 processes: &ProcessMap,
66 commodities: &CommodityMap,
67) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
68 let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
69 let process_flow_csv = read_csv(&file_path)?;
70 read_process_flows_from_iter(process_flow_csv, processes, commodities)
71 .with_context(|| input_err_msg(&file_path))
72}
73
74fn read_process_flows_from_iter<I>(
76 iter: I,
77 processes: &ProcessMap,
78 commodities: &CommodityMap,
79) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
80where
81 I: Iterator<Item = ProcessFlowRaw>,
82{
83 let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
84 let mut primary_outputs = PrimaryOutputsMap::new();
85 for record in iter {
86 record.validate()?;
87
88 let (id, process) = processes
90 .get_key_value(record.process_id.as_str())
91 .with_context(|| format!("Process {} not found", record.process_id))?;
92
93 let process_regions = &process.regions;
95 let record_regions =
96 parse_region_str(&record.regions, process_regions).with_context(|| {
97 format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
98 })?;
99
100 let process_years = &process.years;
102 let record_years = parse_year_str(&record.years, process_years).with_context(|| {
103 format!("Invalid year for process {id}. Valid years are {process_years:?}")
104 })?;
105
106 let commodity = commodities
108 .get(record.commodity_id.as_str())
109 .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;
110
111 let process_flow = ProcessFlow {
113 commodity: Rc::clone(commodity),
114 coeff: record.coeff,
115 kind: FlowType::Fixed,
116 cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
117 is_primary_output: false, };
119
120 let region_year_map = flows_map.entry(id.clone()).or_default();
122 for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
123 let flows_map = region_year_map
124 .entry((region_id.clone(), year))
125 .or_default();
126 let existing = flows_map
127 .insert(commodity.id.clone(), process_flow.clone())
128 .is_some();
129 ensure!(
130 !existing,
131 "Duplicate process flow entry for region {}, year {} and commodity {}",
132 region_id,
133 year,
134 commodity.id
135 );
136
137 primary_outputs
138 .entry((id.clone(), region_id.clone(), year))
139 .or_insert_with(|| Vec::with_capacity(1))
140 .push((commodity.id.clone(), record.is_primary_output))
141 }
142 }
143
144 validate_flows_and_update_primary_output(processes, &mut flows_map, &primary_outputs)?;
145
146 Ok(flows_map)
147}
148
149fn validate_flows_and_update_primary_output(
150 processes: &ProcessMap,
151 flows_map: &mut HashMap<ProcessID, ProcessFlowsMap>,
152 primary_outputs: &PrimaryOutputsMap,
153) -> Result<()> {
154 for (process_id, map) in flows_map.iter_mut() {
155 let process = processes.get(process_id).unwrap();
156 for (&year, region_id) in iproduct!(process.years.iter(), process.regions.iter()) {
157 let Some(flows) = map.get_mut(&(region_id.clone(), year)) else {
159 bail!("Missing entry for process {process_id} in {region_id}/{year}");
160 };
161
162 let primary_outputs = primary_outputs
163 .get(&(process_id.clone(), region_id.clone(), year))
164 .unwrap();
165
166 let inferred_primary_output = validate_or_infer_primary_output(flows, primary_outputs)
167 .with_context(|| {
168 format!(
169 "Invalid primary output configuration for process {process_id} (region: {region_id}, year: {year})"
170 )
171 })?;
172
173 if let Some(primary_output) = inferred_primary_output {
176 flows.get_mut(&primary_output).unwrap().is_primary_output = true;
177 }
178 }
179 }
180
181 Ok(())
182}
183
184fn validate_or_infer_primary_output(
185 flows_map: &IndexMap<CommodityID, ProcessFlow>,
186 primary_outputs: &PrimaryOutputsValues,
187) -> Result<Option<CommodityID>> {
188 let mut has_primary = false;
189 let mut output_flow = None;
190 let mut outputs_count = 0;
191 for (commodity_id, is_primary_output) in primary_outputs.iter() {
192 let is_output = flows_map.get(commodity_id).unwrap().is_output();
193 if is_output {
194 outputs_count += 1;
195 }
196 match *is_primary_output {
197 Some(true) => {
198 ensure!(
199 is_output,
200 "Commodity {commodity_id} cannot be the primary output as it is an input flow"
201 );
202 ensure!(
203 !has_primary,
204 "Multiple commodities designated as primary outputs"
205 );
206 has_primary = true;
207 }
208 None if is_output => {
209 output_flow = Some(commodity_id.clone());
210 }
211 _ => {}
212 }
213 }
214
215 if has_primary || outputs_count == 0 {
217 return Ok(None);
218 }
219
220 ensure!(
221 output_flow.is_some(),
222 "There are one or more output flows, but is_primary_output is explicitly set to false for these");
223
224 ensure!(
225 outputs_count == 1,
226 "There is more than one output flow, so one must be explicitly designated as the primary output");
227
228 Ok(output_flow)
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commodity::Commodity;
    use crate::fixture::svd_commodity;

    use rstest::rstest;
    use std::rc::Rc;

    /// Create a fixed, zero-cost flow of `commodity` with the given coefficient
    fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
        ProcessFlow {
            commodity,
            coeff: FlowPerActivity(coeff),
            kind: FlowType::Fixed,
            cost: MoneyPerFlow(0.0),
            is_primary_output: false,
        }
    }

    /// Create a flows map for two commodities, `c1` and `c2`, with the given coefficients
    fn two_flows(
        commodity1: Commodity,
        commodity2: Commodity,
        coeff1: f64,
        coeff2: f64,
    ) -> IndexMap<CommodityID, ProcessFlow> {
        let c1 = Rc::new(Commodity {
            id: "c1".into(),
            ..commodity1
        });
        let c2 = Rc::new(Commodity {
            id: "c2".into(),
            ..commodity2
        });

        let mut flows = IndexMap::new();
        flows.insert("c1".into(), flow(c1, coeff1));
        flows.insert("c2".into(), flow(c2, coeff2));
        flows
    }

    /// Create a raw record with fixed IDs, year and region and the given values
    fn create_process_flow_raw(
        coeff: FlowPerActivity,
        cost: Option<MoneyPerFlow>,
        is_primary_output: Option<bool>,
    ) -> ProcessFlowRaw {
        ProcessFlowRaw {
            process_id: "process".into(),
            commodity_id: "commodity".into(),
            years: "2020".into(),
            regions: "region".into(),
            coeff,
            kind: FlowType::Fixed,
            cost,
            is_primary_output,
        }
    }

    #[test]
    fn test_validate_flow_raw() {
        let raw = |coeff: f64, cost: f64| {
            create_process_flow_raw(
                FlowPerActivity(coeff),
                Some(MoneyPerFlow(cost)),
                Some(false),
            )
        };

        // A sensible record passes
        assert!(raw(1.0, 0.0).validate().is_ok());

        // Bad coefficients are rejected
        for bad_coeff in [0.0, f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
            assert!(raw(bad_coeff, 0.0).validate().is_err());
        }

        // Bad costs are rejected
        for bad_cost in [f64::NAN, f64::NEG_INFINITY, f64::INFINITY] {
            assert!(raw(1.0, bad_cost).validate().is_err());
        }
    }

    #[rstest]
    fn single_output_explicit_primary(#[from(svd_commodity)] commodity: Commodity) {
        let mut flows = IndexMap::new();
        flows.insert("commodity1".into(), flow(Rc::new(commodity), 1.0));
        let primary_outputs = vec![("commodity1".into(), Some(true))];

        let res = validate_or_infer_primary_output(&flows, &primary_outputs).unwrap();
        assert_eq!(res, None);
    }

    #[rstest]
    fn multiple_outputs_one_explicit_primary(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, commodity2, 1.0, 2.0);
        let primary_outputs = vec![("c1".into(), Some(true)), ("c2".into(), None)];

        let res = validate_or_infer_primary_output(&flows, &primary_outputs).unwrap();
        assert_eq!(res, None);
    }

    #[rstest]
    fn multiple_outputs_none_explicit_should_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, commodity2, 1.0, 2.0);
        let primary_outputs = vec![("c1".into(), None), ("c2".into(), None)];

        let res = validate_or_infer_primary_output(&flows, &primary_outputs);
        assert!(res.is_err());
    }

    #[rstest]
    fn multiple_outputs_all_explicit_false_should_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, commodity2, 1.0, 2.0);
        let primary_outputs = vec![("c1".into(), Some(false)), ("c2".into(), Some(false))];

        let res = validate_or_infer_primary_output(&flows, &primary_outputs);
        assert!(res.is_err());
    }

    #[rstest]
    fn all_inputs(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, commodity2, -1.0, -2.0);
        let primary_outputs = vec![("c1".into(), None), ("c2".into(), None)];

        let res = validate_or_infer_primary_output(&flows, &primary_outputs).unwrap();
        assert_eq!(res, None);
    }

    #[rstest]
    fn multiple_outputs_multiple_explicit_primaries_should_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, commodity2, 1.0, 2.0);
        let primary_outputs = vec![("c1".into(), Some(true)), ("c2".into(), Some(true))];

        let res = validate_or_infer_primary_output(&flows, &primary_outputs);
        assert!(res.is_err());
    }

    #[rstest]
    fn explicit_primary_on_input_should_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, commodity2, -1.0, 2.0);
        let primary_outputs = vec![("c1".into(), Some(true)), ("c2".into(), None)];

        let res = validate_or_infer_primary_output(&flows, &primary_outputs);
        assert!(res.is_err());
    }
}