Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: Reduce MessageBus config to have a single host #200

Merged
merged 1 commit into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Or programmatically in the Optional field of the MessageBusConfig struct. For ex

```go
types.MessageBusConfig{
PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Optional: map[string]string{
"ClientId": "MyClientID",
"Username": "MyUser",
Expand Down Expand Up @@ -110,7 +110,7 @@ Or programmatically in the Optional field of the MessageBusConfig struct. For ex

```go
types.MessageBusConfig{
PublishHost: types.HostInfo{Host: "localhost", Port: 6379, Protocol: "redis"},
Broker: types.HostInfo{Host: "localhost", Port: 6379, Protocol: "redis"},
Optional: map[string]string{
"Password": "MyPassword",
}}
Expand All @@ -130,7 +130,7 @@ var messageBus messaging.MessageClient

var err error
messageBus, err = msgFactory.NewMessageClient(types.MessageBusConfig{
PublishHost: types.HostInfo{
Broker: types.HostInfo{
Host: Configuration.MessageQueue.Host,
Port: Configuration.MessageQueue.Port,
Protocol: Configuration.MessageQueue.Protocol,
Expand Down Expand Up @@ -167,7 +167,7 @@ This code snippet shows how to subscribe to the abstract message bus.

```go
messageBus, err := factory.NewMessageClient(types.MessageBusConfig{
SubscribeHost: types.HostInfo{
Broker: types.HostInfo{
Host: Configuration.MessageQueue.Host,
Port: Configuration.MessageQueue.Port,
Protocol: Configuration.MessageQueue.Protocol,
Expand Down
7 changes: 3 additions & 4 deletions internal/pkg/mqtt/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (
"strconv"
"testing"

"github.com/edgexfoundry/go-mod-messaging/v3/internal/pkg"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/edgexfoundry/go-mod-messaging/v3/messaging/mqtt"
"github.com/edgexfoundry/go-mod-messaging/v3/pkg/types"
)

Expand All @@ -58,13 +58,13 @@ func TestIntegrationWithMQTTServer(t *testing.T) {
port, err := strconv.ParseInt(urlMQTT.Port(), 10, 0)
require.NoError(t, err, "Unable to parse the port")
configOptions := types.MessageBusConfig{
PublishHost: types.HostInfo{
Broker: types.HostInfo{
Host: urlMQTT.Hostname(),
Port: int(port),
Protocol: urlMQTT.Scheme,
},
Optional: map[string]string{
mqtt.ClientId: "integration-test-client",
pkg.ClientId: "integration-test-client",
},
}

Expand All @@ -82,7 +82,6 @@ func TestIntegrationWithMQTTServer(t *testing.T) {
err = client.Subscribe(topics, make(chan error))
require.NoError(t, err, "Failed to create subscription")
expectedMessage := types.MessageEnvelope{
Checksum: "123",
CorrelationID: "456",
Payload: []byte("Testing the MQTT client"),
ContentType: "application/text",
Expand Down
9 changes: 4 additions & 5 deletions internal/pkg/mqtt/client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package mqtt

import (
"errors"
"fmt"
"math/rand"
"net/url"
Expand Down Expand Up @@ -59,12 +60,10 @@ type MQTTClientOptions struct {
// CreateMQTTClientConfiguration constructs a MQTTClientConfig based on the provided MessageBusConfig.
func CreateMQTTClientConfiguration(messageBusConfig types.MessageBusConfig) (MQTTClientConfig, error) {
var brokerUrl string
if !messageBusConfig.PublishHost.IsHostInfoEmpty() {
brokerUrl = messageBusConfig.PublishHost.GetHostURL()
} else if !messageBusConfig.SubscribeHost.IsHostInfoEmpty() {
brokerUrl = messageBusConfig.SubscribeHost.GetHostURL()
if !messageBusConfig.Broker.IsHostInfoEmpty() {
brokerUrl = messageBusConfig.Broker.GetHostURL()
} else {
return MQTTClientConfig{}, fmt.Errorf("Specified empty broker info.")
return MQTTClientConfig{}, errors.New("broker info not specified")
}

_, err := url.Parse(brokerUrl)
Expand Down
12 changes: 6 additions & 6 deletions internal/pkg/mqtt/client_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestCreateMQTTClientConfiguration(t *testing.T) {
{
"Successfully load all configurations",
args{types.MessageBusConfig{
PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Optional: map[string]string{
pkg.Username: "TestUser",
pkg.Password: "TestPassword",
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestCreateMQTTClientConfiguration(t *testing.T) {
{
"Does not over write host configuration with optional properties",
args{types.MessageBusConfig{
PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Optional: map[string]string{
pkg.Username: "TestUser",
pkg.Password: "TestPassword",
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestCreateMQTTClientConfiguration(t *testing.T) {
{
"Invalid URL",
args{types.MessageBusConfig{
PublishHost: types.HostInfo{Host: " ", Port: 999999999999, Protocol: " "},
Broker: types.HostInfo{Host: " ", Port: 999999999999, Protocol: " "},
Optional: map[string]string{
// Other valid configurations
"ClientId": "TestClientID",
Expand All @@ -105,7 +105,7 @@ func TestCreateMQTTClientConfiguration(t *testing.T) {
{
"Invalid Int",
args{types.MessageBusConfig{
PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Optional: map[string]string{
"KeepAlive": "abc",
// Other valid configurations
Expand All @@ -116,7 +116,7 @@ func TestCreateMQTTClientConfiguration(t *testing.T) {
{
"Invalid Bool",
args{types.MessageBusConfig{
PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Optional: map[string]string{
"Retained": "abc",
}}},
Expand All @@ -126,7 +126,7 @@ func TestCreateMQTTClientConfiguration(t *testing.T) {
{
"Unknown configuration",
args{types.MessageBusConfig{
PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Optional: map[string]string{
pkg.Username: "TestUser",
pkg.Password: "TestPassword",
Expand Down
26 changes: 14 additions & 12 deletions internal/pkg/mqtt/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,17 @@ var SslHostInfo = types.HostInfo{Host: "localhost", Protocol: "ssl", Port: 8883}

// TestMessageBusConfig defines a simple configuration used for testing successful options parsing.
var TestMessageBusConfig = types.MessageBusConfig{
PublishHost: TcpsHostInfo,
Optional: OptionalPropertiesNoTls,
Broker: TcpsHostInfo,
Optional: OptionalPropertiesNoTls,
}
var TestMessageBusConfigTlsCreate = types.MessageBusConfig{
PublishHost: TlsHostInfo,
Optional: OptionalPropertiesCertCreate,
Broker: TlsHostInfo,
Optional: OptionalPropertiesCertCreate,
}

var TestMessageBusConfigTlsLoad = types.MessageBusConfig{
PublishHost: TlsHostInfo,
Optional: OptionalPropertiesCertLoad,
Broker: TlsHostInfo,
Optional: OptionalPropertiesCertLoad,
}

// MockToken implements Token and gives control over the information returned to the caller of the various
Expand Down Expand Up @@ -279,7 +279,7 @@ func mockClientCreator(connect MockToken, publish MockToken, subscribe MockToken
}

func TestInvalidClientOptions(t *testing.T) {
invalidOptions := types.MessageBusConfig{PublishHost: types.HostInfo{
invalidOptions := types.MessageBusConfig{Broker: types.HostInfo{
Host: " ",
Port: 0,
Protocol: " ",
Expand All @@ -292,7 +292,7 @@ func TestInvalidClientOptions(t *testing.T) {

func TestInvalidTlsOptions(t *testing.T) {
options := types.MessageBusConfig{
PublishHost: TlsHostInfo,
Broker: TlsHostInfo,
}
tests := []struct {
name string
Expand Down Expand Up @@ -582,8 +582,8 @@ func TestClientCreatorTLS(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
client, _ := NewMQTTClientWithCreator(
types.MessageBusConfig{
PublishHost: test.hostConfig,
Optional: test.optionalConfig,
Broker: test.hostConfig,
Optional: test.optionalConfig,
},
json.Marshal,
json.Unmarshal,
Expand All @@ -593,7 +593,9 @@ func TestClientCreatorTLS(t *testing.T) {
err := client.Connect()

// Expecting a connect error since creating mqtt client now at the beginning of the Connect() function
if err != nil && strings.Contains(err.Error(), "connect: connection refused") {
if err != nil &&
(strings.Contains(err.Error(), "connect: connection refused") || // Linux
strings.Contains(err.Error(), "Unable to connect")) { // Windows
err = nil
}

Expand Down Expand Up @@ -734,7 +736,7 @@ func TestClientCreatorTlsCreatorError(t *testing.T) {
}

func TestInvalidClientOptionsWithCreator(t *testing.T) {
invalidOptions := types.MessageBusConfig{PublishHost: types.HostInfo{
invalidOptions := types.MessageBusConfig{Broker: types.HostInfo{
Host: " ",
Port: 0,
Protocol: " ",
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/nats/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func TestClient_Subscribe(t *testing.T) {
}

func TestNewClientWithConnectionFactory(t *testing.T) {
cfg := types.MessageBusConfig{SubscribeHost: types.HostInfo{Host: "xyz", Protocol: "tcp", Port: 50}, Optional: map[string]string{}}
cfg := types.MessageBusConfig{Broker: types.HostInfo{Host: "xyz", Protocol: "tcp", Port: 50}, Optional: map[string]string{}}
var connector ConnectNats

t.Run("no connector", func(t *testing.T) {
Expand Down
9 changes: 4 additions & 5 deletions internal/pkg/nats/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"net/url"
"os"
Expand Down Expand Up @@ -70,12 +71,10 @@ type ClientOptions struct {
// CreateClientConfiguration constructs a ClientConfig based on the provided MessageBusConfig.
func CreateClientConfiguration(messageBusConfig types.MessageBusConfig) (ClientConfig, error) {
var brokerUrl string
if !messageBusConfig.PublishHost.IsHostInfoEmpty() {
brokerUrl = messageBusConfig.PublishHost.GetHostURL()
} else if !messageBusConfig.SubscribeHost.IsHostInfoEmpty() {
brokerUrl = messageBusConfig.SubscribeHost.GetHostURL()
if !messageBusConfig.Broker.IsHostInfoEmpty() {
brokerUrl = messageBusConfig.Broker.GetHostURL()
} else {
return ClientConfig{}, fmt.Errorf("neither a PublishHost or a SubscribeHost has been configured.")
return ClientConfig{}, errors.New("broker information no specified")
}

_, err := url.Parse(brokerUrl)
Expand Down
12 changes: 6 additions & 6 deletions internal/pkg/nats/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestCreateClientConfiguration(t *testing.T) {
{
"Successfully load all configurations",
args{types.MessageBusConfig{
PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Optional: map[string]string{
pkg.Username: "TestUser",
pkg.Password: "TestPassword",
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestCreateClientConfiguration(t *testing.T) {
{
"Does not over write host configuration with optional properties",
args{types.MessageBusConfig{
PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Optional: map[string]string{
pkg.Username: "TestUser",
pkg.Password: "TestPassword",
Expand All @@ -106,7 +106,7 @@ func TestCreateClientConfiguration(t *testing.T) {
{
"Invalid URL",
args{types.MessageBusConfig{
PublishHost: types.HostInfo{Host: " ", Port: 999999999999, Protocol: " "},
Broker: types.HostInfo{Host: " ", Port: 999999999999, Protocol: " "},
Optional: map[string]string{
// Other valid configurations
"ClientId": "TestClientID",
Expand All @@ -117,7 +117,7 @@ func TestCreateClientConfiguration(t *testing.T) {
{
"Invalid Int",
args{types.MessageBusConfig{
PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Optional: map[string]string{
pkg.ConnectTimeout: "abc",
// Other valid configurations
Expand All @@ -128,7 +128,7 @@ func TestCreateClientConfiguration(t *testing.T) {
{
"Invalid Bool",
args{types.MessageBusConfig{
PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Optional: map[string]string{
pkg.RetryOnFailedConnect: "abc",
}}},
Expand All @@ -138,7 +138,7 @@ func TestCreateClientConfiguration(t *testing.T) {
{
"Unknown configuration",
args{types.MessageBusConfig{
PublishHost: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Optional: map[string]string{
pkg.Username: "TestUser",
pkg.Password: "TestPassword",
Expand Down
Loading