1use super::super::*;
3use crate::commodity::{CommodityID, CommodityMap};
4use crate::process::{FlowType, ProcessFlow, ProcessFlowsMap, ProcessID, ProcessMap};
5use crate::region::parse_region_str;
6use crate::units::{FlowPerActivity, MoneyPerFlow};
7use crate::year::parse_year_str;
8use anyhow::{Context, Result, ensure};
9use itertools::iproduct;
10use serde::Deserialize;
11use std::collections::HashMap;
12use std::path::Path;
13use std::rc::Rc;
14
/// Name of the CSV file, joined onto the model directory, from which process flows are read.
const PROCESS_FLOWS_FILE_NAME: &str = "process_flows.csv";
16
17#[derive(PartialEq, Debug, Deserialize)]
18struct ProcessFlowRaw {
19 process_id: String,
20 commodity_id: String,
21 years: String,
22 regions: String,
23 coeff: FlowPerActivity,
24 #[serde(default)]
25 #[serde(rename = "type")]
26 kind: FlowType,
27 cost: Option<MoneyPerFlow>,
28}
29
30impl ProcessFlowRaw {
31 fn validate(&self) -> Result<()> {
32 ensure!(
34 self.coeff.is_normal(),
35 "Invalid value for coeff ({})",
36 self.coeff
37 );
38
39 ensure!(
41 self.kind == FlowType::Fixed,
42 "Commodity flexible assets are not currently supported"
43 );
44
45 if let Some(cost) = self.cost {
47 ensure!(
48 (0.0..f64::INFINITY).contains(&cost.value()),
49 "Invalid value for flow cost ({cost}). Must be >=0."
50 )
51 }
52
53 Ok(())
54 }
55}
56
57pub fn read_process_flows(
59 model_dir: &Path,
60 processes: &mut ProcessMap,
61 commodities: &CommodityMap,
62) -> Result<HashMap<ProcessID, ProcessFlowsMap>> {
63 let file_path = model_dir.join(PROCESS_FLOWS_FILE_NAME);
64 let process_flow_csv = read_csv(&file_path)?;
65 read_process_flows_from_iter(process_flow_csv, processes, commodities)
66 .with_context(|| input_err_msg(&file_path))
67}
68
69fn read_process_flows_from_iter<I>(
71 iter: I,
72 processes: &mut ProcessMap,
73 commodities: &CommodityMap,
74) -> Result<HashMap<ProcessID, ProcessFlowsMap>>
75where
76 I: Iterator<Item = ProcessFlowRaw>,
77{
78 let mut flows_map: HashMap<ProcessID, ProcessFlowsMap> = HashMap::new();
79 for record in iter {
80 record.validate()?;
81
82 let (id, process) = processes
84 .get_key_value(record.process_id.as_str())
85 .with_context(|| format!("Process {} not found", record.process_id))?;
86
87 let process_regions = &process.regions;
89 let record_regions =
90 parse_region_str(&record.regions, process_regions).with_context(|| {
91 format!("Invalid region for process {id}. Valid regions are {process_regions:?}")
92 })?;
93
94 let process_years = &process.years;
96 let record_years = parse_year_str(&record.years, process_years).with_context(|| {
97 format!("Invalid year for process {id}. Valid years are {process_years:?}")
98 })?;
99
100 let commodity = commodities
102 .get(record.commodity_id.as_str())
103 .with_context(|| format!("{} is not a valid commodity ID", &record.commodity_id))?;
104
105 let process_flow = ProcessFlow {
107 commodity: Rc::clone(commodity),
108 coeff: record.coeff,
109 kind: FlowType::Fixed,
110 cost: record.cost.unwrap_or(MoneyPerFlow(0.0)),
111 };
112
113 let region_year_map = flows_map.entry(id.clone()).or_default();
115 for (year, region_id) in iproduct!(record_years, record_regions.iter()) {
116 let flows_map = region_year_map
117 .entry((region_id.clone(), year))
118 .or_default();
119 let existing = flows_map
120 .insert(commodity.id.clone(), process_flow.clone())
121 .is_some();
122 ensure!(
123 !existing,
124 "Duplicate process flow entry for region {}, year {} and commodity {}",
125 region_id,
126 year,
127 commodity.id
128 );
129 }
130 }
131
132 validate_flows_and_update_primary_output(processes, &flows_map)?;
133
134 Ok(flows_map)
135}
136
137fn validate_flows_and_update_primary_output(
138 processes: &mut ProcessMap,
139 flows_map: &HashMap<ProcessID, ProcessFlowsMap>,
140) -> Result<()> {
141 for (process_id, process) in processes.iter_mut() {
142 let map = flows_map
143 .get(process_id)
144 .with_context(|| format!("Missing flows map for process {process_id}"))?;
145
146 ensure!(
147 map.len() == process.years.len() * process.regions.len(),
148 "Flows map for process {process_id} does not cover all regions and years"
149 );
150
151 let mut iter = iproduct!(process.years.iter(), process.regions.iter());
152
153 let primary_output = match &process.primary_output {
154 Some(primary_output) => Some(primary_output.clone()),
155 None => {
156 let (year, region_id) = iter.next().unwrap();
157 infer_primary_output(&map[&(region_id.clone(), *year)]).with_context(|| {
158 format!("Could not infer primary_output for process {process_id}")
159 })?
160 }
161 };
162
163 for (&year, region_id) in iter {
164 let flows = &map[&(region_id.clone(), year)];
165
166 check_flows_primary_output(flows, &primary_output).with_context(|| {
168 format!(
169 "Invalid primary output configuration for process {process_id} \
170 (region: {region_id}, year: {year})"
171 )
172 })?;
173 }
174
175 if process.primary_output != primary_output {
177 Rc::get_mut(process).unwrap().primary_output = primary_output;
179 }
180 }
181
182 Ok(())
183}
184
185fn infer_primary_output(map: &IndexMap<CommodityID, ProcessFlow>) -> Result<Option<CommodityID>> {
189 let mut iter = map
190 .iter()
191 .filter_map(|(commodity_id, flow)| flow.is_output().then_some(commodity_id));
192
193 let Some(first_output) = iter.next() else {
194 return Ok(None);
196 };
197
198 ensure!(
199 iter.next().is_none(),
200 "Need to specify primary_output explicitly if there are multiple output flows"
201 );
202
203 Ok(Some(first_output.clone()))
204}
205
206fn check_flows_primary_output(
208 flows_map: &IndexMap<CommodityID, ProcessFlow>,
209 primary_output: &Option<CommodityID>,
210) -> Result<()> {
211 if let Some(primary_output) = primary_output {
212 let flow = flows_map.get(primary_output).with_context(|| {
213 format!("Primary output commodity '{primary_output}' isn't a process flow",)
214 })?;
215
216 ensure!(
217 flow.is_output(),
218 "Primary output commodity '{primary_output}' isn't an output flow",
219 );
220 } else {
221 ensure!(
222 flows_map.values().all(|flow| flow.is_input()),
223 "First year is only inputs, but subsequent years have outputs, although no primary \
224 output is specified"
225 );
226 }
227
228 Ok(())
229}
230
// Unit tests for `validate_flows_and_update_primary_output`. Fixtures (`process`,
// `svd_commodity`, `sed_commodity`) are injected by rstest according to parameter names and
// `#[from(...)]` attributes.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commodity::Commodity;
    use crate::fixture::{assert_error, process, sed_commodity, svd_commodity};
    use crate::process::{FlowType, Process, ProcessFlow, ProcessMap};
    use crate::units::{FlowPerActivity, MoneyPerFlow};
    use indexmap::IndexMap;
    use map_macro::hash_map;
    use rstest::rstest;
    use std::iter;
    use std::rc::Rc;

    /// Build a fixed, zero-cost flow of `commodity` with the given coefficient.
    fn flow(commodity: Rc<Commodity>, coeff: f64) -> ProcessFlow {
        ProcessFlow {
            commodity,
            coeff: FlowPerActivity(coeff),
            kind: FlowType::Fixed,
            cost: MoneyPerFlow(0.0),
        }
    }

    /// Build a one-process `ProcessMap` together with a flows map in which every
    /// (region, year) combination of `process` holds the same set of `flows`.
    fn build_maps<I>(
        process: Process,
        flows: I,
    ) -> (ProcessMap, HashMap<ProcessID, ProcessFlowsMap>)
    where
        I: Clone + Iterator<Item = (CommodityID, ProcessFlow)>,
    {
        let map: IndexMap<CommodityID, ProcessFlow> = flows.clone().collect();
        let flows_inner = iproduct!(&process.regions, &process.years)
            .map(|(region_id, year)| ((region_id.clone(), *year), map.clone()))
            .collect();
        let flows = hash_map! {process.id.clone() => flows_inner};
        let processes = iter::once((process.id.clone(), process.into())).collect();

        (processes, flows)
    }

    // A single flow with a positive coefficient: validation succeeds and the process ends up
    // with that commodity as its primary output.
    // NOTE(review): relies on the `process` fixture having no primary_output set -- confirm.
    #[rstest]
    fn single_output_infer_primary(#[from(svd_commodity)] commodity: Commodity, process: Process) {
        let commodity = Rc::new(commodity);
        let (mut processes, flows_map) = build_maps(
            process,
            std::iter::once((commodity.id.clone(), flow(commodity.clone(), 1.0))),
        );
        assert!(validate_flows_and_update_primary_output(&mut processes, &flows_map).is_ok());
        assert_eq!(
            processes.values().exactly_one().unwrap().primary_output,
            Some(commodity.id.clone())
        );
    }

    // Two flows with positive coefficients and no explicit primary output: inference is
    // ambiguous, so validation must fail with the "Could not infer" error.
    #[rstest]
    fn multiple_outputs_error(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let (mut processes, flows_map) = build_maps(
            process,
            [
                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
            ]
            .into_iter(),
        );
        let res = validate_flows_and_update_primary_output(&mut processes, &flows_map);
        assert_error!(res, "Could not infer primary_output for process process1");
    }

    // Two flows with positive coefficients but an explicitly chosen primary output: validation
    // succeeds and the explicit choice is kept.
    #[rstest]
    fn explicit_primary_output(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let mut process = process;
        process.primary_output = Some(commodity2.id.clone());
        let (mut processes, flows_map) = build_maps(
            process,
            [
                (commodity1.id.clone(), flow(commodity1.clone(), 1.0)),
                (commodity2.id.clone(), flow(commodity2.clone(), 2.0)),
            ]
            .into_iter(),
        );
        assert!(validate_flows_and_update_primary_output(&mut processes, &flows_map).is_ok());
        assert_eq!(
            processes.values().exactly_one().unwrap().primary_output,
            Some(commodity2.id.clone())
        );
    }

    // Two flows with negative coefficients and no explicit primary output: validation succeeds
    // and the primary output stays `None`.
    #[rstest]
    fn all_inputs_no_primary(
        #[from(svd_commodity)] commodity1: Commodity,
        #[from(sed_commodity)] commodity2: Commodity,
        process: Process,
    ) {
        let commodity1 = Rc::new(commodity1);
        let commodity2 = Rc::new(commodity2);
        let (mut processes, flows_map) = build_maps(
            process,
            [
                (commodity1.id.clone(), flow(commodity1.clone(), -1.0)),
                (commodity2.id.clone(), flow(commodity2.clone(), -2.0)),
            ]
            .into_iter(),
        );
        assert!(validate_flows_and_update_primary_output(&mut processes, &flows_map).is_ok());
        assert_eq!(
            processes.values().exactly_one().unwrap().primary_output,
            None
        );
    }
}