Skip to content

Commit

Permalink
refactor!: Replace topics from config with new constants (#434)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Topics no longer in configuration

Signed-off-by: Leonard Goodell <[email protected]>
  • Loading branch information
Lenny Goodell authored Feb 14, 2023
1 parent c1043a3 commit 45461fa
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 98 deletions.
7 changes: 2 additions & 5 deletions bootstrap/handlers/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,8 @@ func (cb *ClientsBootstrap) BootstrapHandler(
return false
}

client, err = clientsMessaging.NewCommandClient(messageClient, serviceInfo.Topics, timeout)
if err != nil {
lc.Errorf("Unable to create messaging based Command Client: %v", err)
return false
}
baseTopic := config.GetBootstrap().MessageBus.GetBaseTopicPrefix()
client = clientsMessaging.NewCommandClient(messageClient, baseTopic, timeout)

lc.Infof("Using messaging for '%s' clients", serviceKey)
} else {
Expand Down
25 changes: 2 additions & 23 deletions bootstrap/handlers/clients_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,6 @@ func TestClientsBootstrapHandler(t *testing.T) {

commandMessagingClientInfo := config.ClientInfo{
UseMessageBus: true,
Topics: map[string]string{
"QueryRequestTopic": "queryRequest",
"QueryResponseTopic": "queryResponse",
"CommandRequestTopicPrefix": "commandRequest",
"CommandResponseTopic": "commandResponse",
},
}

notificationClientInfo := config.ClientInfo{
Expand Down Expand Up @@ -297,22 +291,13 @@ func TestCommandMessagingClientErrors(t *testing.T) {
validDuration := "30s"
invalidDuration := "xyz"

validTopics := map[string]string{
"QueryRequestTopic": "queryRequest",
"QueryResponseTopic": "queryResponse",
"CommandRequestTopicPrefix": "commandRequest",
"CommandResponseTopic": "commandResponse",
}

tests := []struct {
Name string
MessagingClientPresent bool
TimeoutDuration string
SubscribeError bool
}{
{"Missing Messaging Client", false, validDuration, false},
{"Error creating Messaging Client", true, validDuration, true},
{"Bad Timeout duration", true, invalidDuration, false},
{"Missing Messaging Client", false, validDuration},
{"Bad Timeout duration", true, invalidDuration},
}

for _, test := range tests {
Expand All @@ -322,16 +307,10 @@ func TestCommandMessagingClientErrors(t *testing.T) {
mockLogger.On("Errorf", mock.Anything, mock.Anything)

mockMessaging := &messagingMocks.MessageClient{}
if test.SubscribeError {
mockMessaging.On("Subscribe", mock.Anything, mock.Anything).Return(errors.New("failed"))
} else {
mockMessaging.On("Subscribe", mock.Anything, mock.Anything).Return(nil)
}

clients := make(map[string]config.ClientInfo)
clients[common.CoreCommandServiceKey] = config.ClientInfo{
UseMessageBus: true,
Topics: validTopics,
}

bootstrapConfig := config.BootstrapConfiguration{
Expand Down
23 changes: 9 additions & 14 deletions bootstrap/handlers/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,11 @@ func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupT
})

lc.Infof(
"Connected to %s Message Bus @ %s://%s:%d using topics [%v] with AuthMode='%s'",
"Connected to %s Message Bus @ %s://%s:%d with AuthMode='%s'",
messageBusInfo.Type,
messageBusInfo.Protocol,
messageBusInfo.Host,
messageBusInfo.Port,
messageBusInfo.Topics,
messageBusInfo.AuthMode)

return true
Expand All @@ -120,18 +119,14 @@ func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupT

func deepCopy(target config.MessageBusInfo) config.MessageBusInfo {
result := config.MessageBusInfo{
Disabled: target.Disabled,
Type: target.Type,
Protocol: target.Protocol,
Host: target.Host,
Port: target.Port,
AuthMode: target.AuthMode,
SecretName: target.SecretName,
}

result.Topics = make(map[string]string)
for key, value := range target.Topics {
result.Topics[key] = value
Disabled: target.Disabled,
Type: target.Type,
Protocol: target.Protocol,
Host: target.Host,
Port: target.Port,
AuthMode: target.AuthMode,
SecretName: target.SecretName,
BaseTopicPrefix: target.BaseTopicPrefix,
}

result.Optional = make(map[string]string)
Expand Down
11 changes: 4 additions & 7 deletions bootstrap/handlers/messaging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,10 @@ func TestMain(m *testing.M) {

func TestBootstrapHandler(t *testing.T) {
validCreateClientSecure := config.MessageBusInfo{
Type: messaging.Redis,
Protocol: "redis",
Host: "localhost",
Port: 6379,
Topics: map[string]string{
config.MessageBusPublishTopicPrefix: "edgex/events/#",
},
Type: messaging.Redis,
Protocol: "redis",
Host: "localhost",
Port: 6379,
AuthMode: boostrapMessaging.AuthModeUsernamePassword,
SecretName: "redisdb",
}
Expand Down
3 changes: 2 additions & 1 deletion bootstrap/handlers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func (s *ServiceMetrics) BootstrapHandler(ctx context.Context, wg *sync.WaitGrou
interval = math.MaxInt64
}

reporter := metrics.NewMessageBusReporter(lc, s.serviceName, dic, telemetryConfig)
baseTopic := serviceConfig.GetBootstrap().MessageBus.GetBaseTopicPrefix()
reporter := metrics.NewMessageBusReporter(lc, baseTopic, s.serviceName, dic, telemetryConfig)
manager := metrics.NewManager(lc, interval, reporter)

manager.Run(ctx, wg)
Expand Down
8 changes: 4 additions & 4 deletions bootstrap/handlers/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func TestServiceMetrics_BootstrapHandler(t *testing.T) {

mockMessagingClient := &mocks.MessageClient{}
mockConfiguration := &mocks2.Configuration{}
mockConfiguration.On("GetBootstrap").Return(config.BootstrapConfiguration{})

dic := di.NewContainer(di.ServiceConstructorMap{
container.LoggingClientInterfaceName: func(get di.Get) interface{} {
Expand All @@ -64,10 +65,9 @@ func TestServiceMetrics_BootstrapHandler(t *testing.T) {
})

expectedTelemetryInfo := config.TelemetryInfo{
Interval: test.Interval,
PublishTopicPrefix: "/metrics",
Metrics: make(map[string]bool),
Tags: make(map[string]string),
Interval: test.Interval,
Metrics: make(map[string]bool),
Tags: make(map[string]string),
}

mockConfiguration.On("GetTelemetryInfo").Return(&expectedTelemetryInfo)
Expand Down
30 changes: 14 additions & 16 deletions bootstrap/metrics/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,22 @@ const (
)

type messageBusReporter struct {
lc logger.LoggingClient
serviceName string
dic *di.Container
messageClient messaging.MessageClient
config *config.TelemetryInfo
lc logger.LoggingClient
serviceName string
dic *di.Container
messageClient messaging.MessageClient
config *config.TelemetryInfo
baseMetricsTopic string
}

// NewMessageBusReporter creates a new MessageBus reporter which reports metrics to the EdgeX MessageBus
func NewMessageBusReporter(lc logger.LoggingClient, serviceName string, dic *di.Container, config *config.TelemetryInfo) interfaces.MetricsReporter {
func NewMessageBusReporter(lc logger.LoggingClient, baseTopic string, serviceName string, dic *di.Container, config *config.TelemetryInfo) interfaces.MetricsReporter {
reporter := &messageBusReporter{
lc: lc,
serviceName: serviceName,
dic: dic,
config: config,
lc: lc,
serviceName: serviceName,
dic: dic,
config: config,
baseMetricsTopic: common.BuildTopic(baseTopic, common.MetricsPublishTopic, serviceName),
}

return reporter
Expand Down Expand Up @@ -181,7 +183,7 @@ func (r *messageBusReporter) Report(registry gometrics.Registry, metricTags map[
ContentType: common.ContentTypeJSON,
}

topic := fmt.Sprintf("%s/%s", r.baseTopic(), name)
topic := common.BuildTopic(r.baseMetricsTopic, name)
if err := r.messageClient.Publish(message, topic); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to publish metric '%s' to topic '%s': %s", name, topic, err.Error()))
return
Expand All @@ -190,15 +192,11 @@ func (r *messageBusReporter) Report(registry gometrics.Registry, metricTags map[
}
})

r.lc.Debugf("Publish %d metrics to the '%s' base topic", publishedCount, r.baseTopic())
r.lc.Debugf("Publish %d metrics to the '%s' base topic", publishedCount, r.baseMetricsTopic)

return errs
}

func (r *messageBusReporter) baseTopic() string {
return fmt.Sprintf("%s/%s", r.config.PublishTopicPrefix, r.serviceName)
}

func buildMetricTags(tags map[string]string) []dtos.MetricTag {
var metricTags []dtos.MetricTag

Expand Down
20 changes: 8 additions & 12 deletions bootstrap/metrics/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package metrics

import (
"encoding/json"
"fmt"
"testing"

"github.com/edgexfoundry/go-mod-core-contracts/v3/common"
gometrics "github.com/rcrowley/go-metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand All @@ -37,12 +37,10 @@ import (

func TestNewMessageBusReporter(t *testing.T) {
expectedServiceName := "test-service"
baseTopic := "metrics"
expectedBaseTopic := "metrics/test-service"
expectedBaseTopic := common.BuildTopic(common.DefaultBaseTopic, common.MetricsPublishTopic, expectedServiceName)

expectedTelemetryConfig := &config.TelemetryInfo{
Interval: "30s",
PublishTopicPrefix: baseTopic,
Interval: "30s",
Metrics: map[string]bool{
"MyMetric": true,
},
Expand Down Expand Up @@ -73,12 +71,12 @@ func TestNewMessageBusReporter(t *testing.T) {

for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
r := NewMessageBusReporter(logger.NewMockClient(), test.ExpectedServiceName, nil, expectedTelemetryConfig)
r := NewMessageBusReporter(logger.NewMockClient(), common.DefaultBaseTopic, test.ExpectedServiceName, nil, expectedTelemetryConfig)
actual := r.(*messageBusReporter)
assert.NotNil(t, actual)
assert.Equal(t, expectedServiceName, actual.serviceName)
assert.Equal(t, expectedTelemetryConfig, actual.config)
assert.Equal(t, expectedBaseTopic, actual.baseTopic())
assert.Equal(t, expectedBaseTopic, actual.baseMetricsTopic)
})
}

Expand All @@ -88,12 +86,10 @@ func TestMessageBusReporter_Report(t *testing.T) {
expectedServiceName := "test-service"
expectedMetricName := "test-metric"
unexpectedMetricName := "disabled-metric"
baseTopic := "metrics"
expectedTopic := fmt.Sprintf("%s/%s/%s", baseTopic, expectedServiceName, expectedMetricName)
expectedTopic := common.BuildTopic(common.DefaultBaseTopic, common.MetricsPublishTopic, expectedServiceName, expectedMetricName)

expectedTelemetryConfig := &config.TelemetryInfo{
Interval: "30s",
PublishTopicPrefix: baseTopic,
Interval: "30s",
Metrics: map[string]bool{
expectedMetricName: true,
unexpectedMetricName: false,
Expand Down Expand Up @@ -222,7 +218,7 @@ func TestMessageBusReporter_Report(t *testing.T) {
},
})

target := NewMessageBusReporter(logger.NewMockClient(), expectedServiceName, dic, expectedTelemetryConfig)
target := NewMessageBusReporter(logger.NewMockClient(), common.DefaultBaseTopic, expectedServiceName, dic, expectedTelemetryConfig)

if test.Metric != nil {
err = reg.Register(expectedMetricName, test.Metric)
Expand Down
22 changes: 12 additions & 10 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import (
)

const (
DefaultHttpProtocol = "http"
MessageBusPublishTopicPrefix = "PublishTopicPrefix"
MessageBusSubscribeTopic = "SubscribeTopic"
DefaultHttpProtocol = "http"
)

const (
Expand Down Expand Up @@ -138,8 +136,6 @@ type ClientInfo struct {
Protocol string
// UseMessageBus indicates weather to use Messaging version of client
UseMessageBus bool
// Topics holds the MessageBus topics used by the client to communicate to the service
Topics map[string]string
}

func (c ClientInfo) Url() string {
Expand Down Expand Up @@ -258,14 +254,23 @@ type MessageBusInfo struct {
// dynamically loaded using this name and store the Option property below where the implementation expected to
// find them.
SecretName string
// Topics allows MessageBusInfo to be more flexible with respect to topics.
Topics map[string]string
// BaseTopicPrefix is the base topic prefix that all topics start with.
// If not set the DefaultBaseTopic constant is used.
BaseTopicPrefix string
// Provides additional configuration properties which do not fit within the existing field.
// Typically, the key is the name of the configuration property and the value is a string representation of the
// desired value for the configuration property.
Optional map[string]string
}

func (m MessageBusInfo) GetBaseTopicPrefix() string {
if len(m.BaseTopicPrefix) == 0 {
return common.DefaultBaseTopic
}

return m.BaseTopicPrefix
}

type ExternalMQTTInfo struct {
// Url contains the fully qualified URL to connect to the MQTT broker
Url string
Expand Down Expand Up @@ -312,9 +317,6 @@ func (p MessageBusInfo) URL() string {
type TelemetryInfo struct {
// Interval is the time duration in which to collect and report the service's metrics
Interval string
// PublishTopicPrefix is the base topic in which to publish (report) the service's metrics to the EdgeX MessageBus
// The service name and the metric name are appended to this base topic. i.e. <prefix>/<service-name>/<metric-name>
PublishTopicPrefix string
// Metrics is the list of service's metrics that can be collected. Each of the service's metrics must be in the list
// and set to true if enable or false if disabled.
Metrics map[string]bool
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ go 1.18
require (
github.com/eclipse/paho.mqtt.golang v1.4.2
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.3
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.11
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.6
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.14
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.9
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3
github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.7
github.com/google/uuid v1.3.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6r
github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.3 h1:0Ew4PzLSFJ+sb7AYtvb9m1mRN45Sh0ELU1HdMCel5t8=
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.3/go.mod h1:ESOWI4GokQfQ3Bn2hGsdfOVx5idj7QEdCPT/SAQDd9M=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.11 h1:B3ZK8APdjxD3+D0JDg/9Cb5KRE8tT/xaXxwrZ6Y46DE=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.11/go.mod h1:4lpZUM54ZareGU/yuAJvLEw0BoJ43SvCj1LO+gsKm9c=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.6 h1:JaCP/iw7ahuBCCLuZG9Z2JDDRgQa9V+lZ6ZHZtSb+yQ=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.6/go.mod h1:1Vtp3Zwsie1ODeF2CjHbp6Vhgjmx4URyCQ4rJHQg89I=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.14 h1:o7CFEIyKn/quin5lrAlUbUu9x1dnecK0tZs5waLhdCc=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.14/go.mod h1:4lpZUM54ZareGU/yuAJvLEw0BoJ43SvCj1LO+gsKm9c=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.9 h1:CUUieXQ8roD4M770GXj1he707V3V9Jiygk302+dwvKk=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.9/go.mod h1:iKBxmZkc7jdOrT99+IR1nyg7PlRgooAQMhZxDh2mTUQ=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3 h1:QgZF9f70Cwpvkjw3tP1aiVGHc+yNFJNzW6hO8pDs3fg=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3/go.mod h1:2w8v0sv+i21nY+DY6JV4PFxsNTuxpGAjlNFlFMTfZkk=
github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.7 h1:9Mn389IHlgoPgGUpwnLzoe/shylkQI+nXI3E/AfnYDA=
Expand Down

0 comments on commit 45461fa

Please sign in to comment.