diff --git a/bootstrap/handlers/clients.go b/bootstrap/handlers/clients.go index babdfa67..4dc49c89 100644 --- a/bootstrap/handlers/clients.go +++ b/bootstrap/handlers/clients.go @@ -110,11 +110,8 @@ func (cb *ClientsBootstrap) BootstrapHandler( return false } - client, err = clientsMessaging.NewCommandClient(messageClient, serviceInfo.Topics, timeout) - if err != nil { - lc.Errorf("Unable to create messaging based Command Client: %v", err) - return false - } + baseTopic := config.GetBootstrap().MessageBus.GetBaseTopicPrefix() + client = clientsMessaging.NewCommandClient(messageClient, baseTopic, timeout) lc.Infof("Using messaging for '%s' clients", serviceKey) } else { diff --git a/bootstrap/handlers/clients_test.go b/bootstrap/handlers/clients_test.go index 53e7d969..6ecf07ec 100644 --- a/bootstrap/handlers/clients_test.go +++ b/bootstrap/handlers/clients_test.go @@ -63,12 +63,6 @@ func TestClientsBootstrapHandler(t *testing.T) { commandMessagingClientInfo := config.ClientInfo{ UseMessageBus: true, - Topics: map[string]string{ - "QueryRequestTopic": "queryRequest", - "QueryResponseTopic": "queryResponse", - "CommandRequestTopicPrefix": "commandRequest", - "CommandResponseTopic": "commandResponse", - }, } notificationClientInfo := config.ClientInfo{ @@ -297,22 +291,13 @@ func TestCommandMessagingClientErrors(t *testing.T) { validDuration := "30s" invalidDuration := "xyz" - validTopics := map[string]string{ - "QueryRequestTopic": "queryRequest", - "QueryResponseTopic": "queryResponse", - "CommandRequestTopicPrefix": "commandRequest", - "CommandResponseTopic": "commandResponse", - } - tests := []struct { Name string MessagingClientPresent bool TimeoutDuration string - SubscribeError bool }{ - {"Missing Messaging Client", false, validDuration, false}, - {"Error creating Messaging Client", true, validDuration, true}, - {"Bad Timeout duration", true, invalidDuration, false}, + {"Missing Messaging Client", false, validDuration}, + {"Bad Timeout duration", true, invalidDuration}, } for _, test := range tests { @@ -322,16 +307,10 @@ func TestCommandMessagingClientErrors(t *testing.T) { mockLogger.On("Errorf", mock.Anything, mock.Anything) mockMessaging := &messagingMocks.MessageClient{} - if test.SubscribeError { - mockMessaging.On("Subscribe", mock.Anything, mock.Anything).Return(errors.New("failed")) - } else { - mockMessaging.On("Subscribe", mock.Anything, mock.Anything).Return(nil) - } clients := make(map[string]config.ClientInfo) clients[common.CoreCommandServiceKey] = config.ClientInfo{ UseMessageBus: true, - Topics: validTopics, } bootstrapConfig := config.BootstrapConfiguration{ diff --git a/bootstrap/handlers/messaging.go b/bootstrap/handlers/messaging.go index 1fc4ec28..db2182e9 100644 --- a/bootstrap/handlers/messaging.go +++ b/bootstrap/handlers/messaging.go @@ -102,12 +102,11 @@ func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupT }) lc.Infof( - "Connected to %s Message Bus @ %s://%s:%d using topics [%v] with AuthMode='%s'", + "Connected to %s Message Bus @ %s://%s:%d with AuthMode='%s'", messageBusInfo.Type, messageBusInfo.Protocol, messageBusInfo.Host, messageBusInfo.Port, - messageBusInfo.Topics, messageBusInfo.AuthMode) return true @@ -120,18 +119,14 @@ func MessagingBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupT func deepCopy(target config.MessageBusInfo) config.MessageBusInfo { result := config.MessageBusInfo{ - Disabled: target.Disabled, - Type: target.Type, - Protocol: target.Protocol, - Host: target.Host, - Port: target.Port, - AuthMode: target.AuthMode, - SecretName: target.SecretName, - } - - result.Topics = make(map[string]string) - for key, value := range target.Topics { - result.Topics[key] = value + Disabled: target.Disabled, + Type: target.Type, + Protocol: target.Protocol, + Host: target.Host, + Port: target.Port, + AuthMode: target.AuthMode, + SecretName: target.SecretName, + BaseTopicPrefix: target.BaseTopicPrefix, } result.Optional = make(map[string]string) diff --git a/bootstrap/handlers/messaging_test.go b/bootstrap/handlers/messaging_test.go index 198e9597..7dbea1e1 100644 --- a/bootstrap/handlers/messaging_test.go +++ b/bootstrap/handlers/messaging_test.go @@ -40,13 +40,10 @@ func TestMain(m *testing.M) { func TestBootstrapHandler(t *testing.T) { validCreateClientSecure := config.MessageBusInfo{ - Type: messaging.Redis, - Protocol: "redis", - Host: "localhost", - Port: 6379, - Topics: map[string]string{ - config.MessageBusPublishTopicPrefix: "edgex/events/#", - }, + Type: messaging.Redis, + Protocol: "redis", + Host: "localhost", + Port: 6379, AuthMode: boostrapMessaging.AuthModeUsernamePassword, SecretName: "redisdb", } diff --git a/bootstrap/handlers/metrics.go b/bootstrap/handlers/metrics.go index bed4f424..fb7caa15 100644 --- a/bootstrap/handlers/metrics.go +++ b/bootstrap/handlers/metrics.go @@ -65,7 +65,8 @@ func (s *ServiceMetrics) BootstrapHandler(ctx context.Context, wg *sync.WaitGrou interval = math.MaxInt64 } - reporter := metrics.NewMessageBusReporter(lc, s.serviceName, dic, telemetryConfig) + baseTopic := serviceConfig.GetBootstrap().MessageBus.GetBaseTopicPrefix() + reporter := metrics.NewMessageBusReporter(lc, baseTopic, s.serviceName, dic, telemetryConfig) manager := metrics.NewManager(lc, interval, reporter) manager.Run(ctx, wg) diff --git a/bootstrap/handlers/metrics_test.go b/bootstrap/handlers/metrics_test.go index ab288435..1ba012ff 100644 --- a/bootstrap/handlers/metrics_test.go +++ b/bootstrap/handlers/metrics_test.go @@ -50,6 +50,7 @@ func TestServiceMetrics_BootstrapHandler(t *testing.T) { mockMessagingClient := &mocks.MessageClient{} mockConfiguration := &mocks2.Configuration{} + mockConfiguration.On("GetBootstrap").Return(config.BootstrapConfiguration{}) dic := di.NewContainer(di.ServiceConstructorMap{ container.LoggingClientInterfaceName: func(get di.Get) interface{} { @@ -64,10 +65,9 @@ func TestServiceMetrics_BootstrapHandler(t *testing.T) { }) expectedTelemetryInfo := config.TelemetryInfo{ - Interval: test.Interval, - PublishTopicPrefix: "/metrics", - Metrics: make(map[string]bool), - Tags: make(map[string]string), + Interval: test.Interval, + Metrics: make(map[string]bool), + Tags: make(map[string]string), } mockConfiguration.On("GetTelemetryInfo").Return(&expectedTelemetryInfo) diff --git a/bootstrap/metrics/reporter.go b/bootstrap/metrics/reporter.go index cab0e5e5..9ebfdff9 100644 --- a/bootstrap/metrics/reporter.go +++ b/bootstrap/metrics/reporter.go @@ -57,20 +57,22 @@ const ( ) type messageBusReporter struct { - lc logger.LoggingClient - serviceName string - dic *di.Container - messageClient messaging.MessageClient - config *config.TelemetryInfo + lc logger.LoggingClient + serviceName string + dic *di.Container + messageClient messaging.MessageClient + config *config.TelemetryInfo + baseMetricsTopic string } // NewMessageBusReporter creates a new MessageBus reporter which reports metrics to the EdgeX MessageBus -func NewMessageBusReporter(lc logger.LoggingClient, serviceName string, dic *di.Container, config *config.TelemetryInfo) interfaces.MetricsReporter { +func NewMessageBusReporter(lc logger.LoggingClient, baseTopic string, serviceName string, dic *di.Container, config *config.TelemetryInfo) interfaces.MetricsReporter { reporter := &messageBusReporter{ - lc: lc, - serviceName: serviceName, - dic: dic, - config: config, + lc: lc, + serviceName: serviceName, + dic: dic, + config: config, + baseMetricsTopic: common.BuildTopic(baseTopic, common.MetricsPublishTopic, serviceName), } return reporter @@ -181,7 +183,7 @@ func (r *messageBusReporter) Report(registry gometrics.Registry, metricTags map[ ContentType: common.ContentTypeJSON, } - topic := fmt.Sprintf("%s/%s", r.baseTopic(), name) + topic := common.BuildTopic(r.baseMetricsTopic, name) if err := r.messageClient.Publish(message, topic); err != nil { errs = multierror.Append(errs, fmt.Errorf("failed to publish metric '%s' to topic '%s': %s", name, topic, err.Error())) return @@ -190,15 +192,11 @@ func (r *messageBusReporter) Report(registry gometrics.Registry, metricTags map[ } }) - r.lc.Debugf("Publish %d metrics to the '%s' base topic", publishedCount, r.baseTopic()) + r.lc.Debugf("Publish %d metrics to the '%s' base topic", publishedCount, r.baseMetricsTopic) return errs } -func (r *messageBusReporter) baseTopic() string { - return fmt.Sprintf("%s/%s", r.config.PublishTopicPrefix, r.serviceName) -} - func buildMetricTags(tags map[string]string) []dtos.MetricTag { var metricTags []dtos.MetricTag diff --git a/bootstrap/metrics/reporter_test.go b/bootstrap/metrics/reporter_test.go index c9591ecf..bf879b4e 100644 --- a/bootstrap/metrics/reporter_test.go +++ b/bootstrap/metrics/reporter_test.go @@ -16,9 +16,9 @@ package metrics import ( "encoding/json" - "fmt" "testing" + "github.com/edgexfoundry/go-mod-core-contracts/v3/common" gometrics "github.com/rcrowley/go-metrics" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -37,12 +37,10 @@ import ( func TestNewMessageBusReporter(t *testing.T) { expectedServiceName := "test-service" - baseTopic := "metrics" - expectedBaseTopic := "metrics/test-service" + expectedBaseTopic := common.BuildTopic(common.DefaultBaseTopic, common.MetricsPublishTopic, expectedServiceName) expectedTelemetryConfig := &config.TelemetryInfo{ - Interval: "30s", - PublishTopicPrefix: baseTopic, + Interval: "30s", Metrics: map[string]bool{ "MyMetric": true, }, @@ -73,12 +71,12 @@ func TestNewMessageBusReporter(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { - r := NewMessageBusReporter(logger.NewMockClient(), test.ExpectedServiceName, nil, expectedTelemetryConfig) + r := NewMessageBusReporter(logger.NewMockClient(), common.DefaultBaseTopic, test.ExpectedServiceName, nil, expectedTelemetryConfig) actual := r.(*messageBusReporter) assert.NotNil(t, actual) assert.Equal(t, expectedServiceName, actual.serviceName) assert.Equal(t, expectedTelemetryConfig, actual.config) - assert.Equal(t, expectedBaseTopic, actual.baseTopic()) + assert.Equal(t, expectedBaseTopic, actual.baseMetricsTopic) }) } @@ -88,12 +86,10 @@ func TestMessageBusReporter_Report(t *testing.T) { expectedServiceName := "test-service" expectedMetricName := "test-metric" unexpectedMetricName := "disabled-metric" - baseTopic := "metrics" - expectedTopic := fmt.Sprintf("%s/%s/%s", baseTopic, expectedServiceName, expectedMetricName) + expectedTopic := common.BuildTopic(common.DefaultBaseTopic, common.MetricsPublishTopic, expectedServiceName, expectedMetricName) expectedTelemetryConfig := &config.TelemetryInfo{ - Interval: "30s", - PublishTopicPrefix: baseTopic, + Interval: "30s", Metrics: map[string]bool{ expectedMetricName: true, unexpectedMetricName: false, @@ -222,7 +218,7 @@ func TestMessageBusReporter_Report(t *testing.T) { }, }) - target := NewMessageBusReporter(logger.NewMockClient(), expectedServiceName, dic, expectedTelemetryConfig) + target := NewMessageBusReporter(logger.NewMockClient(), common.DefaultBaseTopic, expectedServiceName, dic, expectedTelemetryConfig) if test.Metric != nil { err = reg.Register(expectedMetricName, test.Metric) diff --git a/config/types.go b/config/types.go index 5d376d0d..ffd0708c 100644 --- a/config/types.go +++ b/config/types.go @@ -26,9 +26,7 @@ import ( ) const ( - DefaultHttpProtocol = "http" - MessageBusPublishTopicPrefix = "PublishTopicPrefix" - MessageBusSubscribeTopic = "SubscribeTopic" + DefaultHttpProtocol = "http" ) const ( @@ -138,8 +136,6 @@ type ClientInfo struct { Protocol string // UseMessageBus indicates weather to use Messaging version of client UseMessageBus bool - // Topics holds the MessageBus topics used by the client to communicate to the service - Topics map[string]string } func (c ClientInfo) Url() string { @@ -258,14 +254,23 @@ type MessageBusInfo struct { // dynamically loaded using this name and store the Option property below where the implementation expected to // find them. SecretName string - // Topics allows MessageBusInfo to be more flexible with respect to topics. - Topics map[string]string + // BaseTopicPrefix is the base topic prefix that all topics start with. + // If not set the DefaultBaseTopic constant is used. + BaseTopicPrefix string // Provides additional configuration properties which do not fit within the existing field. // Typically, the key is the name of the configuration property and the value is a string representation of the // desired value for the configuration property. Optional map[string]string } +func (m MessageBusInfo) GetBaseTopicPrefix() string { + if len(m.BaseTopicPrefix) == 0 { + return common.DefaultBaseTopic + } + + return m.BaseTopicPrefix +} + type ExternalMQTTInfo struct { // Url contains the fully qualified URL to connect to the MQTT broker Url string @@ -312,9 +317,6 @@ func (p MessageBusInfo) URL() string { type TelemetryInfo struct { // Interval is the time duration in which to collect and report the service's metrics Interval string - // PublishTopicPrefix is the base topic in which to publish (report) the service's metrics to the EdgeX MessageBus - // The service name and the metric name are appended to this base topic. i.e. // - PublishTopicPrefix string // Metrics is the list of service's metrics that can be collected. Each of the service's metrics must be in the list // and set to true if enable or false if disabled. Metrics map[string]bool diff --git a/go.mod b/go.mod index ae18d7ec..ad3d68f6 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,8 @@ go 1.18 require ( github.com/eclipse/paho.mqtt.golang v1.4.2 github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.3 - github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.11 - github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.6 + github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.14 + github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.9 github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3 github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.7 github.com/google/uuid v1.3.0 diff --git a/go.sum b/go.sum index c69e8318..2a00fd26 100644 --- a/go.sum +++ b/go.sum @@ -28,10 +28,10 @@ github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6r github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA= github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.3 h1:0Ew4PzLSFJ+sb7AYtvb9m1mRN45Sh0ELU1HdMCel5t8= github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.3/go.mod h1:ESOWI4GokQfQ3Bn2hGsdfOVx5idj7QEdCPT/SAQDd9M= -github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.11 h1:B3ZK8APdjxD3+D0JDg/9Cb5KRE8tT/xaXxwrZ6Y46DE= -github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.11/go.mod h1:4lpZUM54ZareGU/yuAJvLEw0BoJ43SvCj1LO+gsKm9c= -github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.6 h1:JaCP/iw7ahuBCCLuZG9Z2JDDRgQa9V+lZ6ZHZtSb+yQ= -github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.6/go.mod h1:1Vtp3Zwsie1ODeF2CjHbp6Vhgjmx4URyCQ4rJHQg89I= +github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.14 h1:o7CFEIyKn/quin5lrAlUbUu9x1dnecK0tZs5waLhdCc= +github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.14/go.mod h1:4lpZUM54ZareGU/yuAJvLEw0BoJ43SvCj1LO+gsKm9c= +github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.9 h1:CUUieXQ8roD4M770GXj1he707V3V9Jiygk302+dwvKk= +github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.9/go.mod h1:iKBxmZkc7jdOrT99+IR1nyg7PlRgooAQMhZxDh2mTUQ= github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3 h1:QgZF9f70Cwpvkjw3tP1aiVGHc+yNFJNzW6hO8pDs3fg= github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3/go.mod h1:2w8v0sv+i21nY+DY6JV4PFxsNTuxpGAjlNFlFMTfZkk= github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.7 h1:9Mn389IHlgoPgGUpwnLzoe/shylkQI+nXI3E/AfnYDA=