Commit 721be71c by dongshufeng

prepare of tn input collection

parent 1397e3e8
pub mod dyn_topo; pub mod dyn_topo;
pub mod static_topo; pub mod static_topo;
pub mod tn_input;
pub const STATIC_TOPO_DF_NAME: &str = "static_topo"; pub const STATIC_TOPO_DF_NAME: &str = "static_topo";
pub const TERMINAL_DF_NAME: &str = "terminal_cn_dev"; pub const TERMINAL_DF_NAME: &str = "terminal_cn_dev";
......
use std::collections::HashMap;
use csv::StringRecordsIter;
use mems::model::dev::MeasPhase;
pub fn read_shunt_measures(records: &mut StringRecordsIter<&[u8]>)
-> Result<HashMap<u64, (u64, MeasPhase)>, String> {
let mut meas = HashMap::new();
// 按行读取csv
let mut row = 0;
loop {
match records.next() {
Some(Ok(record)) => {
let mut col = 0;
let mut point = 0u64;
let mut terminal = 0u64;
for str in record.iter() {
if col == 0 {
if let Ok(id) = str.parse() {
point = id;
} else {
return Err(format!("Wrong terminal input, row {row} col {col}"));
}
} else if col == 1 {
if let Ok(id) = str.parse() {
terminal = id;
} else {
return Err(format!("Wrong terminal input, row {row} col {col}"));
}
} else if col == 2 {
meas.insert(point, (terminal, MeasPhase::from(str)));
}
col += 1;
if col == 3 {
break;
}
}
if col != 3 {
return Err(format!("Wrong terminal input, expected col at least 3, actual {col}"));
}
}
Some(Err(e)) => {
return Err(format!("Wrong terminal input, err: {:?}", e));
}
None => {
break;
}
}
row += 1;
}
Ok(meas)
}
\ No newline at end of file
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use arrow_schema::{DataType, Field, Schema}; use arrow_schema::{DataType, Field, Schema};
use ds_common::{DEV_TOPO_DF_NAME, DYN_TOPO_DF_NAME, POINT_DF_NAME, STATIC_TOPO_DF_NAME, TERMINAL_DF_NAME}; use ds_common::{DEV_TOPO_DF_NAME, DYN_TOPO_DF_NAME, POINT_DF_NAME, SHUNT_MEAS_DF_NAME, STATIC_TOPO_DF_NAME, TERMINAL_DF_NAME};
use ds_common::dyn_topo::{read_dev_topo, read_dyn_topo}; use ds_common::dyn_topo::{read_dev_topo, read_dyn_topo};
use ds_common::static_topo::{read_point_terminal, read_static_topo, read_terminal_cn_dev}; use ds_common::static_topo::{read_point_terminal, read_static_topo, read_terminal_cn_dev};
use ds_common::tn_input::read_shunt_measures;
use eig_domain::DataUnit; use eig_domain::DataUnit;
use mems::model::{get_meas_from_plugin_input, get_wasm_result, PluginInput, PluginOutput}; use mems::model::{get_df_from_in_plugin, get_meas_from_plugin_input, get_wasm_result, PluginInput, PluginOutput};
use mems::model::dev::{MeasPhase, PsRsrType}; use mems::model::dev::{MeasPhase, PsRsrType};
#[no_mangle] #[no_mangle]
...@@ -16,12 +17,8 @@ pub unsafe fn run(ptr: i32, len: u32) -> u64 { ...@@ -16,12 +17,8 @@ pub unsafe fn run(ptr: i32, len: u32) -> u64 {
let input: PluginInput = serde_cbor::from_slice(slice).unwrap(); let input: PluginInput = serde_cbor::from_slice(slice).unwrap();
input input
}; };
let from = 0; let r2 = get_df_from_in_plugin(&input);
let mut error = None; let mut error = None;
let r1 = get_meas_from_plugin_input(&input);
if let Err(s) = &r1 {
error = Some(s.clone());
}
// static topo // static topo
// point, terminal // point, terminal
let mut points: Vec<Vec<u64>> = vec![]; let mut points: Vec<Vec<u64>> = vec![];
...@@ -34,46 +31,60 @@ pub unsafe fn run(ptr: i32, len: u32) -> u64 { ...@@ -34,46 +31,60 @@ pub unsafe fn run(ptr: i32, len: u32) -> u64 {
let mut dyn_topo: Vec<Vec<u64>>; let mut dyn_topo: Vec<Vec<u64>>;
// terminal, cn, tn, dev // terminal, cn, tn, dev
let mut dyn_dev_topo: Vec<Vec<u64>> = vec![]; let mut dyn_dev_topo: Vec<Vec<u64>> = vec![];
for i in 0..input.dfs_len.len() { // key is point id, value is (terminal id, measure phase)
let size = input.dfs_len[i] as usize; let mut point_terminal: HashMap<u64, (u64, MeasPhase)> = HashMap::with_capacity(0);
let end = from + size; let mut with_static = false;
let mut rdr = csv::ReaderBuilder::new().has_headers(true).from_reader(&input.bytes[from..end]); if let Err(s) = &r2 {
let mut records = rdr.records(); error = Some(s.clone());
// 对第i个边输入该节点的 dataframe 进行处理 } else {
if input.dfs[i] == DYN_TOPO_DF_NAME { let mut from = r2.unwrap();
match read_dyn_topo(&mut records) { for i in 0..input.dfs_len.len() {
Ok(v) => dyn_topo = v, let size = input.dfs_len[i] as usize;
Err(s) => { let end = from + size;
error = Some(s); let mut rdr = csv::ReaderBuilder::new().has_headers(true).from_reader(&input.bytes[from..end]);
break; let mut records = rdr.records();
// 对第i个边输入该节点的 dataframe 进行处理
if input.dfs[i] == DYN_TOPO_DF_NAME {
match read_dyn_topo(&mut records) {
Ok(v) => dyn_topo = v,
Err(s) => {
error = Some(s);
break;
}
} }
} } else if input.dfs[i] == DEV_TOPO_DF_NAME {
} else if input.dfs[i] == DEV_TOPO_DF_NAME { match read_dev_topo(&mut records) {
match read_dev_topo(&mut records) { Ok(v) => dyn_dev_topo = v,
Ok(v) => dyn_dev_topo = v, Err(s) => {
Err(s) => { error = Some(s);
error = Some(s); break;
break; }
}
} else if input.dfs[i] == STATIC_TOPO_DF_NAME {
with_static = true;
match read_static_topo(&mut records, None, Some(&mut dev_type)) {
Ok(_) => {},
Err(s) => error = Some(s),
}
} else if input.dfs[i] == TERMINAL_DF_NAME {
match read_terminal_cn_dev(&mut records) {
Ok(v) => terminals = v,
Err(s) => error = Some(s),
}
} else if input.dfs[i] == POINT_DF_NAME {
match read_point_terminal(&mut records, Some(&mut meas_phase)) {
Ok(v) => points = v,
Err(s) => error = Some(s),
}
} else if input.dfs[i] == SHUNT_MEAS_DF_NAME {
match read_shunt_measures(&mut records) {
Ok(v) => point_terminal = v,
Err(s) => error = Some(s),
} }
}
} else if input.dfs[i] == STATIC_TOPO_DF_NAME {
match read_static_topo(&mut records, None, Some(&mut dev_type)) {
Ok(_) => {},
Err(s) => error = Some(s),
}
} else if input.dfs[i] == TERMINAL_DF_NAME {
match read_terminal_cn_dev(&mut records) {
Ok(v) => terminals = v,
Err(s) => error = Some(s),
}
} else if input.dfs[i] == POINT_DF_NAME {
match read_point_terminal(&mut records, Some(&mut meas_phase)) {
Ok(v) => points = v,
Err(s) => error = Some(s),
} }
} }
} }
if error.is_none() { if error.is_some() {
let output = PluginOutput { let output = PluginOutput {
error_msg: error, error_msg: error,
schema: None, schema: None,
...@@ -81,72 +92,92 @@ pub unsafe fn run(ptr: i32, len: u32) -> u64 { ...@@ -81,72 +92,92 @@ pub unsafe fn run(ptr: i32, len: u32) -> u64 {
}; };
get_wasm_result(output) get_wasm_result(output)
} else { } else {
let mut csv_str = String::from("cn,tn\n"); if with_static {
let type1 = PsRsrType::SyncGenerator as u16; let mut csv_str = String::from("point,terminal,phase\n");
let type2 = PsRsrType::Load as u16; let type1 = PsRsrType::SyncGenerator as u16;
let type3 = PsRsrType::ShuntCompensator as u16; let type2 = PsRsrType::Load as u16;
let shunt_types = [type1, type2, type3]; let type3 = PsRsrType::ShuntCompensator as u16;
let mut terminal_with_shunt_dev = HashSet::new(); let shunt_types = [type1, type2, type3];
for v in terminals { let mut terminal_with_shunt_dev = HashSet::new();
let terminal = v[0]; for v in terminals {
let dev_id = v[2]; let terminal = v[0];
if let Some(dev_type) = dev_type.get(&dev_id) { let dev_id = v[2];
if shunt_types.contains(&dev_type) { if let Some(dev_type) = dev_type.get(&dev_id) {
terminal_with_shunt_dev.insert(terminal); if shunt_types.contains(&dev_type) {
terminal_with_shunt_dev.insert(terminal);
}
} }
} }
} let mut point_terminal = HashMap::with_capacity(points.len());
let mut point_terminal = HashMap::with_capacity(points.len()); for i in 0..points.len() {
for i in 0..points.len() { let point_id = points[i][0];
let point_id = points[i][0]; let terminal = points[i][1];
let terminal = points[i][1]; if terminal_with_shunt_dev.contains(&terminal) {
if terminal_with_shunt_dev.contains(&terminal) { let phase = meas_phase[i].to_string();
let phase = meas_phase[i].to_string(); csv_str.push_str(&format!("{point_id},{terminal},{phase}\n"));
csv_str.push_str(&format!("{point_id},{terminal},{phase}\n")); point_terminal.insert(point_id, (terminal, meas_phase[i].clone()));
point_terminal.insert(point_id, (terminal, meas_phase[i].clone())); }
} }
} // build schema
let schema = Schema::new(vec![
let (meas, units) = r1.unwrap(); Field::new("point", DataType::UInt64, false),
for v in dyn_dev_topo { Field::new("terminal", DataType::UInt64, false),
let terminal = v[0]; Field::new("phase", DataType::Utf8, false),
let tn = v[2]; ]);
let dev = v[3]; let csv_bytes = vec![(SHUNT_MEAS_DF_NAME.to_string(), csv_str.into_bytes())];
let dev_type = v[4] as u16; let output = PluginOutput {
} error_msg: None,
// 开始处理开关量 schema: Some(vec![schema]),
for m in meas { csv_bytes,
if let Some((terminal, phase)) = point_terminal.get(&m.point_id) { };
if terminal_with_shunt_dev.contains(terminal) { get_wasm_result(output)
if let Some(unit) = units.get(&m.point_id) { } else {
match unit { let r1 = get_meas_from_plugin_input(&input);
DataUnit::A => {} if let Err(s) = &r1 {
DataUnit::V => {} error = Some(s.clone());
DataUnit::kV => {} }
DataUnit::W => {} if error.is_some() {
DataUnit::kW => {} let output = PluginOutput {
DataUnit::MW => {} error_msg: error,
DataUnit::Var => {} schema: None,
DataUnit::kVar => {} csv_bytes: vec![],
DataUnit::MVar => {} };
_ => {} get_wasm_result(output)
} else {
let (meas, units) = r1.unwrap();
for v in dyn_dev_topo {
let terminal = v[0];
let tn = v[2];
let dev = v[3];
let dev_type = v[4] as u16;
}
// 开始处理开关量
for m in meas {
if let Some((terminal, phase)) = point_terminal.get(&m.point_id) {
if let Some(unit) = units.get(&m.point_id) {
match unit {
DataUnit::A => {}
DataUnit::V => {}
DataUnit::kV => {}
DataUnit::W => {}
DataUnit::kW => {}
DataUnit::MW => {}
DataUnit::Var => {}
DataUnit::kVar => {}
DataUnit::MVar => {}
_ => {}
}
} }
} }
} }
let mut csv_str = String::from("cn,tn\n");
let output = PluginOutput {
error_msg: error,
schema: None,
csv_bytes: vec![],
};
get_wasm_result(output)
} }
} }
let mut csv_str = String::from("cn,tn\n");
// build schema
let schema = Schema::new(vec![
Field::new("cn", DataType::UInt64, false),
Field::new("tn", DataType::UInt64, false),
]);
let csv_bytes = vec![("".to_string(), csv_str.into_bytes())];
let output = PluginOutput {
error_msg: None,
schema: Some(vec![schema]),
csv_bytes,
};
get_wasm_result(output)
} }
} }
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论