1use super::super::*;
3use crate::commodity::{CommodityID, CommodityMap};
4use crate::process::{FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap};
5use crate::region::{parse_region_str, RegionID};
6use crate::units::{FlowPerActivity, MoneyPerFlow};
7use crate::year::parse_year_str;
8use anyhow::{ensure, Context, Result};
9use itertools::iproduct;
10use serde::Deserialize;
11use std::collections::HashMap;
12use std::path::Path;
13use std::rc::Rc;
14
/// Name of the CSV file, located in the model directory, that holds the process flows
const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";

/// Key of [`PrimaryOutputsMap`]: process ID, region ID and year
type PrimaryOutputsKeys = (ProcessID, RegionID, u32);
/// For one key: the commodity ID of each flow paired with the raw `is_primary_output` value of
/// the record it came from (`None` when the record left it unspecified)
type PrimaryOutputsValues = Vec<(CommodityID, Option<bool>)>;
/// Raw primary output designations collected while reading the flows, used afterwards to
/// validate or infer the primary output
type PrimaryOutputsMap = HashMap<PrimaryOutputsKeys, PrimaryOutputsValues>;
20
21#[derive(PartialEq, Debug, Deserialize)]
22struct ProcessFlowRaw {
23 process_id: String,
24 commodity_id: String,
25 years: String,
26 regions: String,
27 coeff: FlowPerActivity,
28 #[serde(default)]
29 #[serde(rename = "type")]
30 kind: FlowType,
31 cost: Option<MoneyPerFlow>,
32 is_primary_output: Option<bool>,
33}
34
35impl ProcessFlowRaw {
36 fn validate(&self) -> Result<()> {
37 ensure!(
39 self.coeff.is_normal(),
40 "Invalid value for coeff ({})",
41 self.coeff
42 );
43
44 ensure!(
46 self.kind == FlowType::Fixed,
47 "Commodity flexible assets are not currently supported"
48 );
49
50 if let Some(cost) = self.cost {
52 ensure!(
53 (0.0..f64::INFINITY).contains(&cost.value()),
54 "Invalid value for flow cost ({cost}). Must be >=0."
55 )
56 }
57
58 Ok(())
59 }
60}
61
62pub fn read_process_flows(
64 model_dir: &Path,
65 processes: &ProcessMap,
66 commodities: &CommodityMap,
67) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
68 let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
69 let process_flow_csv = read_csv(&file_path)?;
70 read_process_flows_from_iter(process_flow_csv, processes, commodities)
71 .with_context(|| input_err_msg(&file_path))
72}
73
74fn read_process_flows_from_iter<I>(
76 iter: I,
77 processes: &ProcessMap,
78 commodities: &CommodityMap,
79) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
80where
81 I: Iterator<Item = ProcessFlowRaw>,
82{
83 let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
84 let mut primary_outputs = PrimaryOutputsMap::new();
85 for record in iter {
86 record.validate()?;
87
88 let (id, process) = processes
90 .get_key_value(record.process_id.as_str())
91 .with_context(|| format!("Process {} not found", record.process_id))?;
92
93 let process_regions = &process.regions;
95 let record_regions =
96 parse_region_str(&record.regions, process_regions).with_context(|| {
97 format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
98 })?;
99
100 let process_years = &process.years;
102 let record_years = parse_year_str(&record.years, process_years).with_context(|| {
103 format!("Invalid year for process {id}. Valid years are {process_years:?}")
104 })?;
105
106 let commodity = commodities
108 .get(record.commodity_id.as_str())
109 .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;
110
111 let process_flow = ProcessFlow {
113 commodity: Rc::clone(commodity),
114 coeff: record.coeff,
115 kind: FlowType::Fixed,
116 cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
117 is_primary_output: false, };
119
120 let region_year_map = flows_map.entry(id.clone()).or_default();
122 for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
123 let flows_map = region_year_map
124 .entry((region_id.clone(), year))
125 .or_default();
126 let existing = flows_map
127 .insert(commodity.id.clone(), process_flow.clone())
128 .is_some();
129 ensure!(
130 !existing,
131 "Duplicate process flow entry for region {}, year {} and commodity {}",
132 region_id,
133 year,
134 commodity.id
135 );
136
137 primary_outputs
138 .entry((id.clone(), region_id.clone(), year))
139 .or_insert_with(|| Vec::with_capacity(1))
140 .push((commodity.id.clone(), record.is_primary_output))
141 }
142 }
143
144 validate_flows_and_update_primary_output(processes, &mut flows_map, &primary_outputs)?;
145
146 Ok(flows_map)
147}
148
149fn validate_flows_and_update_primary_output(
150 processes: &ProcessMap,
151 flows_map: &mut HashMap<ProcessID, ProcessFlowsMap>,
152 primary_outputs: &PrimaryOutputsMap,
153) -> Result<()> {
154 for (process_id, map) in flows_map.iter_mut() {
155 let process = processes.get(process_id).unwrap();
156 for (&year, region_id) in iproduct!(process.years.iter(), process.regions.iter()) {
157 let Some(flows) = map.get_mut(&(region_id.clone(), year)) else {
159 bail!("Missing entry for process {process_id} in {region_id}/{year}");
160 };
161
162 let primary_outputs = primary_outputs
163 .get(&(process_id.clone(), region_id.clone(), year))
164 .unwrap();
165
166 let inferred_primary_output = validate_or_infer_primary_output(flows, primary_outputs)
167 .with_context(|| {
168 format!(
169 "Invalid primary output configuration for process {} (region: {}, year: {})",
170 process_id, region_id, year
171 )
172 })?;
173
174 if let Some(primary_output) = inferred_primary_output {
177 flows.get_mut(&primary_output).unwrap().is_primary_output = true;
178 }
179 }
180 }
181
182 Ok(())
183}
184
185fn validate_or_infer_primary_output(
186 flows_map: &IndexMap<CommodityID, ProcessFlow>,
187 primary_outputs: &PrimaryOutputsValues,
188) -> Result<Option<CommodityID>> {
189 let mut has_primary = false;
190 let mut output_flow = None;
191 let mut outputs_count = 0;
192 for (commodity_id, is_primary_output) in primary_outputs.iter() {
193 let is_output = flows_map.get(commodity_id).unwrap().is_output();
194 if is_output {
195 outputs_count += 1;
196 }
197 match *is_primary_output {
198 Some(true) => {
199 ensure!(
200 is_output,
201 "Commodity {commodity_id} cannot be the primary output as it is an input flow"
202 );
203 ensure!(
204 !has_primary,
205 "Multiple commodities designated as primary outputs"
206 );
207 has_primary = true;
208 }
209 None if is_output => {
210 output_flow = Some(commodity_id.clone());
211 }
212 _ => {}
213 }
214 }
215
216 if has_primary || outputs_count == 0 {
218 return Ok(None);
219 }
220
221 ensure!(
222 output_flow.is_some(),
223 "There are one or more output flows, but is_primary_output is explicitly set to false for these");
224
225 ensure!(
226 outputs_count == 1,
227 "There is more than one output flow, so one must be explicitly designated as the primary output");
228
229 Ok(output_flow)
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commodity::Commodity;
    use crate::fixture::svd_commodity;

    use rstest::rstest;
    use std::rc::Rc;

    /// Create a fixed, zero-cost flow of `commodity` with the given coefficient
    fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
        ProcessFlow {
            commodity,
            coeff: FlowPerActivity(coeff),
            kind: FlowType::Fixed,
            cost: MoneyPerFlow(0.0),
            is_primary_output: false,
        }
    }

    /// Create a raw record with placeholder IDs and the given values
    fn create_process_flow_raw(
        coeff: FlowPerActivity,
        cost: Option<MoneyPerFlow>,
        is_primary_output: Option<bool>,
    ) -> ProcessFlowRaw {
        ProcessFlowRaw {
            process_id: "process".into(),
            commodity_id: "commodity".into(),
            years: "2020".into(),
            regions: "region".into(),
            coeff,
            kind: FlowType::Fixed,
            cost,
            is_primary_output,
        }
    }

    /// Create a flows map with two commodities, "c1" and "c2", with the given coefficients
    fn two_flows(
        commodity1: Commodity,
        commodity2: Commodity,
        coeff1: f64,
        coeff2: f64,
    ) -> IndexMap<CommodityID, ProcessFlow> {
        let c1 = Rc::new(Commodity {
            id: "c1".into(),
            ..commodity1
        });
        let c2 = Rc::new(Commodity {
            id: "c2".into(),
            ..commodity2
        });

        let mut flows = IndexMap::new();
        flows.insert("c1".into(), flow(c1, coeff1));
        flows.insert("c2".into(), flow(c2, coeff2));
        flows
    }

    #[test]
    fn test_validate_flow_raw() {
        // A sensible record passes
        let valid =
            create_process_flow_raw(FlowPerActivity(1.0), Some(MoneyPerFlow(0.0)), Some(false));
        assert!(valid.validate().is_ok());

        // Bad coefficients
        for coeff in [0.0, f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
            let invalid = create_process_flow_raw(
                FlowPerActivity(coeff),
                Some(MoneyPerFlow(0.0)),
                Some(false),
            );
            assert!(
                invalid.validate().is_err(),
                "coeff {coeff} should be rejected"
            );
        }

        // Bad costs
        for cost in [f64::NAN, f64::NEG_INFINITY, f64::INFINITY] {
            let invalid = create_process_flow_raw(
                FlowPerActivity(1.0),
                Some(MoneyPerFlow(cost)),
                Some(false),
            );
            assert!(
                invalid.validate().is_err(),
                "cost {cost} should be rejected"
            );
        }
    }

    #[rstest]
    fn single_output_explicit_primary(#[from(svd_commodity)] commodity: Commodity) {
        let mut flows = IndexMap::new();
        flows.insert("commodity1".into(), flow(Rc::new(commodity), 1.0));
        let primary_outputs = vec![("commodity1".into(), Some(true))];

        let inferred = validate_or_infer_primary_output(&flows, &primary_outputs).unwrap();
        assert_eq!(inferred, None);
    }

    #[rstest]
    fn multiple_outputs_one_explicit_primary(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, commodity2, 1.0, 2.0);
        let primary_outputs = vec![("c1".into(), Some(true)), ("c2".into(), None)];

        let inferred = validate_or_infer_primary_output(&flows, &primary_outputs).unwrap();
        assert_eq!(inferred, None);
    }

    #[rstest]
    fn multiple_outputs_none_explicit_should_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, commodity2, 1.0, 2.0);
        let primary_outputs = vec![("c1".into(), None), ("c2".into(), None)];

        assert!(validate_or_infer_primary_output(&flows, &primary_outputs).is_err());
    }

    #[rstest]
    fn multiple_outputs_all_explicit_false_should_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, commodity2, 1.0, 2.0);
        let primary_outputs = vec![("c1".into(), Some(false)), ("c2".into(), Some(false))];

        assert!(validate_or_infer_primary_output(&flows, &primary_outputs).is_err());
    }

    #[rstest]
    fn all_inputs(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, commodity2, -1.0, -2.0);
        let primary_outputs = vec![("c1".into(), None), ("c2".into(), None)];

        let inferred = validate_or_infer_primary_output(&flows, &primary_outputs).unwrap();
        assert_eq!(inferred, None);
    }

    #[rstest]
    fn multiple_outputs_multiple_explicit_primaries_should_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, commodity2, 1.0, 2.0);
        let primary_outputs = vec![("c1".into(), Some(true)), ("c2".into(), Some(true))];

        assert!(validate_or_infer_primary_output(&flows, &primary_outputs).is_err());
    }

    #[rstest]
    fn explicit_primary_on_input_should_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(svd_commodity)] commodity2: Commodity,
    ) {
        let flows = two_flows(commodity1, commodity2, -1.0, 2.0);
        let primary_outputs = vec![("c1".into(), Some(true)), ("c2".into(), None)];

        assert!(validate_or_infer_primary_output(&flows, &primary_outputs).is_err());
    }
}