From e8e57f3b0af30f535f07ddfc2349a27bb1632bae Mon Sep 17 00:00:00 2001 From: Leonard Goodell Date: Sun, 4 Dec 2022 19:39:39 -0700 Subject: [PATCH] refactor!: Reduce MessageBus config to have a single host BREAKING CHANGE: Configuration now only needs single broker host info closes #196 Signed-off-by: Leonard Goodell --- README.md | 8 +-- internal/pkg/mqtt/client_integration_test.go | 7 +-- internal/pkg/mqtt/client_options.go | 9 ++- internal/pkg/mqtt/client_options_test.go | 12 ++-- internal/pkg/mqtt/client_test.go | 26 ++++---- internal/pkg/nats/client_test.go | 2 +- internal/pkg/nats/options.go | 9 ++- internal/pkg/nats/options_test.go | 12 ++-- internal/pkg/redis/client.go | 61 ++++++------------- internal/pkg/redis/client_integration_test.go | 4 +- internal/pkg/redis/client_test.go | 50 +++++++-------- messaging/factory.go | 4 +- messaging/factory_nats_test.go | 6 +- messaging/factory_no_nats_test.go | 12 ++-- messaging/factory_test.go | 6 +- messaging/mqtt/configuration_test.go | 2 +- pkg/types/host_info.go | 2 +- pkg/types/types.go | 10 ++- 18 files changed, 107 insertions(+), 135 deletions(-) diff --git a/README.md b/README.md index 6ae5ee13..d251c284 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ Or programmatically in the Optional field of the MessageBusConfig struct. For ex ```go types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, + Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, Optional: map[string]string{ "ClientId": "MyClientID", "Username": "MyUser", @@ -110,7 +110,7 @@ Or programmatically in the Optional field of the MessageBusConfig struct. For ex ```go types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: "localhost", Port: 6379, Protocol: "redis"}, + Broker: types.HostInfo{Host: "localhost", Port: 6379, Protocol: "redis"}, Optional: map[string]string{ "Password": "MyPassword", }} @@ -130,7 +130,7 @@ var messageBus messaging.MessageClient var err error messageBus, err = msgFactory.NewMessageClient(types.MessageBusConfig{ - PublishHost: types.HostInfo{ + Broker: types.HostInfo{ Host: Configuration.MessageQueue.Host, Port: Configuration.MessageQueue.Port, Protocol: Configuration.MessageQueue.Protocol, @@ -167,7 +167,7 @@ This code snippet shows how to subscribe to the abstract message bus. ```go messageBus, err := factory.NewMessageClient(types.MessageBusConfig{ - SubscribeHost: types.HostInfo{ + Broker: types.HostInfo{ Host: Configuration.MessageQueue.Host, Port: Configuration.MessageQueue.Port, Protocol: Configuration.MessageQueue.Protocol, diff --git a/internal/pkg/mqtt/client_integration_test.go b/internal/pkg/mqtt/client_integration_test.go index 4f4ff25c..7ad18d92 100644 --- a/internal/pkg/mqtt/client_integration_test.go +++ b/internal/pkg/mqtt/client_integration_test.go @@ -34,10 +34,10 @@ import ( "strconv" "testing" + "github.com/edgexfoundry/go-mod-messaging/v3/internal/pkg" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/edgexfoundry/go-mod-messaging/v3/messaging/mqtt" "github.com/edgexfoundry/go-mod-messaging/v3/pkg/types" ) @@ -58,13 +58,13 @@ func TestIntegrationWithMQTTServer(t *testing.T) { port, err := strconv.ParseInt(urlMQTT.Port(), 10, 0) require.NoError(t, err, "Unable to parse the port") configOptions := types.MessageBusConfig{ - PublishHost: types.HostInfo{ + Broker: types.HostInfo{ Host: urlMQTT.Hostname(), Port: int(port), Protocol: urlMQTT.Scheme, }, Optional: map[string]string{ - mqtt.ClientId: "integration-test-client", + pkg.ClientId: "integration-test-client", }, } @@ -82,7 +82,6 @@ func TestIntegrationWithMQTTServer(t *testing.T) { err = client.Subscribe(topics, make(chan error)) require.NoError(t, err, "Failed to create subscription") expectedMessage := types.MessageEnvelope{ - Checksum: "123", CorrelationID: "456", Payload: []byte("Testing the MQTT client"), ContentType: "application/text", diff --git a/internal/pkg/mqtt/client_options.go b/internal/pkg/mqtt/client_options.go index 93c74ecc..eb210084 100644 --- a/internal/pkg/mqtt/client_options.go +++ b/internal/pkg/mqtt/client_options.go @@ -15,6 +15,7 @@ package mqtt import ( + "errors" "fmt" "math/rand" "net/url" @@ -59,12 +60,10 @@ type MQTTClientOptions struct { // CreateMQTTClientConfiguration constructs a MQTTClientConfig based on the provided MessageBusConfig. func CreateMQTTClientConfiguration(messageBusConfig types.MessageBusConfig) (MQTTClientConfig, error) { var brokerUrl string - if !messageBusConfig.PublishHost.IsHostInfoEmpty() { - brokerUrl = messageBusConfig.PublishHost.GetHostURL() - } else if !messageBusConfig.SubscribeHost.IsHostInfoEmpty() { - brokerUrl = messageBusConfig.SubscribeHost.GetHostURL() + if !messageBusConfig.Broker.IsHostInfoEmpty() { + brokerUrl = messageBusConfig.Broker.GetHostURL() } else { - return MQTTClientConfig{}, fmt.Errorf("Specified empty broker info.") + return MQTTClientConfig{}, errors.New("broker info not specified") } _, err := url.Parse(brokerUrl) diff --git a/internal/pkg/mqtt/client_options_test.go b/internal/pkg/mqtt/client_options_test.go index ca04e70c..76c77b76 100644 --- a/internal/pkg/mqtt/client_options_test.go +++ b/internal/pkg/mqtt/client_options_test.go @@ -37,7 +37,7 @@ func TestCreateMQTTClientConfiguration(t *testing.T) { { "Successfully load all configurations", args{types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, + Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, Optional: map[string]string{ pkg.Username: "TestUser", pkg.Password: "TestPassword", @@ -66,7 +66,7 @@ func TestCreateMQTTClientConfiguration(t *testing.T) { { "Does not over write host configuration with optional properties", args{types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, + Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, Optional: map[string]string{ pkg.Username: "TestUser", pkg.Password: "TestPassword", @@ -94,7 +94,7 @@ func TestCreateMQTTClientConfiguration(t *testing.T) { { "Invalid URL", args{types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: " ", Port: 999999999999, Protocol: " "}, + Broker: types.HostInfo{Host: " ", Port: 999999999999, Protocol: " "}, Optional: map[string]string{ // Other valid configurations "ClientId": "TestClientID", @@ -105,7 +105,7 @@ func TestCreateMQTTClientConfiguration(t *testing.T) { { "Invalid Int", args{types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, + Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, Optional: map[string]string{ "KeepAlive": "abc", // Other valid configurations @@ -116,7 +116,7 @@ func TestCreateMQTTClientConfiguration(t *testing.T) { { "Invalid Bool", args{types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, + Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, Optional: map[string]string{ "Retained": "abc", }}}, @@ -126,7 +126,7 @@ func TestCreateMQTTClientConfiguration(t *testing.T) { { "Unknown configuration", args{types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, + Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, Optional: map[string]string{ pkg.Username: "TestUser", pkg.Password: "TestPassword", diff --git a/internal/pkg/mqtt/client_test.go b/internal/pkg/mqtt/client_test.go index 15f442da..3bcf9794 100644 --- a/internal/pkg/mqtt/client_test.go +++ b/internal/pkg/mqtt/client_test.go @@ -102,17 +102,17 @@ var SslHostInfo = types.HostInfo{Host: "localhost", Protocol: "ssl", Port: 8883} // TestMessageBusConfig defines a simple configuration used for testing successful options parsing. var TestMessageBusConfig = types.MessageBusConfig{ - PublishHost: TcpsHostInfo, - Optional: OptionalPropertiesNoTls, + Broker: TcpsHostInfo, + Optional: OptionalPropertiesNoTls, } var TestMessageBusConfigTlsCreate = types.MessageBusConfig{ - PublishHost: TlsHostInfo, - Optional: OptionalPropertiesCertCreate, + Broker: TlsHostInfo, + Optional: OptionalPropertiesCertCreate, } var TestMessageBusConfigTlsLoad = types.MessageBusConfig{ - PublishHost: TlsHostInfo, - Optional: OptionalPropertiesCertLoad, + Broker: TlsHostInfo, + Optional: OptionalPropertiesCertLoad, } // MockToken implements Token and gives control over the information returned to the caller of the various @@ -279,7 +279,7 @@ func mockClientCreator(connect MockToken, publish MockToken, subscribe MockToken } func TestInvalidClientOptions(t *testing.T) { - invalidOptions := types.MessageBusConfig{PublishHost: types.HostInfo{ + invalidOptions := types.MessageBusConfig{Broker: types.HostInfo{ Host: " ", Port: 0, Protocol: " ", @@ -292,7 +292,7 @@ func TestInvalidClientOptions(t *testing.T) { func TestInvalidTlsOptions(t *testing.T) { options := types.MessageBusConfig{ - PublishHost: TlsHostInfo, + Broker: TlsHostInfo, } tests := []struct { name string @@ -582,8 +582,8 @@ func TestClientCreatorTLS(t *testing.T) { t.Run(test.name, func(t *testing.T) { client, _ := NewMQTTClientWithCreator( types.MessageBusConfig{ - PublishHost: test.hostConfig, - Optional: test.optionalConfig, + Broker: test.hostConfig, + Optional: test.optionalConfig, }, json.Marshal, json.Unmarshal, @@ -593,7 +593,9 @@ func TestClientCreatorTLS(t *testing.T) { err := client.Connect() // Expecting a connect error since creating mqtt client now at the beginning of the Connect() function - if err != nil && strings.Contains(err.Error(), "connect: connection refused") { + if err != nil && + (strings.Contains(err.Error(), "connect: connection refused") || // Linux + strings.Contains(err.Error(), "Unable to connect")) { // Windows err = nil } @@ -734,7 +736,7 @@ func TestClientCreatorTlsCreatorError(t *testing.T) { } func TestInvalidClientOptionsWithCreator(t *testing.T) { - invalidOptions := types.MessageBusConfig{PublishHost: types.HostInfo{ + invalidOptions := types.MessageBusConfig{Broker: types.HostInfo{ Host: " ", Port: 0, Protocol: " ", diff --git a/internal/pkg/nats/client_test.go b/internal/pkg/nats/client_test.go index 21bd2448..41072eb8 100644 --- a/internal/pkg/nats/client_test.go +++ b/internal/pkg/nats/client_test.go @@ -239,7 +239,7 @@ func TestClient_Subscribe(t *testing.T) { } func TestNewClientWithConnectionFactory(t *testing.T) { - cfg := types.MessageBusConfig{SubscribeHost: types.HostInfo{Host: "xyz", Protocol: "tcp", Port: 50}, Optional: map[string]string{}} + cfg := types.MessageBusConfig{Broker: types.HostInfo{Host: "xyz", Protocol: "tcp", Port: 50}, Optional: map[string]string{}} var connector ConnectNats t.Run("no connector", func(t *testing.T) { diff --git a/internal/pkg/nats/options.go b/internal/pkg/nats/options.go index ffa99b97..9fad77e3 100644 --- a/internal/pkg/nats/options.go +++ b/internal/pkg/nats/options.go @@ -22,6 +22,7 @@ import ( "crypto/tls" "crypto/x509" "encoding/pem" + "errors" "fmt" "net/url" "os" @@ -70,12 +71,10 @@ type ClientOptions struct { // CreateClientConfiguration constructs a ClientConfig based on the provided MessageBusConfig. func CreateClientConfiguration(messageBusConfig types.MessageBusConfig) (ClientConfig, error) { var brokerUrl string - if !messageBusConfig.PublishHost.IsHostInfoEmpty() { - brokerUrl = messageBusConfig.PublishHost.GetHostURL() - } else if !messageBusConfig.SubscribeHost.IsHostInfoEmpty() { - brokerUrl = messageBusConfig.SubscribeHost.GetHostURL() + if !messageBusConfig.Broker.IsHostInfoEmpty() { + brokerUrl = messageBusConfig.Broker.GetHostURL() } else { - return ClientConfig{}, fmt.Errorf("neither a PublishHost or a SubscribeHost has been configured.") + return ClientConfig{}, errors.New("broker information no specified") } _, err := url.Parse(brokerUrl) diff --git a/internal/pkg/nats/options_test.go b/internal/pkg/nats/options_test.go index cba30956..e68c5bcf 100644 --- a/internal/pkg/nats/options_test.go +++ b/internal/pkg/nats/options_test.go @@ -43,7 +43,7 @@ func TestCreateClientConfiguration(t *testing.T) { { "Successfully load all configurations", args{types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, + Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, Optional: map[string]string{ pkg.Username: "TestUser", pkg.Password: "TestPassword", @@ -80,7 +80,7 @@ func TestCreateClientConfiguration(t *testing.T) { { "Does not over write host configuration with optional properties", args{types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, + Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, Optional: map[string]string{ pkg.Username: "TestUser", pkg.Password: "TestPassword", @@ -106,7 +106,7 @@ func TestCreateClientConfiguration(t *testing.T) { { "Invalid URL", args{types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: " ", Port: 999999999999, Protocol: " "}, + Broker: types.HostInfo{Host: " ", Port: 999999999999, Protocol: " "}, Optional: map[string]string{ // Other valid configurations "ClientId": "TestClientID", @@ -117,7 +117,7 @@ func TestCreateClientConfiguration(t *testing.T) { { "Invalid Int", args{types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, + Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, Optional: map[string]string{ pkg.ConnectTimeout: "abc", // Other valid configurations @@ -128,7 +128,7 @@ func TestCreateClientConfiguration(t *testing.T) { { "Invalid Bool", args{types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, + Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, Optional: map[string]string{ pkg.RetryOnFailedConnect: "abc", }}}, @@ -138,7 +138,7 @@ func TestCreateClientConfiguration(t *testing.T) { { "Unknown configuration", args{types.MessageBusConfig{ - PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, + Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"}, Optional: map[string]string{ pkg.Username: "TestUser", pkg.Password: "TestPassword", diff --git a/internal/pkg/redis/client.go b/internal/pkg/redis/client.go index 8e1a8a6b..1fd9acb1 100644 --- a/internal/pkg/redis/client.go +++ b/internal/pkg/redis/client.go @@ -40,15 +40,11 @@ const ( // Client MessageClient implementation which provides functionality for sending and receiving messages using // Redis Pub/Sub. type Client struct { - // Client used for functionality related to reading messages - subscribeClient RedisClient + redisClient RedisClient // Used to avoid multiple subscriptions to the same topic existingTopics map[string]bool mapMutex *sync.Mutex - - // Client used for functionality related to sending messages - publishClient RedisClient } // NewClient creates a new Client based on the provided configuration. @@ -81,30 +77,12 @@ func NewClientWithCreator( return Client{}, err } - var publishClient, subscribeClient RedisClient + var client RedisClient // Create underlying client to use when publishing - if !messageBusConfig.PublishHost.IsHostInfoEmpty() { - publishClient, err = createRedisClient( - messageBusConfig.PublishHost.GetHostURL(), - optionalClientConfiguration, - tlsConfigurationOptions, - creator, - pairCreator, - keyLoader, - caCertCreator, - caCertLoader, - pemDecoder) - - if err != nil { - return Client{}, err - } - } - - // Create underlying client to use when subscribing - if !messageBusConfig.SubscribeHost.IsHostInfoEmpty() { - subscribeClient, err = createRedisClient( - messageBusConfig.SubscribeHost.GetHostURL(), + if !messageBusConfig.Broker.IsHostInfoEmpty() { + client, err = createRedisClient( + messageBusConfig.Broker.GetHostURL(), optionalClientConfiguration, tlsConfigurationOptions, creator, @@ -120,10 +98,9 @@ func NewClientWithCreator( } return Client{ - subscribeClient: subscribeClient, - existingTopics: make(map[string]bool), - mapMutex: new(sync.Mutex), - publishClient: publishClient, + redisClient: client, + existingTopics: make(map[string]bool), + mapMutex: new(sync.Mutex), }, nil } @@ -135,8 +112,8 @@ func (c Client) Connect() error { // Publish sends the provided message to appropriate Redis Pub/Sub. func (c Client) Publish(message types.MessageEnvelope, topic string) error { - if c.publishClient == nil { - return pkg.NewMissingConfigurationErr("PublishHostInfo", "Unable to create a connection for publishing") + if c.redisClient == nil { + return pkg.NewMissingConfigurationErr("Broker", "Unable to create a connection for publishing") } if topic == "" { @@ -146,9 +123,9 @@ func (c Client) Publish(message types.MessageEnvelope, topic string) error { topic = convertToRedisTopicScheme(topic) var err error - if err = c.publishClient.Send(topic, message); err != nil && strings.Contains(err.Error(), "EOF") { + if err = c.redisClient.Send(topic, message); err != nil && strings.Contains(err.Error(), "EOF") { // Redis may have been restarted and the first attempt will fail with EOF, so need to try again - err = c.publishClient.Send(topic, message) + err = c.redisClient.Send(topic, message) } return err @@ -157,8 +134,8 @@ func (c Client) Publish(message types.MessageEnvelope, topic string) error { // Subscribe creates background processes which reads messages from the appropriate Redis Pub/Sub and sends to the // provided channels func (c Client) Subscribe(topics []types.TopicChannel, messageErrors chan error) error { - if c.subscribeClient == nil { - return pkg.NewMissingConfigurationErr("SubscribeHostInfo", "Unable to create a connection for subscribing") + if c.redisClient == nil { + return pkg.NewMissingConfigurationErr("Broker", "Unable to create a connection for subscribing") } err := c.validateTopics(topics) @@ -175,7 +152,7 @@ func (c Client) Subscribe(topics []types.TopicChannel, messageErrors chan error) for { - message, err := c.subscribeClient.Receive(topicName) + message, err := c.redisClient.Receive(topicName) if err != nil { // This handles case when getting same repeated error due to Redis connectivity issue // Avoids starving of other threads/processes and recipient spamming the log file. @@ -204,15 +181,15 @@ func (c Client) Subscribe(topics []types.TopicChannel, messageErrors chan error) // Disconnect closes connections to the Redis server. func (c Client) Disconnect() error { var disconnectErrors []string - if c.publishClient != nil { - err := c.publishClient.Close() + if c.redisClient != nil { + err := c.redisClient.Close() if err != nil { disconnectErrors = append(disconnectErrors, fmt.Sprintf("Unable to disconnect publish client: %v", err)) } } - if c.subscribeClient != nil { - err := c.subscribeClient.Close() + if c.redisClient != nil { + err := c.redisClient.Close() if err != nil { disconnectErrors = append(disconnectErrors, fmt.Sprintf("Unable to disconnect subscribe client: %v", err)) } diff --git a/internal/pkg/redis/client_integration_test.go b/internal/pkg/redis/client_integration_test.go index 429364f2..319375db 100644 --- a/internal/pkg/redis/client_integration_test.go +++ b/internal/pkg/redis/client_integration_test.go @@ -50,13 +50,11 @@ const ( func TestRedisStreamsClientIntegration(t *testing.T) { redisHostInfo := getRedisHostInfo(t) client, err := NewClient(types.MessageBusConfig{ - PublishHost: redisHostInfo, - SubscribeHost: redisHostInfo, + Broker: redisHostInfo, }) require.NoError(t, err, "Failed to create Redis client") testMessage := types.MessageEnvelope{ - Checksum: "abcde", CorrelationID: "12345", Payload: []byte("test-message")} diff --git a/internal/pkg/redis/client_test.go b/internal/pkg/redis/client_test.go index 97c75696..a0b77a49 100644 --- a/internal/pkg/redis/client_test.go +++ b/internal/pkg/redis/client_test.go @@ -62,7 +62,7 @@ func TestNewClient(t *testing.T) { { name: "Successfully create client with optional configuration", messageBusConfig: types.MessageBusConfig{ - PublishHost: HostInfo, + Broker: HostInfo, Optional: map[string]string{ pkg.Password: "Password", }, @@ -73,7 +73,7 @@ func TestNewClient(t *testing.T) { { name: "Invalid Redis Server", messageBusConfig: types.MessageBusConfig{ - PublishHost: types.HostInfo{ + Broker: types.HostInfo{ Host: "!@#$", Port: -1, Protocol: "!@#", @@ -109,8 +109,8 @@ func TestNewClientWithCreator(t *testing.T) { wantErr bool }{ { - name: "Client with Publish Host", - messageBusConfig: types.MessageBusConfig{PublishHost: HostInfo}, + name: "Client with Publish Broker", + messageBusConfig: types.MessageBusConfig{Broker: HostInfo}, creator: mockRedisClientCreator(nil, nil), pairCreator: mockCertCreator(nil), keyLoader: mockCertLoader(nil), @@ -118,15 +118,15 @@ func TestNewClientWithCreator(t *testing.T) { }, { name: "Create publisher error", - messageBusConfig: types.MessageBusConfig{PublishHost: HostInfo}, + messageBusConfig: types.MessageBusConfig{Broker: HostInfo}, creator: mockRedisClientCreator(nil, errors.New("test error")), pairCreator: mockCertCreator(nil), keyLoader: mockCertLoader(nil), wantErr: true, }, { - name: "Client with Subscribe Host", - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo}, + name: "Client with Subscribe Broker", + messageBusConfig: types.MessageBusConfig{Broker: HostInfo}, creator: mockRedisClientCreator(nil, nil), pairCreator: mockCertCreator(nil), keyLoader: mockCertLoader(nil), @@ -134,7 +134,7 @@ func TestNewClientWithCreator(t *testing.T) { }, { name: "Client subscriber error", - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo}, + messageBusConfig: types.MessageBusConfig{Broker: HostInfo}, creator: mockRedisClientCreator(nil, errors.New("test error")), pairCreator: mockCertCreator(nil), keyLoader: mockCertLoader(nil), @@ -142,7 +142,7 @@ func TestNewClientWithCreator(t *testing.T) { }, { name: "Client with optional configuration", - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo, Optional: map[string]string{"Password": "TestPassword"}}, + messageBusConfig: types.MessageBusConfig{Broker: HostInfo, Optional: map[string]string{"Password": "TestPassword"}}, creator: mockRedisClientCreator(nil, nil), pairCreator: mockCertCreator(nil), keyLoader: mockCertLoader(nil), @@ -150,7 +150,7 @@ func TestNewClientWithCreator(t *testing.T) { }, { name: "Client with valid TLS configuration", - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo, Optional: map[string]string{ + messageBusConfig: types.MessageBusConfig{Broker: HostInfo, Optional: map[string]string{ "CertFile": "certFile", "KeyFile": "keyFile", "CertPEMBlock": "certPRMBlock", @@ -163,7 +163,7 @@ func TestNewClientWithCreator(t *testing.T) { }, { name: "Client with valid TLS configuration (cacert file)", - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo, Optional: map[string]string{ + messageBusConfig: types.MessageBusConfig{Broker: HostInfo, Optional: map[string]string{ "CaFile": "caCertFile", }}, creator: mockRedisClientCreator(nil, nil), @@ -174,7 +174,7 @@ func TestNewClientWithCreator(t *testing.T) { }, { name: "Client with valid TLS configuration (cacert PEM block)", - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo, Optional: map[string]string{ + messageBusConfig: types.MessageBusConfig{Broker: HostInfo, Optional: map[string]string{ "CaPEMBlock": "caCertPEMBlock", }}, creator: mockRedisClientCreator(nil, nil), @@ -184,7 +184,7 @@ func TestNewClientWithCreator(t *testing.T) { }, { name: "Client with invalid TLS configuration", - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo, Optional: map[string]string{ + messageBusConfig: types.MessageBusConfig{Broker: HostInfo, Optional: map[string]string{ "SkipCertVerify": "NotABool", }}, creator: mockRedisClientCreator(nil, nil), @@ -194,7 +194,7 @@ func TestNewClientWithCreator(t *testing.T) { }, { name: "Client TLS creation error", - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo, Optional: map[string]string{ + messageBusConfig: types.MessageBusConfig{Broker: HostInfo, Optional: map[string]string{ "CertFile": "certFile", "KeyFile": "keyFile", "CertPEMBlock": "certPRMBlock", @@ -207,7 +207,7 @@ func TestNewClientWithCreator(t *testing.T) { }, { name: "Client TLS creation error - cacert file not found", - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo, Optional: map[string]string{ + messageBusConfig: types.MessageBusConfig{Broker: HostInfo, Optional: map[string]string{ "CaFile": "caCertFile", }}, creator: mockRedisClientCreator(nil, nil), @@ -216,7 +216,7 @@ func TestNewClientWithCreator(t *testing.T) { }, { name: "Client TLS creation error - cacert file without PEM block", - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo, Optional: map[string]string{ + messageBusConfig: types.MessageBusConfig{Broker: HostInfo, Optional: map[string]string{ "CaFile": "caCertFile", }}, creator: mockRedisClientCreator(nil, nil), @@ -226,7 +226,7 @@ func TestNewClientWithCreator(t *testing.T) { }, { name: "Client TLS creation error - invalid cacert", - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo, Optional: map[string]string{ + messageBusConfig: types.MessageBusConfig{Broker: HostInfo, Optional: map[string]string{ "CaPEMBlock": "caCertPEMBlock", }}, creator: mockRedisClientCreator(nil, nil), @@ -330,7 +330,7 @@ func TestClient_Publish(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { c, err := NewClientWithCreator(types.MessageBusConfig{ - PublishHost: HostInfo, + Broker: HostInfo, }, tt.redisClientCreator, mockCertCreator(nil), mockCertLoader(nil), mockCaCertCreator(nil), mockCaCertLoader(nil), mockPemDecoder(&pem.Block{})) @@ -392,7 +392,7 @@ func TestClient_Subscribe(t *testing.T) { // by the Receive method is client specific. c, err := NewClientWithCreator( types.MessageBusConfig{ - SubscribeHost: HostInfo, + Broker: HostInfo, }, mockSubscriptionClientCreator(tt.numberOfMessages, tt.numberOfErrors), mockCertCreator(nil), @@ -591,7 +591,7 @@ func TestClient_Disconnect(t *testing.T) { arg: nil, ret: []interface{}{nil}, }}, nil), - messageBusConfig: types.MessageBusConfig{PublishHost: HostInfo}, + messageBusConfig: types.MessageBusConfig{Broker: HostInfo}, wantErr: false, }, { @@ -602,7 +602,7 @@ func TestClient_Disconnect(t *testing.T) { arg: nil, ret: []interface{}{errors.New("test error")}, }}, nil), - messageBusConfig: types.MessageBusConfig{PublishHost: HostInfo}, + messageBusConfig: types.MessageBusConfig{Broker: HostInfo}, wantErr: true, }, { @@ -613,7 +613,7 @@ func TestClient_Disconnect(t *testing.T) { arg: nil, ret: []interface{}{nil}, }}, nil), - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo}, + messageBusConfig: types.MessageBusConfig{Broker: HostInfo}, wantErr: false, }, { @@ -624,7 +624,7 @@ func TestClient_Disconnect(t *testing.T) { arg: nil, ret: []interface{}{errors.New("test error")}, }}, nil), - messageBusConfig: types.MessageBusConfig{SubscribeHost: HostInfo}, + messageBusConfig: types.MessageBusConfig{Broker: HostInfo}, wantErr: true, }, { @@ -635,7 +635,7 @@ func TestClient_Disconnect(t *testing.T) { arg: nil, ret: []interface{}{nil}, }}, nil), - messageBusConfig: types.MessageBusConfig{PublishHost: HostInfo, SubscribeHost: HostInfo}, + messageBusConfig: types.MessageBusConfig{Broker: HostInfo}, wantErr: false, }, { @@ -646,7 +646,7 @@ func TestClient_Disconnect(t *testing.T) { arg: nil, ret: []interface{}{errors.New("test error")}, }}, nil), - messageBusConfig: types.MessageBusConfig{PublishHost: HostInfo, SubscribeHost: HostInfo}, + messageBusConfig: types.MessageBusConfig{Broker: HostInfo}, wantErr: true, }, } diff --git a/messaging/factory.go b/messaging/factory.go index a36a30d6..86315531 100644 --- a/messaging/factory.go +++ b/messaging/factory.go @@ -48,8 +48,8 @@ const ( // the "Type" from the configuration func NewMessageClient(msgConfig types.MessageBusConfig) (MessageClient, error) { - if msgConfig.PublishHost.IsHostInfoEmpty() && msgConfig.SubscribeHost.IsHostInfoEmpty() { - return nil, fmt.Errorf("unable to create messageClient: host info not set") + if msgConfig.Broker.IsHostInfoEmpty() { + return nil, fmt.Errorf("unable to create messageClient: Broker info not set") } switch lowerMsgType := strings.ToLower(msgConfig.Type); lowerMsgType { diff --git a/messaging/factory_nats_test.go b/messaging/factory_nats_test.go index c9379c14..a0ff32be 100644 --- a/messaging/factory_nats_test.go +++ b/messaging/factory_nats_test.go @@ -29,7 +29,7 @@ import ( ) var natsConfig = types.MessageBusConfig{ - PublishHost: types.HostInfo{ + Broker: types.HostInfo{ Host: "*", Port: 4222, Protocol: "nats", @@ -39,7 +39,7 @@ var natsConfig = types.MessageBusConfig{ func TestNewMessageClientNatsCore(t *testing.T) { messageBusConfig := natsConfig messageBusConfig.Type = NatsCore - messageBusConfig.SubscribeHost = types.HostInfo{Host: uuid.NewString(), Port: 37, Protocol: "nats"} + messageBusConfig.Broker = types.HostInfo{Host: uuid.NewString(), Port: 37, Protocol: "nats"} c, err := NewMessageClient(messageBusConfig) @@ -50,7 +50,7 @@ func TestNewMessageClientNatsCore(t *testing.T) { func TestNewMessageClientNatsJetstream(t *testing.T) { messageBusConfig := natsConfig messageBusConfig.Type = NatsJetStream - messageBusConfig.SubscribeHost = types.HostInfo{Host: uuid.NewString(), Port: 37, Protocol: "nats"} + messageBusConfig.Broker = types.HostInfo{Host: uuid.NewString(), Port: 37, Protocol: "nats"} c, err := NewMessageClient(messageBusConfig) diff --git a/messaging/factory_no_nats_test.go b/messaging/factory_no_nats_test.go index 55ee5d6d..cc15384d 100644 --- a/messaging/factory_no_nats_test.go +++ b/messaging/factory_no_nats_test.go @@ -28,17 +28,17 @@ import ( ) var natsConfig = types.MessageBusConfig{ - PublishHost: types.HostInfo{ - Host: "*", - Port: 4222, - Protocol: "tcp", + Broker: types.HostInfo{ + Host: "localhost", + Port: 6379, + Protocol: "redis", }, } func TestNewMessageClientNatsCore(t *testing.T) { messageBusConfig := natsConfig messageBusConfig.Type = messaging.NatsCore - messageBusConfig.SubscribeHost = types.HostInfo{Host: uuid.NewString(), Port: 37, Protocol: "nats"} + messageBusConfig.Broker = types.HostInfo{Host: uuid.NewString(), Port: 37, Protocol: "nats"} _, err := messaging.NewMessageClient(messageBusConfig) @@ -48,7 +48,7 @@ func TestNewMessageClientNatsCore(t *testing.T) { func TestNewMessageClientNatsJetstream(t *testing.T) { messageBusConfig := natsConfig messageBusConfig.Type = messaging.NatsJetStream - messageBusConfig.SubscribeHost = types.HostInfo{Host: uuid.NewString(), Port: 37, Protocol: "nats"} + messageBusConfig.Broker = types.HostInfo{Host: uuid.NewString(), Port: 37, Protocol: "nats"} _, err := messaging.NewMessageClient(messageBusConfig) diff --git a/messaging/factory_test.go b/messaging/factory_test.go index 3deef68b..0cbaeaaa 100644 --- a/messaging/factory_test.go +++ b/messaging/factory_test.go @@ -24,7 +24,7 @@ import ( ) var msgConfig = types.MessageBusConfig{ - PublishHost: types.HostInfo{ + Broker: types.HostInfo{ Host: "localhost", Port: 6379, Protocol: "redis", @@ -64,8 +64,8 @@ func TestNewMessageClientBogusType(t *testing.T) { func TestNewMessageClientEmptyHostAndPortNumber(t *testing.T) { - msgConfig.PublishHost.Host = "" - msgConfig.PublishHost.Port = 0 + msgConfig.Broker.Host = "" + msgConfig.Broker.Port = 0 _, err := NewMessageClient(msgConfig) if assert.Error(t, err, "Expected message type error") == false { t.Fatal() diff --git a/messaging/mqtt/configuration_test.go b/messaging/mqtt/configuration_test.go index bf5c5d50..346c0592 100644 --- a/messaging/mqtt/configuration_test.go +++ b/messaging/mqtt/configuration_test.go @@ -161,7 +161,7 @@ func TestClientOptionsIntegration(t *testing.T) { Build() messageBusConfig := types.MessageBusConfig{ - PublishHost: types.HostInfo{ + Broker: types.HostInfo{ Host: host, Port: port, Protocol: protocol, diff --git a/pkg/types/host_info.go b/pkg/types/host_info.go index e1ca1daf..80c6e3e0 100644 --- a/pkg/types/host_info.go +++ b/pkg/types/host_info.go @@ -23,7 +23,7 @@ const ( ) // HostInfo is the URL information of the host as the following scheme: -// ://: +// ://: type HostInfo struct { // Host is the hostname or IP address of the messaging broker, if applicable. Host string diff --git a/pkg/types/types.go b/pkg/types/types.go index 16f64f80..1a0e5119 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -27,13 +27,11 @@ type TopicChannel struct { // MessageBusConfig defines the messaging information need to connect to the message bus // in a publish-subscribe pattern type MessageBusConfig struct { - // PublishHost contains the connection information for a publishing on MessageBus - PublishHost HostInfo - // SubscribeHost contains the connection information for a subscribing on MessageBus - SubscribeHost HostInfo + // Broker contains the connection information for publishing and subscribing to the broker for the EdgeX MessageBus + Broker HostInfo // Type indicates the message queue platform being used. eg. "redis" for Redis Pub/Sub Type string - // Optional contains all other properties of message bus that is specific to - // certain concrete implementation like MQTT's QoS, for example + // Optional contains all other properties of message bus that are specific to + // certain concrete implementations like MQTT's QoS, for example. Optional map[string]string }