Skip to content

Commit

Permalink
refactor!: Rework MessageBus configuration for all services to use co…
Browse files Browse the repository at this point in the history
…nsitantly (#410)

BREAKING CHANGE: MessageQueue renamed to MessageBus and fields changed. See v3 Migration guide.

Signed-off-by: Leonard Goodell <[email protected]>
  • Loading branch information
Lenny Goodell authored Jan 10, 2023
1 parent cd249ec commit 3599fd1
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 41 deletions.
39 changes: 24 additions & 15 deletions bootstrap/handlers/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,19 @@ func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupT
lc := container.LoggingClientFrom(dic.Get)
configuration := container.ConfigurationFrom(dic.Get)

messageQueue := configuration.GetBootstrap().MessageQueue
if len(messageQueue.Host) == 0 {
lc.Error("MessageQueue configuration not set or missing from service's GetBootstrap() implementation")
messageBus := configuration.GetBootstrap().MessageBus
if messageBus.Disabled {
lc.Info("MessageBus is disabled in configuration, skipping setup.")
return true
}

if len(messageBus.Host) == 0 {
lc.Error("MessageBus configuration not set or missing from service's GetBootstrap() implementation")
return false
}

// Make sure the MessageBus password is not leaked into the Service Config that can be retrieved via the /config endpoint
messageBusInfo := deepCopy(messageQueue)
messageBusInfo := deepCopy(messageBus)

if len(messageBusInfo.AuthMode) > 0 &&
!strings.EqualFold(strings.TrimSpace(messageBusInfo.AuthMode), boostrapMessaging.AuthModeNone) {
Expand Down Expand Up @@ -97,12 +102,12 @@ func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupT
})

lc.Infof(
"Connected to %s Message Bus @ %s://%s:%d publishing on '%s' prefix topic with AuthMode='%s'",
"Connected to %s Message Bus @ %s://%s:%d using topics [%v] with AuthMode='%s'",
messageBusInfo.Type,
messageBusInfo.Protocol,
messageBusInfo.Host,
messageBusInfo.Port,
messageBusInfo.PublishTopicPrefix,
messageBusInfo.Topics,
messageBusInfo.AuthMode)

return true
Expand All @@ -115,16 +120,20 @@ func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupT

func deepCopy(target config.MessageBusInfo) config.MessageBusInfo {
result := config.MessageBusInfo{
Type: target.Type,
Protocol: target.Protocol,
Host: target.Host,
Port: target.Port,
PublishTopicPrefix: target.PublishTopicPrefix,
SubscribeTopic: target.SubscribeTopic,
AuthMode: target.AuthMode,
SecretName: target.SecretName,
SubscribeEnabled: target.SubscribeEnabled,
Disabled: target.Disabled,
Type: target.Type,
Protocol: target.Protocol,
Host: target.Host,
Port: target.Port,
AuthMode: target.AuthMode,
SecretName: target.SecretName,
}

result.Topics = make(map[string]string)
for key, value := range target.Topics {
result.Topics[key] = value
}

result.Optional = make(map[string]string)
for key, value := range target.Optional {
result.Optional[key] = value
Expand Down
24 changes: 13 additions & 11 deletions bootstrap/handlers/messaging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ func TestMain(m *testing.M) {

func TestBootstrapHandler(t *testing.T) {
validCreateClientSecure := config.MessageBusInfo{
Type: messaging.Redis,
Protocol: "redis",
Host: "localhost",
Port: 6379,
PublishTopicPrefix: "edgex/events/#",
AuthMode: boostrapMessaging.AuthModeUsernamePassword,
SecretName: "redisdb",
Type: messaging.Redis,
Protocol: "redis",
Host: "localhost",
Port: 6379,
Topics: map[string]string{
config.MessageBusPublishTopicPrefix: "edgex/events/#",
},
AuthMode: boostrapMessaging.AuthModeUsernamePassword,
SecretName: "redisdb",
}

validCreateClientNonSecure := validCreateClientSecure
Expand All @@ -68,7 +70,7 @@ func TestBootstrapHandler(t *testing.T) {

tests := []struct {
Name string
MessageQueue config.MessageBusInfo
MessageBus config.MessageBusInfo
Secure bool
ExpectedResult bool
ExpectClient bool
Expand All @@ -82,10 +84,10 @@ func TestBootstrapHandler(t *testing.T) {
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
providerMock := &mocks.SecretProvider{}
providerMock.On("GetSecret", test.MessageQueue.SecretName).Return(usernameSecretData, nil)
providerMock.On("GetSecret", test.MessageBus.SecretName).Return(usernameSecretData, nil)
configMock := &mocks.Configuration{}
configMock.On("GetBootstrap").Return(config.BootstrapConfiguration{
MessageQueue: test.MessageQueue,
MessageBus: test.MessageBus,
})

dic.Update(di.ServiceConstructorMap{
Expand All @@ -102,7 +104,7 @@ func TestBootstrapHandler(t *testing.T) {

actual := MessagingBootstrapHandler(context.Background(), &sync.WaitGroup{}, startup.NewTimer(1, 1), dic)
assert.Equal(t, test.ExpectedResult, actual)
assert.Empty(t, test.MessageQueue.Optional)
assert.Empty(t, test.MessageBus.Optional)
if test.ExpectClient {
assert.NotNil(t, container.MessagingClientFrom(dic.Get))
} else {
Expand Down
28 changes: 13 additions & 15 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ import (
"github.com/edgexfoundry/go-mod-secrets/v3/pkg/types"
)

const DefaultHttpProtocol = "http"
const (
DefaultHttpProtocol = "http"
MessageBusPublishTopicPrefix = "PublishTopicPrefix"
MessageBusSubscribeTopic = "SubscribeTopic"
)

// ServiceInfo contains configuration settings necessary for the basic operation of any EdgeX service.
type ServiceInfo struct {
Expand Down Expand Up @@ -191,12 +195,14 @@ type BootstrapConfiguration struct {
Config ConfigProviderInfo
Registry RegistryInfo
SecretStore SecretStoreInfo
MessageQueue MessageBusInfo
MessageBus MessageBusInfo
ExternalMQTT ExternalMQTTInfo
}

// MessageBusInfo provides parameters related to connecting to a message bus as a publisher
// MessageBusInfo provides parameters related to connecting to the EdgeX MessageBus
type MessageBusInfo struct {
// Disabled indicates if the use of the EdgeX MessageBus is disabled.
Disabled bool
// Indicates the message bus implementation to use, i.e. zero, mqtt, redisstreams...
Type string
// Protocol indicates the protocol to use when accessing the message bus.
Expand All @@ -205,28 +211,20 @@ type MessageBusInfo struct {
Host string
// Port defines the port on which to access the message bus.
Port int
// PublishTopicPrefix indicates the topic prefix the data is published to.
PublishTopicPrefix string
// SubscribeTopic indicates the topic in which to subscribe.
SubscribeTopic string
// AuthMode specifies the type of secure connection to the message bus which are 'none', 'usernamepassword'
// 'clientcert' or 'cacert'. Not all option supported by each implementation.
// ZMQ doesn't support any Authmode beyond 'none', RedisStreams only supports 'none' & 'usernamepassword'
// while MQTT supports all options.
// RedisStreams only supports 'none' & 'usernamepassword' while MQTT and NATS support all options.
AuthMode string
// SecretName is the name of the secret in the SecretStore that contains the Auth Credentials. The credential are
// dynamically loaded using this name and store the Option property below where the implementation expected to
// find them.
SecretName string
// Topics allows MessageBusInfo to be more flexible with respect to topics.
Topics map[string]string
// Provides additional configuration properties which do not fit within the existing field.
// Typically the key is the name of the configuration property and the value is a string representation of the
// Typically, the key is the name of the configuration property and the value is a string representation of the
// desired value for the configuration property.
Optional map[string]string
// SubscribeEnabled indicates whether enable the subscription to the Message Queue
SubscribeEnabled bool
// Topics allows MessageBusInfo to be more flexible with respect to topics.
// TODO: move PublishTopicPrefix and SubscribeTopic to Topics in EdgeX 3.0
Topics map[string]string
}

type ExternalMQTTInfo struct {
Expand Down

0 comments on commit 3599fd1

Please sign in to comment.