Skip to content

Commit

Permalink
feat(rust): update the postgres schema
Browse files Browse the repository at this point in the history
  • Loading branch information
etorreborre committed Jan 10, 2025
1 parent 0c4d9cb commit ce286dc
Show file tree
Hide file tree
Showing 21 changed files with 139 additions and 397 deletions.
97 changes: 8 additions & 89 deletions implementations/rust/ockam/ockam_api/src/cli_state/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use sysinfo::{Pid, ProcessStatus, ProcessesToUpdate, System};

use crate::cli_state::{random_name, NamedVault, Result};
use crate::cli_state::{CliState, CliStateError};
use crate::cloud::project::Project;
use crate::colors::color_primary;
use crate::config::lookup::InternetAddress;

Expand All @@ -27,17 +26,15 @@ use crate::{fmt_warn, ConnectionStatus};
/// The methods below support the creation and update of local nodes
impl CliState {
/// Create a node, with some optional associated values, and start it
#[instrument(skip_all, fields(node_name = node_name, identity_name = identity_name.clone(), project_name = project_name.clone()
))]
#[instrument(skip_all, fields(node_name = node_name, identity_name = identity_name.clone()))]
pub async fn start_node_with_optional_values(
&self,
node_name: &str,
identity_name: &Option<String>,
project_name: &Option<String>,
tcp_listener: Option<&TcpListener>,
) -> Result<NodeInfo> {
let mut node = self
.create_node_with_optional_values(node_name, identity_name, project_name)
.create_node_with_optional_identity(node_name, identity_name)
.await?;
if node.pid.is_none() {
let pid = process::id();
Expand All @@ -53,17 +50,12 @@ impl CliState {
Ok(node)
}

/// Create a node, with some optional associated values:
///
/// - an identity name. That identity is used by the `NodeManager` to create secure channels
/// - a project name. It is used to create policies on resources provisioned on a node (like a TCP outlet for example)
#[instrument(skip_all, fields(node_name = node_name, identity_name = identity_name.clone(), project_name = project_name.clone()
))]
pub async fn create_node_with_optional_values(
/// Create a node, with an optional identity name. That identity is used by the `NodeManager` to create secure channels
#[instrument(skip_all, fields(node_name = node_name, identity_name = identity_name.clone()))]
pub async fn create_node_with_optional_identity(
&self,
node_name: &str,
identity_name: &Option<String>,
project_name: &Option<String>,
) -> Result<NodeInfo> {
let identity = match identity_name {
Some(name) => self.get_named_identity(name).await?,
Expand All @@ -72,7 +64,6 @@ impl CliState {
let node = self
.create_node_with_identifier(node_name, &identity.identifier())
.await?;
self.set_node_project(node_name, project_name).await?;
Ok(node)
}

Expand Down Expand Up @@ -134,27 +125,6 @@ impl CliState {
Ok(())
}

/// This method can be used to start a local node first
/// then create a project, and associate it to the node
#[instrument(skip_all, fields(node_name = node_name, project_name = project_name.clone()))]
pub async fn set_node_project(
&self,
node_name: &str,
project_name: &Option<String>,
) -> Result<()> {
let project = match project_name {
Some(name) => Some(self.projects().get_project_by_name(name).await?),
None => self.projects().get_default_project().await.ok(),
};

if let Some(project) = project {
self.nodes_repository()
.set_node_project_name(node_name, project.name())
.await?
};
Ok(())
}

/// Remove a node:
///
/// - remove it from the repository
Expand Down Expand Up @@ -381,23 +351,6 @@ impl CliState {
}
}

/// Return the project associated to a node if there is one
#[instrument(skip_all, fields(node_name = node_name))]
pub async fn get_node_project(&self, node_name: &str) -> Result<Project> {
match self
.nodes_repository()
.get_node_project_name(node_name)
.await?
{
Some(project_name) => self.projects().get_project_by_name(&project_name).await,
None => Err(Error::new(
Origin::Api,
Kind::NotFound,
format!("there is no project associated to node {node_name}"),
))?,
}
}

/// Return the stdout log file used by a node
#[instrument(skip_all, fields(node_name = node_name))]
pub fn stdout_logs(&self, node_name: &str) -> Result<PathBuf> {
Expand Down Expand Up @@ -701,7 +654,6 @@ impl NodeInfo {
#[cfg(test)]
mod tests {
use super::*;
use crate::cloud::project::models::ProjectModel;
use crate::config::lookup::InternetAddress;
use std::net::SocketAddr;
use std::str::FromStr;
Expand Down Expand Up @@ -799,57 +751,24 @@ mod tests {
}

#[tokio::test]
async fn test_create_node_with_optional_values() -> Result<()> {
async fn test_create_node_with_optional_identity() -> Result<()> {
let cli = CliState::test().await?;

// a node can be created with just a name
let node = cli
.create_node_with_optional_values("node-1", &None, &None)
.create_node_with_optional_identity("node-1", &None)
.await?;
let result = cli.get_node(&node.name()).await?;
assert_eq!(result.name(), node.name());

// a node can be created with a name and an existing identity
let identity = cli.create_identity_with_name("name").await?;
let node = cli
.create_node_with_optional_values("node-2", &Some(identity.name()), &None)
.create_node_with_optional_identity("node-2", &Some(identity.name()))
.await?;
let result = cli.get_node(&node.name()).await?;
assert_eq!(result.identifier(), identity.identifier());

// a node can be created with a name, an existing identity and an existing project
let project = ProjectModel {
id: "project_id".to_string(),
name: "project_name".to_string(),
space_name: "1".to_string(),
access_route: "".to_string(),
users: vec![],
space_id: "1".to_string(),
identity: None,
project_change_history: None,
authority_access_route: None,
authority_identity: None,
okta_config: None,
kafka_config: None,
version: None,
running: None,
operation_id: None,
user_roles: vec![],
};
cli.projects()
.import_and_store_project(project.clone())
.await?;

let node = cli
.create_node_with_optional_values(
"node-4",
&Some(identity.name()),
&Some(project.name.clone()),
)
.await?;
let result = cli.get_node_project(&node.name()).await?;
assert_eq!(result.name(), &project.name);

Ok(())
}
}
54 changes: 29 additions & 25 deletions implementations/rust/ockam/ockam_api/src/cli_state/spaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,36 +82,40 @@ impl CliState {
mod test {
use super::*;
use crate::cloud::subscription::SubscriptionName;
use ockam_node::database::skip_if_postgres;

#[tokio::test]
async fn test_cli_spaces() -> Result<()> {
let cli = CliState::test().await?;
skip_if_postgres(|| async {
let cli = CliState::test().await?;

// the first created space becomes the default
let space1 = cli
.store_space(
"1",
"name1",
vec!["[email protected]", "[email protected]"],
Some(&Subscription::new(
SubscriptionName::Gold,
false,
None,
None,
None,
)),
)
.await?;
let result = cli.get_default_space().await?;
assert_eq!(result, space1);
// the first created space becomes the default
let space1 = cli
.store_space(
"1",
"name1",
vec!["[email protected]", "[email protected]"],
Some(&Subscription::new(
SubscriptionName::Gold,
false,
None,
None,
None,
)),
)
.await?;
let result = cli.get_default_space().await?;
assert_eq!(result, space1);

// the store method can be used to update a space
let updated_space1 = cli
.store_space("1", "name1", vec!["[email protected]"], None)
.await?;
let result = cli.get_default_space().await?;
assert_eq!(result, updated_space1);
// the store method can be used to update a space
let updated_space1 = cli
.store_space("1", "name1", vec!["[email protected]"], None)
.await?;
let result = cli.get_default_space().await?;
assert_eq!(result, updated_space1);

Ok(())
Ok(())
})
.await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,6 @@ pub trait NodesRepository: Send + Sync + 'static {

/// Unset the process id of a node
async fn set_no_node_pid(&self, node_name: &str) -> Result<()>;

/// Associate a node to a project
async fn set_node_project_name(&self, node_name: &str, project_name: &str) -> Result<()>;

/// Return the name of the project associated to a node
async fn get_node_project_name(&self, node_name: &str) -> Result<Option<String>>;
}

#[async_trait]
Expand Down Expand Up @@ -151,12 +145,4 @@ impl<T: NodesRepository> NodesRepository for AutoRetry<T> {
async fn set_no_node_pid(&self, node_name: &str) -> Result<()> {
retry!(self.wrapped.set_no_node_pid(node_name))
}

async fn set_node_project_name(&self, node_name: &str, project_name: &str) -> Result<()> {
retry!(self.wrapped.set_node_project_name(node_name, project_name))
}

async fn get_node_project_name(&self, node_name: &str) -> Result<Option<String>> {
retry!(self.wrapped.get_node_project_name(node_name))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,6 @@ impl NodesRepository for NodesSqlxDatabase {
sqlx::query("DELETE FROM tcp_outlet_status WHERE node_name = $1").bind(node_name);
query.execute(&mut *transaction).await.void()?;

let query = sqlx::query("DELETE FROM node_project WHERE node_name = $1").bind(node_name);
query.execute(&mut *transaction).await.void()?;

transaction.commit().await.void()
}

Expand Down Expand Up @@ -224,30 +221,6 @@ impl NodesRepository for NodesSqlxDatabase {
let query = query("UPDATE node SET pid=NULL WHERE name = $1").bind(node_name);
query.execute(&*self.database.pool).await.void()
}

async fn set_node_project_name(&self, node_name: &str, project_name: &str) -> Result<()> {
let query = query(
r#"
INSERT INTO node_project (node_name, project_name)
VALUES ($1, $2)
ON CONFLICT (node_name)
DO UPDATE SET project_name = $2"#,
)
.bind(node_name)
.bind(project_name);
Ok(query.execute(&*self.database.pool).await.void()?)
}

async fn get_node_project_name(&self, node_name: &str) -> Result<Option<String>> {
let query =
query("SELECT project_name FROM node_project WHERE node_name = $1").bind(node_name);
let row: Option<AnyRow> = query
.fetch_optional(&*self.database.pool)
.await
.into_core()?;
let project_name: Option<String> = row.map(|r| r.get(0));
Ok(project_name)
}
}

// Database serialization / deserialization
Expand Down Expand Up @@ -420,23 +393,6 @@ mod test {
.await
}

#[tokio::test]
async fn test_node_project() -> Result<()> {
with_dbs(|db| async move {
let repository: Arc<dyn NodesRepository> = Arc::new(NodesSqlxDatabase::new(db));

// a node can be associated to a project name
repository
.set_node_project_name("node_name", "project1")
.await?;
let result = repository.get_node_project_name("node_name").await?;
assert_eq!(result, Some("project1".into()));

Ok(())
})
.await
}

/// HELPERS
async fn create_identity() -> Result<Identifier> {
let identities = identities().await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,12 +635,12 @@ mod test {
SpacesRepository, SpacesSqlxDatabase, UsersRepository, UsersSqlxDatabase,
};
use crate::cloud::enroll::auth0::UserInfo;
use ockam_node::database::with_dbs;
use ockam_node::database::with_sqlite_dbs;
use std::sync::Arc;

#[tokio::test]
async fn test_repository() -> Result<()> {
with_dbs(|db| async move {
with_sqlite_dbs(|db| async move {
let repository: Arc<dyn UsersRepository> = Arc::new(UsersSqlxDatabase::new(db.clone()));
repository
.store_user(&UserInfo {
Expand Down Expand Up @@ -736,7 +736,7 @@ mod test {

#[tokio::test]
async fn test_store_project_space() -> Result<()> {
with_dbs(|db| async move {
with_sqlite_dbs(|db| async move {
let projects_repository: Arc<dyn ProjectsRepository> =
Arc::new(ProjectsSqlxDatabase::new(db.clone()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,13 @@ impl SubscriptionRow {
mod test {
use super::*;
use crate::cloud::subscription::SubscriptionName;
use ockam_node::database::with_dbs;
use ockam_node::database::with_sqlite_dbs;
use std::ops::Add;
use time::ext::NumericalDuration;

#[tokio::test]
async fn test_repository() -> Result<()> {
with_dbs(|db| async move {
with_sqlite_dbs(|db| async move {
let repository = SpacesSqlxDatabase::new(db);

// create and store 2 spaces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,12 @@ impl UserRow {
mod test {
use super::*;

use ockam_node::database::with_dbs;
use ockam_node::database::with_sqlite_dbs;
use std::sync::Arc;

#[tokio::test]
async fn test_repository() -> Result<()> {
with_dbs(|db| async move {
with_sqlite_dbs(|db| async move {
let repository: Arc<dyn UsersRepository> = Arc::new(UsersSqlxDatabase::new(db));

let my_email_address: EmailAddress = "[email protected]".try_into().unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ impl InMemoryNode {
.start_node_with_optional_values(
&defaults.node_name,
&Some(identity_name.to_string()),
&project_name,
Some(&tcp_listener),
)
.await
Expand Down
Loading

0 comments on commit ce286dc

Please sign in to comment.