Skip to content

Commit

Permalink
(#78) Network viewer - New API JSON-RPC getStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
mario4tier committed Oct 10, 2023
1 parent 22c715f commit cd75919
Show file tree
Hide file tree
Showing 21 changed files with 1,135 additions and 226 deletions.
106 changes: 90 additions & 16 deletions rust/suibase/crates/suibase-daemon/src/admin_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ use crate::basic_types::*;

use crate::network_monitor::NetMonTx;
use crate::proxy_server::ProxyServer;
use crate::shared_types::{Globals, InputPort, SafeWorkdirs, WorkdirProxyConfig, Workdirs};
use crate::shared_types::{
Globals, GlobalsProxyMT, GlobalsStatusMT, GlobalsWorkdirsMT, InputPort, WorkdirProxyConfig,
WorkdirsST,
};
use crate::workdirs_watcher::WorkdirsWatcher;
use crate::workers::ShellWorker;

use anyhow::Result;

Expand All @@ -16,13 +20,13 @@ use tokio_graceful_shutdown::{FutureExt, NestedSubsystem, SubsystemHandle};
//
// The AdminController does:
// - Process all system/configuration-level events that are easier to handle when done sequentially
// (implemented by dequeing and processing one event at the time).
// (implemented by dequeuing and processing one event at the time).
// - Handle events to hot-reload the suibase.yaml
// - Handle events for various user actions (e.g. from JSONRPCServer).
// - Responsible to keep one "ProxyServer" per workdir running (localnet, devnet, testnet ...)
// - Responsible to keep one "ProxyServer" and "ShellProcessor" running per workdir.
//
// Globals: InputPort Instantiation
// ================================
// globals.proxy: InputPort Instantiation
// =======================================
// One InputPort is instantiated per workdir (localnet, devnet, testnet ...).
//
// Once instantiated, it is never deleted. Subsequently, the ProxyServer is also started
Expand All @@ -34,20 +38,41 @@ use tokio_graceful_shutdown::{FutureExt, NestedSubsystem, SubsystemHandle};
pub struct AdminController {
idx: Option<ManagedVecUSize>,
globals: Globals,

admctrl_rx: AdminControllerRx,
admctrl_tx: AdminControllerTx,
netmon_tx: NetMonTx,
workdirs: Workdirs,

wd_tracking: AutoSizeVec<WorkdirTracking>,
port_tracking: AutoSizeVec<InputPortTracking>,
}

pub type AdminControllerTx = tokio::sync::mpsc::Sender<AdminControllerMsg>;
pub type AdminControllerRx = tokio::sync::mpsc::Receiver<AdminControllerMsg>;

#[derive(Default, Debug)]
#[derive(Default)]
struct WorkdirTracking {
last_read_config: Option<WorkdirProxyConfig>,
shell_worker_tx: Option<AdminControllerTx>,
shell_worker_handle: Option<NestedSubsystem>, // Set when the shell_worker is started.
}

impl WorkdirTracking {
pub fn new() -> Self {
Self {
last_read_config: None,
shell_worker_tx: None,
shell_worker_handle: None,
}
}
}
impl std::fmt::Debug for WorkdirTracking {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WorkdirTracking")
// NestedSubsystem does not implement Debug
.field("last_read_config", &self.last_read_config)
.finish()
}
}

#[derive(Default)]
Expand All @@ -56,6 +81,15 @@ struct InputPortTracking {
port_number: u16, // port number used when the proxy_server was started.
}

impl std::fmt::Debug for InputPortTracking {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WorkdirTracking")
// NestedSubsystem does not implement Debug
.field("port_number", &self.port_number)
.finish()
}
}

