1use super::super::{input_err_msg, read_csv};
3use crate::commodity::{CommodityID, CommodityMap};
4use crate::process::{FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap};
5use crate::region::parse_region_str;
6use crate::units::{FlowPerActivity, MoneyPerFlow};
7use crate::year::parse_year_str;
8use anyhow::{Context, Result, ensure};
9use indexmap::IndexMap;
10use itertools::iproduct;
11use serde::Deserialize;
12use std::collections::HashMap;
13use std::path::Path;
14use std::rc::Rc;
15
16const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";
17
18#[derive(PartialEq, Debug, Deserialize)]
19struct ProcessFlowRaw {
20 process_id: String,
21 commodity_id: String,
22 years: String,
23 regions: String,
24 coeff: FlowPerActivity,
25 #[serde(default)]
26 #[serde(rename = "type")]
27 kind: FlowType,
28 cost: Option<MoneyPerFlow>,
29}
30
31impl ProcessFlowRaw {
32 fn validate(&self) -> Result<()> {
33 ensure!(
35 self.coeff.is_normal(),
36 "Invalid value for coeff ({})",
37 self.coeff
38 );
39
40 ensure!(
42 self.kind == FlowType::Fixed,
43 "Commodity flexible assets are not currently supported"
44 );
45
46 if let Some(cost) = self.cost {
48 ensure!(
49 (0.0..f64::INFINITY).contains(&cost.value()),
50 "Invalid value for flow cost ({cost}). Must be >=0."
51 );
52 }
53
54 Ok(())
55 }
56}
57
58pub fn read_process_flows(
60 model_dir: &Path,
61 processes: &mut ProcessMap,
62 commodities: &CommodityMap,
63) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
64 let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
65 let process_flow_csv = read_csv(&file_path)?;
66 read_process_flows_from_iter(process_flow_csv, processes, commodities)
67 .with_context(|| input_err_msg(&file_path))
68}
69
70fn read_process_flows_from_iter<I>(
72 iter: I,
73 processes: &mut ProcessMap,
74 commodities: &CommodityMap,
75) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
76where
77 I: Iterator<Item = ProcessFlowRaw>,
78{
79 let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
80 for record in iter {
81 record.validate()?;
82
83 let (id, process) = processes
85 .get_key_value(record.process_id.as_str())
86 .with_context(|| format!("Process {} not found", record.process_id))?;
87
88 let process_regions = &process.regions;
90 let record_regions =
91 parse_region_str(&record.regions, process_regions).with_context(|| {
92 format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
93 })?;
94
95 let process_years = &process.years;
97 let record_years = parse_year_str(&record.years, process_years).with_context(|| {
98 format!("Invalid year for process {id}. Valid years are {process_years:?}")
99 })?;
100
101 let commodity = commodities
103 .get(record.commodity_id.as_str())
104 .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;
105
106 let process_flow = ProcessFlow {
108 commodity: Rc::clone(commodity),
109 coeff: record.coeff,
110 kind: FlowType::Fixed,
111 cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
112 };
113
114 let region_year_map = flows_map.entry(id.clone()).or_default();
116 for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
117 let flows_map = region_year_map
118 .entry((region_id.clone(), year))
119 .or_default();
120 let existing = Rc::get_mut(flows_map)
121 .unwrap() .insert(commodity.id.clone(), process_flow.clone())
123 .is_some();
124 ensure!(
125 !existing,
126 "Duplicate process flow entry for region {}, year {} and commodity {}",
127 region_id,
128 year,
129 commodity.id
130 );
131 }
132 }
133
134 validate_flows_and_update_primary_output(processes, &flows_map)?;
135
136 Ok(flows_map)
137}
138
139fn validate_flows_and_update_primary_output(
140 processes: &mut ProcessMap,
141 flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
142) -> Result<()> {
143 for (process_id, process) in processes.iter_mut() {
144 let map = flows_map
145 .get(process_id)
146 .with_context(|| format!("Missing flows map for process {process_id}"))?;
147
148 ensure!(
149 map.len() == process.years.len() * process.regions.len(),
150 "Flows map for process {process_id} does not cover all regions and years"
151 );
152
153 let mut iter = iproduct!(process.years.iter(), process.regions.iter());
154
155 let primary_output = if let Some(primary_output) = &process.primary_output {
156 Some(primary_output.clone())
157 } else {
158 let (year, region_id) = iter.next().unwrap();
159 infer_primary_output(&map[&(region_id.clone(), *year)]).with_context(|| {
160 format!("Could not infer primary_output for process {process_id}")
161 })?
162 };
163
164 for (&year, region_id) in iter {
165 let flows = &map[&(region_id.clone(), year)];
166
167 check_flows_primary_output(flows, primary_output.as_ref()).with_context(|| {
169 format!(
170 "Invalid primary output configuration for process {process_id} \
171 (region: {region_id}, year: {year})"
172 )
173 })?;
174 }
175
176 if process.primary_output != primary_output {
178 Rc::get_mut(process).unwrap().primary_output = primary_output;
180 }
181 }
182
183 Ok(())
184}
185
186fn infer_primary_output(map: &IndexMap<CommodityID, ProcessFlow>) -> Result<Option<CommodityID>> {
190 let mut iter = map
191 .iter()
192 .filter_map(|(commodity_id, flow)| flow.is_output().then_some(commodity_id));
193
194 let Some(first_output) = iter.next() else {
195 return Ok(None);
197 };
198
199 ensure!(
200 iter.next().is_none(),
201 "Need to specify primary_output explicitly if there are multiple output flows"
202 );
203
204 Ok(Some(first_output.clone()))
205}
206
207fn check_flows_primary_output(
209 flows_map: &IndexMap<CommodityID, ProcessFlow>,
210 primary_output: Option<&CommodityID>,
211) -> Result<()> {
212 if let Some(primary_output) = primary_output {
213 let flow = flows_map.get(primary_output).with_context(|| {
214 format!("Primary output commodity '{primary_output}' isn't a process flow")
215 })?;
216
217 ensure!(
218 flow.is_output(),
219 "Primary output commodity '{primary_output}' isn't an output flow",
220 );
221 } else {
222 ensure!(
223 flows_map.values().all(ProcessFlow::is_input),
224 "First year is only inputs, but subsequent years have outputs, although no primary \
225 output is specified"
226 );
227 }
228
229 Ok(())
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use crate::commodity::Commodity;
236 use crate::fixture::{assert_error, process, sed_commodity, svd_commodity};
237 use crate::process::{FlowType, Process, ProcessFlow, ProcessMap};
238 use crate::units::{FlowPerActivity, MoneyPerFlow};
239 use indexmap::IndexMap;
240 use itertools::Itertools;
241 use map_macro::hash_map;
242 use rstest::rstest;
243 use std::iter;
244 use std::rc::Rc;
245
246 fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
247 ProcessFlow {
248 commodity,
249 coeff: FlowPerActivity(coeff),
250 kind: FlowType::Fixed,
251 cost: MoneyPerFlow(0.0),
252 }
253 }
254
255 fn build_maps<I>(
256 process: Process,
257 flows: I,
258 ) -> (ProcessMap, HashMap<ProcessID, ProcessFlowsMap>)
259 where
260 I: Clone + Iterator<Item = (CommodityID, ProcessFlow)>,
261 {
262 let map: Rc<IndexMap<_, _>> = Rc::new(flows.clone().collect());
263 let flows_inner = iproduct!(&process.regions, &process.years)
264 .map(|(region_id, year)| ((region_id.clone(), *year), map.clone()))
265 .collect();
266 let flows = hash_map! {process.id.clone() => flows_inner};
267 let processes = iter::once((process.id.clone(), process.into())).collect();
268
269 (processes, flows)
270 }
271
272 #[rstest]
273 fn single_output_infer_primary(#[from(svd_commodity)] commodity: Commodity, process: Process) {
274 let commodity = Rc::new(commodity);
275 let (mut processes, flows_map) = build_maps(
276 process,
277 std::iter::once((commodity.id.clone(), flow(commodity.clone(), 1.0))),
278 );
279 assert!(validate_flows_and_update_primary_output(&mut processes, &flows_map).is_ok());
280 assert_eq!(
281 processes.values().exactly_one().unwrap().primary_output,
282 Some(commodity.id.clone())
283 );
284 }
285
286 #[rstest]
287 fn multiple_outputs_error(
288 #[from(svd_commodity)] commodity1: Commodity,
289 #[from(sed_commodity)] commodity2: Commodity,
290 process: Process,
291 ) {
292 let commodity1 = Rc::new(commodity1);
293 let commodity2 = Rc::new(commodity2);
294 let (mut processes, flows_map) = build_maps(
295 process,
296 [
297 (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
298 (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
299 ]
300 .into_iter(),
301 );
302 let res = validate_flows_and_update_primary_output(&mut processes, &flows_map);
303 assert_error!(res, "Could not infer primary_output for process process1");
304 }
305
306 #[rstest]
307 fn explicit_primary_output(
308 #[from(svd_commodity)] commodity1: Commodity,
309 #[from(sed_commodity)] commodity2: Commodity,
310 process: Process,
311 ) {
312 let commodity1 = Rc::new(commodity1);
313 let commodity2 = Rc::new(commodity2);
314 let mut process = process;
315 process.primary_output = Some(commodity2.id.clone());
316 let (mut processes, flows_map) = build_maps(
317 process,
318 [
319 (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
320 (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
321 ]
322 .into_iter(),
323 );
324 assert!(validate_flows_and_update_primary_output(&mut processes, &flows_map).is_ok());
325 assert_eq!(
326 processes.values().exactly_one().unwrap().primary_output,
327 Some(commodity2.id.clone())
328 );
329 }
330
331 #[rstest]
332 fn all_inputs_no_primary(
333 #[from(svd_commodity)] commodity1: Commodity,
334 #[from(sed_commodity)] commodity2: Commodity,
335 process: Process,
336 ) {
337 let commodity1 = Rc::new(commodity1);
338 let commodity2 = Rc::new(commodity2);
339 let (mut processes, flows_map) = build_maps(
340 process,
341 [
342 (commodity1.id.clone(), flow(commodity1.clone(), -1.0)),
343 (commodity2.id.clone(), flow(commodity2.clone(), -2.0)),
344 ]
345 .into_iter(),
346 );
347 assert!(validate_flows_and_update_primary_output(&mut processes, &flows_map).is_ok());
348 assert_eq!(
349 processes.values().exactly_one().unwrap().primary_output,
350 None
351 );
352 }
353}