Commit e6bcba7b by dongshufeng

add ds power flow plugins

parent 4207617b
......@@ -6,7 +6,10 @@ members = [
"eig-aoe",
"eig-db",
"mems",
"mems/examples/wasm-dn-static-topo",
"mems/examples/ds-static-topo",
"mems/examples/ds-dev-ohm-cal",
"mems/examples/ds-dyn-topo",
"mems/examples/ds-3phase-pf",
]
[workspace.package]
......
......@@ -5,11 +5,6 @@ authors = ["dongshufeng <dongshufeng@zju.edu.cn>"]
edition.workspace = true
rust-version.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
strict-model = []
model = ["strict-model"]
[dependencies]
log = "^0.4"
async-trait = "0.1"
......@@ -25,5 +20,5 @@ protobuf = "3.5"
ndarray = { version = "0.15", features = ["serde"] }
num-complex = "0.4"
# this repo
eig-domain = { path = "../eig-domain", default-features = false, features = ["model"]}
eig-expr = { path = "../eig-expr", default-features = false, features = ["model"] }
\ No newline at end of file
eig-domain = { path = "../eig-domain"}
eig-expr = { path = "../eig-expr"}
\ No newline at end of file
......@@ -5,9 +5,6 @@ authors = ["dongshufeng <dongshufeng@zju.edu.cn>"]
edition.workspace = true
rust-version.workspace = true
[features]
model = []
[dependencies]
log = "0.4"
bytes = "1.6"
......@@ -15,5 +12,5 @@ byteorder = "1.5"
serde = { version = "1.0", features = ["derive"] }
async-channel = "2.3"
eig-domain = { path = "../eig-domain", default-features = false, features = ["model"]}
eig-aoe = { path = "../eig-aoe", default-features = false, features = ["strict-model"] }
\ No newline at end of file
eig-domain = { path = "../eig-domain"}
eig-aoe = { path = "../eig-aoe"}
\ No newline at end of file
......@@ -6,11 +6,6 @@ edition.workspace = true
rust-version.workspace = true
build = "build.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
with-serde = []
model = ["with-serde"]
[dependencies]
log = "0.4"
bytes = "1.6"
......@@ -21,13 +16,14 @@ calamine = { version = "0.25" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
protobuf = { version = "3.5", features = ["with-bytes"] }
# this project
eig-expr = { path = "../eig-expr", default-features = false, features = ["model"] }
encoding_rs = "0.8"
zip = { version = "2.1", default-features = false, features = ["deflate"] }
tempfile = { version = "3.10" }
walkdir = { version = "2.5" }
# this project
eig-expr = { path = "../eig-expr"}
[build-dependencies]
# protobuf-codegen-pure = "2.28"
protobuf-codegen = "3.5"
......
......@@ -5,9 +5,6 @@ authors = ["dongshufeng <dongshufeng@zju.edu.cn>"]
edition.workspace = true
rust-version.workspace = true
[features]
model = []
[dependencies]
fnv = "1.0"
nom = "7.1"
......
......@@ -12,7 +12,7 @@ byteorder = "1.5"
csv = "1.3"
protobuf = { version = "3.5", features = ["with-bytes"] }
serde = { version = "1.0", features = ["derive"] }
serde_cbor ="0.11"
serde_cbor = "0.11"
serde_json = "1.0"
petgraph = "0.6"
rayon = "1.10"
......@@ -20,10 +20,10 @@ num-traits = "0.2"
cron = "0.12"
async-channel = "2.3"
base64 = "0.22"
arrow-schema = { version = "52.1", features = ["serde"]}
arrow-schema = { version = "52.1", features = ["serde"] }
# this project
eig-domain = { path = "../eig-domain", default-features = false, features = ["model"] }
eig-expr = { path = "../eig-expr", default-features = false, features = ["model"] }
eig-aoe = { path = "../eig-aoe", default-features = false, features = ["strict-model"] }
eig-db = { path = "../eig-db", default-features = false, features = ["model"] }
eig-domain = { path = "../eig-domain" }
eig-expr = { path = "../eig-expr" }
eig-aoe = { path = "../eig-aoe" }
eig-db = { path = "../eig-db" }
[package]
name = "ds-3phase-pf"
version = "0.1.0"
edition = "2021"
[lib]
crate-type = ["cdylib"]
[dependencies]
log = "0.4"
serde_json = "1.0"
serde_cbor = "0.11"
arrow-schema = { version = "52.1", features = ["serde"] }
petgraph = "0.6"
csv = "1.3.0"
num-complex = "0.4"
ndarray = "0.15"
nalgebra = "0.33"
eig-domain = { path = "../../../eig-domain" }
mems = { path = "../../../mems" }
\ No newline at end of file
#![allow(non_snake_case)]
use std::collections::HashMap;
use arrow_schema::{DataType, Field, Schema};
use ndarray::Array2;
use mems::model::{get_wasm_result, PluginInput, PluginOutput};
use crate::read::{read_dev_matrix, read_dev_topo, read_dyn_topo, read_tn_input};
mod read;
mod nlp;
const DYN_TOPO_DF_NAME: &str = "dyn_topo";
const DEV_TOPO_DF_NAME: &str = "dev_topo";
const DEV_CONDUCTOR_DF_NAME: &str = "dev_ohm";
const TN_INPUT_DF_NAME: &str = "tn_input";
#[no_mangle]
pub unsafe fn run(ptr: i32, len: u32) -> u64 {
// 从内存中获取字符串
let input = unsafe {
let slice = std::slice::from_raw_parts(ptr as _, len as _);
let input: PluginInput = serde_cbor::from_slice(slice).unwrap();
input
};
let from = 0;
let mut error = None;
// get from dynamic topology wasm node
// cn, tn
let mut dyn_topo: Vec<Vec<u64>>;
// terminal, cn, tn, dev
let mut dev_topo: Vec<Vec<u64>>;
// dev id, conductor matrix, get from conductor impedance cal wasm node
let mut dev_conductor: HashMap<u64, Vec<Array2<f64>>>;
// tn id with input
let mut input_tns;
// input pos
let mut input_phases;
// input types
let mut input_types;
// input values
let mut input_values;
for i in 0..input.dfs_len.len() {
let size = input.dfs_len[i] as usize;
let end = from + size;
let mut rdr = csv::ReaderBuilder::new().has_headers(true).from_reader(&input.bytes[from..end]);
let mut records = rdr.records();
// 对第i个边输入该节点的 dataframe 进行处理
if input.dfs[i] == DYN_TOPO_DF_NAME {
match read_dyn_topo(&mut records) {
Ok(v) => dyn_topo = v,
Err(s) => {
error = Some(s);
break;
}
}
} else if input.dfs[i] == DEV_TOPO_DF_NAME {
match read_dev_topo(&mut records) {
Ok(v) => dev_topo = v,
Err(s) => {
error = Some(s);
break;
}
}
} else if input.dfs[i] == DEV_CONDUCTOR_DF_NAME {
match read_dev_matrix(&mut records) {
Ok(v) => dev_conductor = v,
Err(s) => {
error = Some(s);
break;
}
}
} else if input.dfs[i] == TN_INPUT_DF_NAME {
match read_tn_input(&mut records) {
Ok(v) => (input_tns, input_phases, input_types, input_values) = v,
Err(s) => {
error = Some(s);
break;
}
}
}
}
if error.is_some() {
let output = PluginOutput {
error_msg: error,
schema: None,
csv_bytes: vec![],
};
get_wasm_result(output)
} else {
let mut csv_str = String::from("cn,tn\n");
// build schema
let schema = Schema::new(vec![
Field::new("cn", DataType::UInt64, false),
Field::new("tn", DataType::UInt64, false),
]);
let csv_bytes = vec![("".to_string(), csv_str.into_bytes())];
let output = PluginOutput {
error_msg: None,
schema: Some(vec![schema]),
csv_bytes,
};
get_wasm_result(output)
}
}
\ No newline at end of file
use std::collections::HashMap;
use ndarray::{Array, Array2, Ix2};
use num_complex::Complex64;
use eig_domain::DataUnit;
use mems::model::dev::MeasPhase;
pub fn get_pf_nlp_constraints(
// cn, tn
dyn_topo: Vec<Vec<u64>>,
// terminal, cn, tn, dev
dev_topo: Vec<Vec<u64>>,
dev_matrix: HashMap<u64, Vec<Array2<f64>>>,
input_tns: Vec<u64>,
input_phases: Vec<Vec<MeasPhase>>,
input_types: Vec<Vec<DataUnit>>,
input_values: Vec<Vec<f64>>) -> Option<Vec<String>> {
let mut constraint = Vec::with_capacity(dyn_topo.len());
for i in 0..input_tns.len() {
for j in 0..input_types[i].len() {
match input_types[i][j] {
DataUnit::kW => {
let mut active_exp = String::new();
constraint.push(active_exp);
match input_phases[i][j] {
MeasPhase::Total => {}
MeasPhase::A => {}
MeasPhase::B => {}
MeasPhase::C => {}
MeasPhase::A0 => {}
MeasPhase::B0 => {}
MeasPhase::C0 => {}
MeasPhase::CT => {}
MeasPhase::PT => {}
MeasPhase::Unknown => return None,
}
}
DataUnit::kVar => {
let mut reactive_exp = String::new();
constraint.push(reactive_exp);
}
DataUnit::kV => {}
_ => {}
}
}
}
Some(constraint)
}
pub fn get_pf_nlp_variables(tns: &[u64]) -> String {
let mut variable = String::new();
// 生成变量名
for tn in tns {
variable.push_str(&format!("V_{tn}_A:[0/2],D_{tn}_A:[-3.2/3.2],V_{tn}_B:[0/2],D_{tn}_B:[-3.2/3.2],V_{tn}_C:[0/2],D_{tn}_C:[-3.2/3.2]"));
}
variable
}
fn get_pq_of_acline(r_x: Array<Complex64, Ix2>) -> Option<(String, String)> {
let mut mode = 0; //判断相位的模式
if r_x[[0, 0]] != Complex64::new(0.0, 0.0) {
mode += 1;
}
if r_x[[1, 1]] != Complex64::new(0.0, 0.0) {
mode += 2;
}
if r_x[[2, 2]] != Complex64::new(0.0, 0.0) {
mode += 4;
}
// 计算导纳阵
let result = match mode {
// A 或 B 或 C r_x[[2,2]].inv().unwrap()
1 => {
let gb = r_x[[0, 0]].inv();
format!("{}*x1", gb.re)
}
2 => {
let gb = r_x[[1, 1]].inv();
format!("{}*x1", gb.re)
}
4 => {
let gb = r_x[[2, 2]].inv();
format!("{}*x1", gb.re)
}
// AB
3 => {
let rx = nalgebra::Matrix2::new(
r_x[[0, 0]], r_x[[0, 1]],
r_x[[1, 0]], r_x[[1, 1]]);
let gb = rx.try_inverse().unwrap();
format!("{}*x1-{}*x2", gb.m11, gb.m12)
}
// AC
5 => {
let rx = nalgebra::Matrix2::new(
r_x[[0, 0]], r_x[[0, 2]],
r_x[[2, 0]], r_x[[2, 2]]);
let gb = rx.try_inverse().unwrap();
format!("{}*x1", gb.m11)
}
// BC
6 => {
let rx = nalgebra::Matrix2::new(
r_x[[1, 1]], r_x[[1, 2]],
r_x[[2, 1]], r_x[[2, 2]]);
let gb = rx.try_inverse().unwrap();
format!("{}*x1", gb.m11)
}
// ABC
7 => {
let rx = nalgebra::Matrix3::new(
r_x[[0, 0]], r_x[[0, 1]], r_x[[0, 2]],
r_x[[1, 0]], r_x[[1, 1]], r_x[[1, 2]],
r_x[[2, 0]], r_x[[2, 1]], r_x[[2, 2]]);
let gb = rx.try_inverse().unwrap();
format!("{}*x1-{}*x2-{}*x3", gb.m11, gb.m12, gb.m13)
}
_ => { return None; }
};
Some((result.clone(), result.clone()))
}
\ No newline at end of file
use std::collections::HashMap;
use csv::StringRecordsIter;
use ndarray::Array2;
use eig_domain::DataUnit;
use mems::model::dev::MeasPhase;
const MAT_SIZE: usize = 2 * 54;
pub(crate) fn read_dyn_topo(records: &mut StringRecordsIter<&[u8]>)
-> Result<Vec<Vec<u64>>, String> {
let mut dyn_topo = Vec::new();
// 按行读取csv
let mut row = 0;
loop {
match records.next() {
Some(Ok(record)) => {
let mut col = 0;
dyn_topo.push(vec![0u64; 2]);
for str in record.iter() {
if let Ok(id) = str.parse() {
dyn_topo[row][col] = id;
} else {
return Err(format!("Wrong dynamic topology input, row {row} col {col}"));
}
col += 1;
if col == 2 {
break;
}
}
if col != 2 {
return Err(format!("Wrong dynamic topology input, expected col 2, actual {col}"));
}
}
Some(Err(e)) => {
return Err(format!("Wrong dynamic topology input, err: {:?}", e));
}
None => {
break;
}
}
row += 1;
}
Ok(dyn_topo)
}
pub(crate) fn read_dev_topo(records: &mut StringRecordsIter<&[u8]>)
-> Result<Vec<Vec<u64>>, String> {
let mut dev_topo = Vec::new();
// 按行读取csv
let mut row = 0;
loop {
match records.next() {
Some(Ok(record)) => {
let mut col = 0;
dev_topo.push(vec![0u64; 4]);
for str in record.iter() {
if let Ok(id) = str.parse() {
dev_topo[row][col] = id;
} else {
return Err(format!("Wrong device topology, row {row} col {col}"));
}
col += 1;
if col == 4 {
break;
}
}
if col != 4 {
return Err(format!("Wrong device topology input, expected col 4, actual {col}"));
}
}
Some(Err(e)) => {
return Err(format!("Wrong device topology input, err: {:?}", e));
}
None => {
break;
}
}
row += 1;
}
Ok(dev_topo)
}
pub(crate) fn read_dev_matrix(records: &mut StringRecordsIter<&[u8]>)
-> Result<HashMap<u64, Vec<Array2<f64>>>, String> {
let mut map = HashMap::new();
let mut dev_id = 0u64;
let mut matrix: Vec<f64> = Vec::with_capacity(MAT_SIZE);
let mut row = 0;
loop {
match records.next() {
Some(Ok(record)) => {
let mut col = 0;
for str in record.iter() {
if col == 0 {
if let Ok(id) = str.parse() {
if dev_id != id {
if dev_id != 0 {
if matrix.len() != MAT_SIZE {
return Err(format!("matrix len must be {MAT_SIZE}"));
} else {
let v = create_rx(&matrix);
map.insert(dev_id, v);
}
}
dev_id = id;
matrix.clear();
}
} else {
return Err(format!("Wrong dev matrix, row {row} col {col}"));
}
} else {
if let Ok(f) = str.parse() {
matrix.push(f);
} else {
return Err(format!("Wrong dev matrix, row {row} col {col}"));
}
}
col += 1;
}
}
Some(Err(e)) => {
return Err(format!("Wrong dev matrix, err: {:?}", e));
}
None => {
break;
}
}
row += 1;
}
if dev_id != 0 {
if matrix.len() != MAT_SIZE {
return Err(format!("matrix len must be {MAT_SIZE}"));
} else {
let v = create_rx(&matrix);
map.insert(dev_id, v);
}
}
Ok(map)
}
pub(crate) fn read_tn_input(records: &mut StringRecordsIter<&[u8]>)
-> Result<(Vec<u64>, Vec<Vec<MeasPhase>>, Vec<Vec<DataUnit>>, Vec<Vec<f64>>), String> {
let mut tn = Vec::new();
let mut input_type = Vec::new();
let mut input_phase = Vec::new();
let mut value = Vec::new();
// 按行读取csv
let mut row = 0;
loop {
match records.next() {
Some(Ok(record)) => {
let mut col = 0;
for str in record.iter() {
if col == 0 {
if let Ok(v) = str.parse() {
tn.push(v);
} else {
return Err(format!("Wrong bus input, row {row} col {col}"));
}
} else if col == 1 {
if let Ok(v) = serde_json::from_str(str) {
input_phase.push(v);
} else {
return Err(format!("Wrong bus input, row {row} col {col}"));
}
} else if col == 2 {
if let Ok(v) = serde_json::from_str(str) {
input_type.push(v);
} else {
return Err(format!("Wrong bus input, row {row} col {col}"));
}
} else if col == 3 {
if let Ok(v) = serde_json::from_str(str) {
value.push(v);
} else {
return Err(format!("Wrong bus input, row {row} col {col}"));
}
}
col += 1;
if col == 4 {
break;
}
}
if col != 4 {
return Err(format!("Wrong bus input, expected col 4, actual {col}"));
}
}
Some(Err(e)) => {
return Err(format!("Wrong bus input, err: {:?}", e));
}
None => {
break;
}
}
row += 1;
}
Ok((tn, input_phase, input_type, value))
}
fn create_rx(matrix: &[f64]) -> Vec<Array2<f64>> {
let r = Array2::from_shape_vec((3, 3), matrix[0..9].to_vec()).unwrap();
let x = Array2::from_shape_vec((3, 3), matrix[9..18].to_vec()).unwrap();
let v = vec![r, x];
v
}
#[test]
fn test_unit_parse() {
let units = vec![DataUnit::kVar, DataUnit::kW];
let s = serde_json::to_string(&units).unwrap();
assert_eq!("[\"kVar\",\"kW\"]", s);
let r: Vec<DataUnit> = serde_json::from_str("[\"kVar\", \"kW\"]").unwrap();
assert_eq!(r, units);
}
\ No newline at end of file
[package]
name = "ds-dev-ohm-cal"
version = "0.1.0"
edition = "2021"
[lib]
crate-type = ["cdylib"]
[dependencies]
log = "0.4"
serde_json = "1.0"
serde_cbor = "0.11"
arrow-schema = { version = "52.1", features = ["serde"] }
petgraph = "0.6"
eig-domain = { path = "../../../eig-domain" }
mems = { path = "../../../mems" }
csv = "1.3.0"
ndarray = "0.15"
use std::collections::HashMap;
use arrow_schema::{DataType, Field, Schema};
use csv::StringRecordsIter;
use log::{info, warn};
use ndarray::{Array2, ArrayBase, Ix2, OwnedRepr};
use eig_domain::PropValue;
use mems::model::{get_csv_str, get_df_from_in_plugin, get_island_from_plugin_input, get_wasm_result, PluginInput, PluginOutput};
use mems::model::dev::PsRsrType;
#[no_mangle]
pub unsafe fn run(ptr: i32, len: u32) -> u64 {
info!("Read plugin input firstly");
// 从内存中获取字符串
let input = unsafe {
let slice = std::slice::from_raw_parts(ptr as _, len as _);
let input: PluginInput = serde_cbor::from_slice(slice).unwrap();
input
};
let mut error = None;
let r = get_island_from_plugin_input(&input);
if let Err(s) = &r {
error = Some(s.clone());
}
let r2 = get_df_from_in_plugin(&input);
if let Err(s) = &r2 {
error = Some(s.clone());
}
let mut config= HashMap::with_capacity(0);
let mut csv_str = String::from("dev_id,conductor\n");
if error.is_none() {
let (island, prop_defs, defines) = r.unwrap();
let from = r2.unwrap();
info!("input dataframe num from edges is {}", input.dfs.len());
for i in 0..input.dfs_len.len() {
let size = input.dfs_len[i] as usize;
let end = from + size;
let mut rdr = csv::ReaderBuilder::new().has_headers(true).from_reader(&input.bytes[from..end]);
let mut records = rdr.records();
match read_config( &mut records) {
Ok(v) => config = v,
Err(s) => error = Some(s),
}
break;
}
if error.is_none() {
let mut prop_defines = HashMap::with_capacity(prop_defs.len());
for def in prop_defs.into_iter() {
prop_defines.insert(def.id, def);
}
for (_, rsr) in &island.resources {
if let Some(def) = defines.get(&rsr.define_id) {
if def.rsr_type == PsRsrType::ACline {
let dev_id = rsr.id;
let line_conf = rsr.get_prop_value("model", &island.prop_groups, &prop_defines);
let length = rsr.get_prop_value("length", &island.prop_groups, &prop_defines);
if let PropValue::Str(s) = line_conf {
if let Some((mat_re, mat_im)) = config.get(&s) {
if let Some(f) = length.get_f64() {
let ratio = f / 1000.0;
let mut v1 = (mat_re * ratio).into_raw_vec();
let v2 = (mat_im * ratio).into_raw_vec();
v1.extend(v2);
let s = get_csv_str(&serde_json::to_string(&v1).unwrap());
csv_str.push_str(&format!("{dev_id},{s}\n"));
} else {
warn!("Length is not set for acline {}", rsr.name);
continue;
}
} else {
warn!("!!Failed to find matrix for line_conf: {s}");
}
}
}
}
}
}
}
if error.is_none() {
// build schema
let schema = Schema::new(vec![
Field::new("dev_id", DataType::UInt64, false),
Field::new("conductor", DataType::Utf8, false),
]);
let csv_bytes = vec![("".to_string(), csv_str.into_bytes())];
let output = PluginOutput {
error_msg: None,
schema: Some(vec![schema]),
csv_bytes,
};
get_wasm_result(output)
} else {
let output = PluginOutput {
error_msg: error,
schema: None,
csv_bytes: vec![],
};
get_wasm_result(output)
}
}
fn read_config(records: &mut StringRecordsIter<&[u8]>)
-> Result<HashMap<String, (ArrayBase<OwnedRepr<f64>, Ix2>, ArrayBase<OwnedRepr<f64>, Ix2>)>, String>{
let mut config: HashMap<String, (ArrayBase<OwnedRepr<f64>, Ix2>, ArrayBase<OwnedRepr<f64>, Ix2>)> = HashMap::new();
// 按行读取csv
loop {
match records.next() {
Some(Ok(record)) => {
let mut col = 0;
let mut name = "dev_ohm".to_string();
let mut ohm_per_km = "".to_string();
for str in record.iter() {
if col == 0 {
name = str.to_string();
} else {
ohm_per_km = str.to_string();
}
col += 1;
if col == 2 {
break;
}
}
if col != 2 {
return Err(format!("Wrong config input, expected col 2, actual {col}"));
}
match serde_json::from_str::<[f64; 18]>(&ohm_per_km) {
Ok(ohm) => {
let mat_re = Array2::from_shape_vec((3, 3), ohm[0..9].to_vec()).unwrap();
let mat_im = Array2::from_shape_vec((3, 3), ohm[9..18].to_vec()).unwrap();
config.insert(name, (mat_re, mat_im));
}
Err(e) => {
return Err(format!("Failed to parse matrix from {ohm_per_km}, err: {:?}", e));
}
}
}
Some(Err(e)) => {
return Err(format!("{:?}", e));
}
None => {
break;
}
}
}
Ok(config)
}
\ No newline at end of file
[package]
name = "ds-dyn-topo"
version = "0.1.0"
edition = "2021"
[lib]
crate-type = ["cdylib"]
[dependencies]
log = "0.4"
serde_cbor = "0.11"
arrow-schema = { version = "52.1", features = ["serde"] }
petgraph = "0.6"
eig-domain = { path = "../../../eig-domain" }
mems = { path = "../../../mems" }
csv = "1.3.0"
\ No newline at end of file
use std::collections::HashMap;
use arrow_schema::{DataType, Field, Schema};
use eig_domain::DataUnit;
use mems::model::{get_df_from_in_plugin, get_meas_from_plugin_input, get_wasm_result, PluginInput, PluginOutput};
use crate::read::{read_edges, read_points, read_terminals};
mod read;
const STATIC_TOPO_DF_NAME: &str = "static_topo";
const TERMINAL_DF_NAME: &str = "terminal_cn_dev";
const POINT_DF_NAME: &str = "point_terminal_phase";
const DYN_TOPO_DF_NAME: &str = "dyn_topo";
const DEV_TOPO_DF_NAME: &str = "dev_topo";
#[no_mangle]
pub unsafe fn run(ptr: i32, len: u32) -> u64 {
// 从内存中获取字符串
let input = unsafe {
let slice = std::slice::from_raw_parts(ptr as _, len as _);
let input: PluginInput = serde_cbor::from_slice(slice).unwrap();
input
};
let mut error = None;
let r1 = get_meas_from_plugin_input(&input);
if let Err(s) = &r1 {
error = Some(s.clone());
}
let r2 = get_df_from_in_plugin(&input);
// source, target, dev
let mut edges: Vec<Vec<u64>> = vec![];
// switch id to normal open
let mut normal_open: HashMap<u64, bool> = HashMap::with_capacity(0);
// terminal, cn, dev
let mut terminals: Vec<Vec<u64>> = vec![];
// point, terminal
let mut points: Vec<Vec<u64>> = vec![];
if error.is_none() {
if let Err(s) = &r2 {
error = Some(s.clone());
} else {
let mut from = r2.unwrap();
for i in 0..input.dfs_len.len() {
let size = input.dfs_len[i] as usize;
let end = from + size;
let mut rdr = csv::ReaderBuilder::new().has_headers(true).from_reader(&input.bytes[from..end]);
let mut records = rdr.records();
// 开始读取输入的static topology DataFrame
if input.dfs[i] == STATIC_TOPO_DF_NAME {
match read_edges(&mut records) {
Ok(v) => (edges, normal_open) = v,
Err(s) => error = Some(s),
}
} else if input.dfs[i] == TERMINAL_DF_NAME {
match read_terminals(&mut records) {
Ok(v) => terminals = v,
Err(s) => error = Some(s),
}
} else if input.dfs[i] == POINT_DF_NAME {
match read_points(&mut records) {
Ok(v) => points = v,
Err(s) => error = Some(s),
}
}
from += size;
}
}
}
if error.is_none() {
let (meas, units) = r1.unwrap();
let mut point_map: HashMap<u64, u64> = HashMap::with_capacity(points.len());
let mut terminal_cn: HashMap<u64, u64> = HashMap::with_capacity(points.len());
let mut terminal_dev: HashMap<u64, u64> = HashMap::with_capacity(points.len());
for ids in points {
point_map.insert(ids[0], ids[1]);
}
for ids in terminals {
terminal_cn.insert(ids[0], ids[1]);
terminal_dev.insert(ids[0], ids[2]);
}
// 开始构建
let mut switch_to_cn: HashMap<u64, u64> = HashMap::with_capacity(terminal_cn.len() / 2);
// 开始处理开关量
for m in meas {
if let Some(unit) = units.get(&m.point_id) {
if DataUnit::OnOrOff == *unit {
if let Some(terminal_id) = point_map.get(&m.point_id) {
if let Some(cn_id) = terminal_cn.get(terminal_id) {
if let Some(dev_id) = terminal_dev.get(terminal_id) {
if m.get_value2() > 0 {
switch_to_cn.insert(*dev_id, *cn_id);
}
}
}
}
}
}
}
// build tns
let mut cn_tn: HashMap<u64, usize> = HashMap::with_capacity(terminal_cn.len() / 2);
for v in edges {
let cn1 = v[0];
let cn2 = v[1];
let dev_id = v[2];
// switch with measure
if let Some(cn) = switch_to_cn.get(&dev_id) {
if *cn == cn1 {
if let Some(tn) = cn_tn.get(cn) {
cn_tn.insert(cn2, *tn);
} else {
let tn = cn_tn.len() + 1;
cn_tn.insert(cn1, tn);
cn_tn.insert(cn2, tn);
}
} else if *cn == cn2 {
if let Some(tn) = cn_tn.get(cn) {
cn_tn.insert(cn1, *tn);
} else {
let tn = cn_tn.len() + 1;
cn_tn.insert(cn1, tn);
cn_tn.insert(cn2, tn);
}
}
}
// this is a closed switch with no measure
else if let Some(false) = normal_open.get(&dev_id) {
if let Some(tn) = cn_tn.get(&cn1) {
cn_tn.insert(cn2, *tn);
} else if let Some(tn) = cn_tn.get(&cn2) {
cn_tn.insert(cn1, *tn);
} else {
let tn = cn_tn.len() + 1;
cn_tn.insert(cn1, tn);
cn_tn.insert(cn2, tn);
}
}
// this is open switch or not switch
else {
let tn = cn_tn.len() + 1;
cn_tn.insert(cn1, tn);
let tn = tn + 1;
cn_tn.insert(cn2, tn);
}
}
// build topology
let mut topo_csv = String::from("cn,tn\n");
for (cn, tn) in &cn_tn {
topo_csv.push_str(&format!("{cn},{tn}\n"));
}
// build topology schema
let topo_schema = Schema::new(vec![
Field::new("cn", DataType::UInt64, false),
Field::new("tn", DataType::UInt64, false),
]);
// build dev connection
let mut dev_csv = String::from("terminal,cn,tn,dev\n");
for (terminal, dev) in terminal_dev {
if switch_to_cn.contains_key(&dev) {
continue;
}
if let Some(cn) = terminal_cn.get(&terminal) {
if let Some(tn) = cn_tn.get(cn) {
dev_csv.push_str(&format!("{terminal},{cn},{tn},{dev}\n"));
}
}
}
// build dev connection schema
let dev_schema = Schema::new(vec![
Field::new("terminal", DataType::UInt64, false),
Field::new("cn", DataType::UInt64, false),
Field::new("tn", DataType::UInt64, false),
Field::new("dev", DataType::UInt64, false),
]);
let csv_bytes = vec![(DYN_TOPO_DF_NAME.to_string(), topo_csv.into_bytes()),
(DEV_TOPO_DF_NAME.to_string(), dev_csv.into_bytes())];
let output = PluginOutput {
error_msg: None,
schema: Some(vec![topo_schema, dev_schema]),
csv_bytes,
};
get_wasm_result(output)
} else {
let output = PluginOutput {
error_msg: error,
schema: None,
csv_bytes: vec![],
};
get_wasm_result(output)
}
}
\ No newline at end of file
use std::collections::HashMap;
use csv::StringRecordsIter;
use mems::model::dev::PsRsrType;
pub(crate) fn read_points(records: &mut StringRecordsIter<&[u8]>) -> Result<Vec<Vec<u64>>, String> {
let mut points = Vec::new();
// 按行读取csv
let mut row = 0;
loop {
points.push(vec![0u64; 2]);
match records.next() {
Some(Ok(record)) => {
let mut col = 0;
for str in record.iter() {
if let Ok(id) = str.parse() {
points[row][col] = id;
} else {
return Err(format!("Wrong point input, row {row} col {col}"));
}
col += 1;
if col == 2 {
break;
}
}
if col != 2 {
return Err(format!("Wrong point input, expected col at least 2, actual {col}"));
}
}
Some(Err(e)) => {
return Err(format!("Wrong point input, err: {:?}", e));
}
None => {
break;
}
}
row += 1;
}
Ok(points)
}
pub(crate) fn read_terminals(records: &mut StringRecordsIter<&[u8]>) -> Result<Vec<Vec<u64>>, String> {
let mut terminals: Vec<Vec<u64>> = Vec::new();
// 按行读取csv
let mut row = 0;
loop {
terminals.push(vec![0u64; 3]);
match records.next() {
Some(Ok(record)) => {
let mut col = 0;
for str in record.iter() {
if let Ok(id) = str.parse() {
terminals[row][col] = id;
} else {
return Err(format!("Wrong terminal input, row {row} col {col}"));
}
col += 1;
if col == 3 {
break;
}
}
if col != 3 {
return Err(format!("Wrong terminal input, expected col at least 3, actual {col}"));
}
}
Some(Err(e)) => {
return Err(format!("Wrong terminal input, err: {:?}", e));
}
None => {
break;
}
}
row += 1;
}
Ok(terminals)
}
pub(crate) fn read_edges(records: &mut StringRecordsIter<&[u8]>)
-> Result<(Vec<Vec<u64>>, HashMap<u64, bool>), String> {
let mut edges = Vec::new();
let mut normal_open = HashMap::new();
let mut row = 0;
let swich_type = PsRsrType::Switch as u16;
// 按行读取csv
loop {
edges.push(vec![0u64; 3]);
match records.next() {
Some(Ok(record)) => {
let mut col = 0;
let mut is_switch = false;
for str in record.iter() {
if col < 3 {
if let Ok(id) = str.parse() {
edges[row][col] = id;
} else {
return Err(format!("Wrong static topology input, row {row} col {col}"));
}
} else if col == 3 {
if let Ok(type_u16) = str.parse::<u16>() {
is_switch = type_u16 == swich_type;
} else {
return Err(format!("Wrong static topology input, row {row} col {col}"));
}
} else if col == 4 && is_switch {
if let Ok(b) = str.parse::<bool>() {
normal_open.insert(edges[row][2], b);
} else {
return Err(format!("Wrong static topology input, row {row} col {col}"));
}
}
col += 1;
if col == 5 {
break;
}
}
if col != 5 {
return Err(format!("Wrong static topology input, expected col at least 5, actual {col}"));
}
}
Some(Err(e)) => {
return Err(format!("Wrong static topology input, err: {:?}", e));
}
None => {
break;
}
}
row += 1;
}
Ok((edges, normal_open))
}
\ No newline at end of file
[package]
name = "wasm-dn-static-topo"
name = "ds-static-topo"
version = "0.1.0"
edition = "2021"
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论