pub struct AdminControllerMsg {
// Message sent toward the AdminController from various sources.
pub event_id: AdminControllerEventID,
Expand Down Expand Up @@ -92,19 +126,18 @@ impl std::fmt::Debug for AdminControllerMsg {
pub type AdminControllerEventID = u8;
pub const EVENT_NOTIF_CONFIG_FILE_CHANGE: u8 = 1;
pub const EVENT_DEBUG_PRINT: u8 = 2;
pub const EVENT_SHELL_EXEC: u8 = 3;

impl AdminController {
pub fn new(
globals: Globals,
workdirs: Workdirs,
admctrl_rx: AdminControllerRx,
admctrl_tx: AdminControllerTx,
netmon_tx: NetMonTx,
) -> Self {
Self {
idx: None,
globals,
workdirs,
admctrl_rx,
admctrl_tx,
netmon_tx,
Expand All @@ -113,6 +146,44 @@ impl AdminController {
}
}

async fn process_shell_exec_msg(&mut self, msg: AdminControllerMsg, subsys: &SubsystemHandle) {
// Simply forward to the proper ShellWorker (one worker per workdir).
if msg.event_id != EVENT_SHELL_EXEC {
log::error!("Unexpected event_id {:?}", msg.event_id);
// Do nothing. Consume the message.
return;
}

if msg.workdir_idx.is_none() {
log::error!("EVENT_SHELL_EXEC missing workdir_idx");
return;
}
let workdir_idx = msg.workdir_idx.unwrap();

// Find the corresponding ShellWorker in wd_tracking using the workdir_idx.
let wd_tracking = self.wd_tracking.get_mut(workdir_idx);

// Instantiate and start the ShellWorker if not already done.
if wd_tracking.shell_worker_handle.is_none() {
let (shell_worker_tx, shell_worker_rx) = tokio::sync::mpsc::channel(100);
wd_tracking.shell_worker_tx = Some(shell_worker_tx);
let shell_worker =
ShellWorker::new(self.globals.clone(), shell_worker_rx, Some(workdir_idx));
wd_tracking.shell_worker_handle =
Some(subsys.start("proxy-server", move |a| shell_worker.run(a)));
}

if wd_tracking.shell_worker_tx.is_none() {
log::error!("EVENT_SHELL_EXEC missing shell_worker_tx");
return;
}
let shell_worker_tx = wd_tracking.shell_worker_tx.as_ref().unwrap();

// Forward the message to the ShellWorker.
// TODO Error handling
shell_worker_tx.send(msg).await.unwrap();
}

async fn process_debug_print_msg(&mut self, msg: AdminControllerMsg) {
// Send a response to the return channel with the debug print of a few
// relevant internal states, particularly the configuration tracking.
Expand Down Expand Up @@ -204,7 +275,7 @@ impl AdminController {
let workdir_idx: u8;
let workdir_name: String;
{
let workdirs_guard = self.workdirs.read().await;
let workdirs_guard = self.globals.workdirs.read().await;
let workdirs = &*workdirs_guard;

let workdir_search_result = workdirs.find_workdir(&path);
Expand All @@ -217,7 +288,7 @@ impl AdminController {
workdir_idx = found_workdir_idx;
workdir_name = workdir.name().to_string();

// Load the 3xsuibase.yaml. The default, common and user version in order.
// Load the 3 suibase.yaml files. The default, common and user version in order.
let try_load = workdir_config
.load_and_merge_from_file(&workdir.suibase_yaml_default().to_string_lossy());
if try_load.is_err() {
Expand Down Expand Up @@ -261,7 +332,7 @@ impl AdminController {
// Apply the configuration to the globals.
let config_applied: Option<(ManagedVecUSize, u16)> = {
// Get a write lock on the globals.
let mut globals_guard = self.globals.write().await;
let mut globals_guard = self.globals.proxy.write().await;
let globals = &mut *globals_guard;

// Apply the config to add/modify the related InputPort in the globals (as needed).
Expand All @@ -283,7 +354,7 @@ impl AdminController {
Self::apply_workdir_config(input_port, &workdir_config);
Some((port_idx, input_port.port_number()))
} else {
// TODO Verify there is no conflicting port assigment.
// TODO Verify there is no conflicting port assignment.

// No InputPort yet for that workdir... so create it.
let mut input_port =
Expand All @@ -302,7 +373,7 @@ impl AdminController {

if port_tracking.proxy_server_handle.is_none() {
let proxy_server = ProxyServer::new();
let globals = self.globals.clone();
let globals = self.globals.proxy.clone();
let netmon_tx = self.netmon_tx.clone();
port_tracking.proxy_server_handle = Some(subsys.start("proxy-server", move |a| {
proxy_server.run(a, port_idx, globals, netmon_tx)
Expand Down Expand Up @@ -343,6 +414,9 @@ impl AdminController {
EVENT_NOTIF_CONFIG_FILE_CHANGE => {
self.process_config_msg(msg, subsys).await;
}
EVENT_SHELL_EXEC => {
self.process_shell_exec_msg(msg, subsys).await;
}
_ => {
log::error!("Unknown event_id {}", msg.event_id);
}
Expand All @@ -363,7 +437,7 @@ impl AdminController {
// send back to this thread on the AdminController channel.
{
let admctrl_tx = self.admctrl_tx.clone();
let workdirs_watcher = WorkdirsWatcher::new(self.workdirs.clone(), admctrl_tx);
let workdirs_watcher = WorkdirsWatcher::new(self.globals.workdirs.clone(), admctrl_tx);
subsys.start("workdirs-watcher", move |a| workdirs_watcher.run(a));
}

Expand Down Expand Up @@ -394,7 +468,7 @@ fn test_load_config_from_suibase_default() {
// Note: More of a functional test. Suibase need to be installed.

// Test a known "standard" localnet suibase.yaml
let workdirs = SafeWorkdirs::new();
let workdirs = WorkdirsST::new();
let mut path = std::path::PathBuf::from(workdirs.suibase_home());
path.push("scripts");
path.push("defaults");
Expand Down
8 changes: 8 additions & 0 deletions rust/suibase/crates/suibase-daemon/src/api/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
How to modify the API?

(1) def_methods.rs : defines all the API methods and responses.

(2) Implement in one of these files:
- impl_general_api.rs : General interface to Suibase.
- impl_proxy_api.rs : Specific to the proxy/multi-link feature.

33 changes: 27 additions & 6 deletions rust/suibase/crates/suibase-daemon/src/api/api_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,22 @@ use tokio::time::{interval, Duration};
use anyhow::Result;
use tokio_graceful_shutdown::{FutureExt, SubsystemHandle, Toplevel};

use crate::{admin_controller::AdminControllerTx, shared_types::Globals};
use crate::{
admin_controller::AdminControllerTx,
shared_types::{Globals, GlobalsProxyMT, GlobalsStatusMT},
};

use super::GeneralApiServer;
use crate::api::impl_general_api::GeneralApiImpl;

use super::ProxyApiServer;
use crate::api::proxy_api::ProxyApiImpl;
use crate::api::impl_proxy_api::ProxyApiImpl;

use hyper::Method;
use jsonrpsee::server::{AllowHosts, RpcModule, Server, ServerBuilder};
use jsonrpsee::{
core::server::rpc_module::Methods,
server::{AllowHosts, RpcModule, Server, ServerBuilder},
};
use std::net::SocketAddr;
use tower_http::cors::{Any, CorsLayer};

Expand Down Expand Up @@ -108,10 +117,22 @@ impl JSONRPCServer {
.build(SocketAddr::from(([127, 0, 0, 1], 44399)))
.await?;

let proxy_api = ProxyApiImpl::new(self.globals.clone(), self.admctrl_tx.clone());
let methods = proxy_api.into_rpc();
let mut all_methods = Methods::new();

let proxy_api = ProxyApiImpl::new(self.globals.proxy.clone(), self.admctrl_tx.clone());
let proxy_methods = proxy_api.into_rpc();

if let Err(e) = all_methods.merge(proxy_methods) {
log::error!("Error merging proxy_methods: {}", e);
}

let general_api = GeneralApiImpl::new(self.globals.clone(), self.admctrl_tx.clone());
let general_methods = general_api.into_rpc();
if let Err(e) = all_methods.merge(general_methods) {
log::error!("Error merging general_methods: {}", e);
}

let start_result = server.start(methods);
let start_result = server.start(all_methods);

if let Ok(handle) = start_result {
//let addr = server.local_addr()?;
Expand Down
80 changes: 80 additions & 0 deletions rust/suibase/crates/suibase-daemon/src/api/def_header.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;

use crate::shared_types::UuidST;

#[serde_as]
#[derive(Clone, Default, Debug, JsonSchema, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Header {
// Header fields
// =============
// - method:
// A string echoing the method of the request.
//
// - key:
// A string echoing one of the "key" parameter of the request (e.g. the workdir requested).
// This field is optional and its interpretation depends on the method.
//
// - data_uuid:
// A sortable hex 64 bytes (UUID v7). Increments with every data modification.
//
// - method_uuid:
// A hex 64 bytes (UUID v4) that changes every time a new generated data_uuid is unexpectedly
// lower than the previous one for this method (e.g. system time went backward) or the PID of
// the process changes. Complements data_uuid for added reliability on various edge cases.
//
pub method: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub method_uuid: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub data_uuid: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub key: Option<String>,
}

// Class to conveniently add UUID versioning to any data structure.
//
// That versioning can be used to initialize the method_uuid and data_uuid fields of a Header

// TODO Implement PartialEq and PartialOrd to use only the uuid field for comparison.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct Versioned<T> {
uuid: UuidST,
data: T,
}

impl<T: Clone + PartialEq> Versioned<T> {
pub fn new(data: T) -> Self {
Self {
uuid: UuidST::new(),
data,
}
}

// if data is different, then increment version, else no-op.
pub fn set(&mut self, new_data: &T) -> UuidST {
if new_data != &self.data {
self.data = new_data.clone();
self.uuid.increment();
}
self.uuid.clone()
}

// readonly access
pub fn get_data(&self) -> &T {
&self.data
}

pub fn get_uuid(&self) -> &UuidST {
&self.uuid
}

// Write version into a Header structure.
pub fn init_header_uuids(&self, header: &mut Header) {
let (method_uuid, data_uuid) = self.uuid.get();
header.method_uuid = Some(method_uuid.to_string());
header.data_uuid = Some(data_uuid.to_string());
}
}
Loading

0 comments on commit cd75919

Please sign in to comment.