Commit fda0317b by dongshufeng

change dynmaic topo logic

parent 3c2f2ee7
......@@ -3,7 +3,7 @@ use std::collections::HashMap;
use arrow_schema::{DataType, Field, Schema};
use eig_domain::DataUnit;
use mems::model::{get_df_from_in_plugin, get_meas_from_plugin_input, get_wasm_result, PluginInput, PluginOutput};
use mems::model::{get_df_from_in_plugin, get_meas_from_plugin_input, get_wasm_result, ModelType, PluginInput, PluginOutput};
use crate::read::{read_edges, read_points, read_terminals};
mod read;
......@@ -142,41 +142,59 @@ pub unsafe fn run(ptr: i32, len: u32) -> u64 {
cn_tn.insert(cn2, tn);
}
}
// build topology
let mut topo_csv = String::from("cn,tn\n");
for (cn, tn) in &cn_tn {
topo_csv.push_str(&format!("{cn},{tn}\n"));
// get outgoing edges
let mut outgoing = vec![];
for model_input in &input.model {
match model_input {
ModelType::Outgoing(edge_name) => {
outgoing = edge_name.clone();
}
_ => {}
}
}
// build topology schema
let topo_schema = Schema::new(vec![
Field::new("cn", DataType::UInt64, false),
Field::new("tn", DataType::UInt64, false),
]);
// build dev connection
let mut dev_csv = String::from("terminal,cn,tn,dev\n");
for (terminal, dev) in terminal_dev {
if switch_to_cn.contains_key(&dev) {
continue;
let mut csv_bytes = Vec::with_capacity(2);
let mut schema = Vec::with_capacity(2);
if outgoing.is_empty() || outgoing.contains(&DYN_TOPO_DF_NAME.to_string()) ||
(!outgoing.contains(&DYN_TOPO_DF_NAME.to_string()) && !outgoing.contains(&DEV_TOPO_DF_NAME.to_string())) {
// build topology
let mut topo_csv = String::from("cn,tn\n");
for (cn, tn) in &cn_tn {
topo_csv.push_str(&format!("{cn},{tn}\n"));
}
if let Some(cn) = terminal_cn.get(&terminal) {
if let Some(tn) = cn_tn.get(cn) {
dev_csv.push_str(&format!("{terminal},{cn},{tn},{dev}\n"));
// build topology schema
let topo_schema = Schema::new(vec![
Field::new("cn", DataType::UInt64, false),
Field::new("tn", DataType::UInt64, false),
]);
csv_bytes.push((DYN_TOPO_DF_NAME.to_string(), topo_csv.into_bytes()));
schema.push(topo_schema);
}
if outgoing.contains(&DEV_TOPO_DF_NAME.to_string()) {
// build dev connection
let mut dev_csv = String::from("terminal,cn,tn,dev\n");
for (terminal, dev) in terminal_dev {
if switch_to_cn.contains_key(&dev) {
continue;
}
if let Some(cn) = terminal_cn.get(&terminal) {
if let Some(tn) = cn_tn.get(cn) {
dev_csv.push_str(&format!("{terminal},{cn},{tn},{dev}\n"));
}
}
}
// build dev connection schema
let dev_schema = Schema::new(vec![
Field::new("terminal", DataType::UInt64, false),
Field::new("cn", DataType::UInt64, false),
Field::new("tn", DataType::UInt64, false),
Field::new("dev", DataType::UInt64, false),
]);
csv_bytes.push((DEV_TOPO_DF_NAME.to_string(), dev_csv.into_bytes()));
schema.push(dev_schema);
}
// build dev connection schema
let dev_schema = Schema::new(vec![
Field::new("terminal", DataType::UInt64, false),
Field::new("cn", DataType::UInt64, false),
Field::new("tn", DataType::UInt64, false),
Field::new("dev", DataType::UInt64, false),
]);
let csv_bytes = vec![(DYN_TOPO_DF_NAME.to_string(), topo_csv.into_bytes()),
(DEV_TOPO_DF_NAME.to_string(), dev_csv.into_bytes())];
let output = PluginOutput {
error_msg: None,
schema: Some(vec![topo_schema, dev_schema]),
schema: Some(schema),
csv_bytes,
};
get_wasm_result(output)
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论