Skip to content

Commit

Permalink
feat: add indexing pressure
Browse files Browse the repository at this point in the history
Signed-off-by: 黄振明 <[email protected]>
  • Loading branch information
黄振明 committed Jun 9, 2023
1 parent ed8a758 commit c5cbb48
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 17 deletions.
194 changes: 194 additions & 0 deletions collector/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ type filesystemIODeviceMetric struct {
Labels func(cluster string, node NodeStatsNodeResponse, device string) []string
}

type indexingPressureMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64
Labels func(cluster string, node NodeStatsNodeResponse) []string
}

// Nodes information struct
type Nodes struct {
logger log.Logger
Expand All @@ -188,6 +195,7 @@ type Nodes struct {
threadPoolMetrics []*threadPoolMetric
filesystemDataMetrics []*filesystemDataMetric
filesystemIODeviceMetrics []*filesystemIODeviceMetric
indexingPressureMetrics []*indexingPressureMetric
}

// NewNodes defines Nodes Prometheus metrics
Expand Down Expand Up @@ -1781,6 +1789,176 @@ func NewNodes(logger log.Logger, client *http.Client, url *url.URL, all bool, no
Labels: defaultFilesystemIODeviceLabelValues,
},
},
indexingPressureMetrics: []*indexingPressureMetric{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "current_combined_coordinating_and_primary_in_bytes"),
"Memory consumed, in bytes, by indexing requests in the coordinating or primary stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Current.CombinedCoordinatingAndPrimaryInBytes)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "current_coordinating_in_bytes"),
"Memory consumed, in bytes, by indexing requests in the coordinating stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Current.CoordinatingInBytes)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "current_primary_in_bytes"),
"Memory consumed, in bytes, by indexing requests in the primary stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Current.PrimaryInBytes)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "current_replica_in_bytes"),
"Memory consumed, in bytes, by indexing requests in the replica stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Current.ReplicaInBytes)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "current_all_in_bytes"),
"Memory consumed, in bytes, by indexing requests in the coordinating, primary, or replica stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Current.ReplicaInBytes)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "total_combined_coordinating_and_primary_in_bytes"),
"Memory consumed, in bytes, by indexing requests in the coordinating or primary stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Total.CombinedCoordinatingAndPrimaryInBytes)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "total_coordinating_in_bytes"),
"Memory consumed, in bytes, by indexing requests in the coordinating stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Total.CoordinatingInBytes)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "total_primary_in_bytes"),
"Memory consumed, in bytes, by indexing requests in the primary stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Total.PrimaryInBytes)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "total_replica_in_bytes"),
"Memory consumed, in bytes, by indexing requests in the replica stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Total.ReplicaInBytes)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "total_all_in_bytes"),
"Memory consumed, in bytes, by indexing requests in the coordinating, primary, or replica stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Total.AllInBytes)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "total_coordinating_rejections"),
"Number of indexing requests rejected in the coordinating stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Total.CoordinatingRejections)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "total_primary_rejections"),
"Number of indexing requests rejected in the primary stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Total.PrimaryRejections)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "total_replica_rejections"),
"Number of indexing requests rejected in the replica stage.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.Total.ReplicaRejections)
},
Labels: defaultNodeLabelValues,
},
{
Type: prometheus.CounterValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "indexing_pressure", "limit_in_bytes"),
"Configured memory limit, in bytes, for the indexing requests. Replica requests have an automatic limit that is 1.5x this value.",
defaultNodeLabels, nil,
),
Value: func(indexingPressureMem NodestatsIndexingPressureMemoryResponse) float64 {
return float64(indexingPressureMem.LimitInBytes)
},
Labels: defaultNodeLabelValues,
},
},
}
}

Expand All @@ -1801,6 +1979,9 @@ func (c *Nodes) Describe(ch chan<- *prometheus.Desc) {
for _, metric := range c.filesystemIODeviceMetrics {
ch <- metric.Desc
}
for _, metric := range c.indexingPressureMetrics {
ch <- metric.Desc
}
ch <- c.up.Desc()
ch <- c.totalScrapes.Desc()
ch <- c.jsonParseFailures.Desc()
Expand Down Expand Up @@ -1955,5 +2136,18 @@ func (c *Nodes) Collect(ch chan<- prometheus.Metric) {
}
}

// indexing_pressure Stats https://github.com/prometheus-community/elasticsearch_exporter/issues/638
// https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules-indexing-pressure.html
for _, indexingPressureMem := range node.IndexingPressure {
for _, metric := range c.indexingPressureMetrics {
ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(indexingPressureMem),
metric.Labels(nodeStatsResp.ClusterName, node)...,
)
}
}

}
}
61 changes: 44 additions & 17 deletions collector/nodes_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,24 @@ type nodeStatsResponse struct {

// NodeStatsNodeResponse defines node stats information structure for nodes
type NodeStatsNodeResponse struct {
Name string `json:"name"`
Host string `json:"host"`
Timestamp int64 `json:"timestamp"`
TransportAddress string `json:"transport_address"`
Hostname string `json:"hostname"`
Roles []string `json:"roles"`
Attributes map[string]string `json:"attributes"`
Indices NodeStatsIndicesResponse `json:"indices"`
OS NodeStatsOSResponse `json:"os"`
Network NodeStatsNetworkResponse `json:"network"`
FS NodeStatsFSResponse `json:"fs"`
ThreadPool map[string]NodeStatsThreadPoolPoolResponse `json:"thread_pool"`
JVM NodeStatsJVMResponse `json:"jvm"`
Breakers map[string]NodeStatsBreakersResponse `json:"breakers"`
HTTP map[string]interface{} `json:"http"`
Transport NodeStatsTransportResponse `json:"transport"`
Process NodeStatsProcessResponse `json:"process"`
Name string `json:"name"`
Host string `json:"host"`
Timestamp int64 `json:"timestamp"`
TransportAddress string `json:"transport_address"`
Hostname string `json:"hostname"`
Roles []string `json:"roles"`
Attributes map[string]string `json:"attributes"`
Indices NodeStatsIndicesResponse `json:"indices"`
OS NodeStatsOSResponse `json:"os"`
Network NodeStatsNetworkResponse `json:"network"`
FS NodeStatsFSResponse `json:"fs"`
ThreadPool map[string]NodeStatsThreadPoolPoolResponse `json:"thread_pool"`
JVM NodeStatsJVMResponse `json:"jvm"`
Breakers map[string]NodeStatsBreakersResponse `json:"breakers"`
HTTP map[string]interface{} `json:"http"`
Transport NodeStatsTransportResponse `json:"transport"`
Process NodeStatsProcessResponse `json:"process"`
IndexingPressure map[string]NodestatsIndexingPressureMemoryResponse `json:"indexing_pressure"`
}

// NodeStatsBreakersResponse is a representation of a statistics about the field data circuit breaker
Expand Down Expand Up @@ -317,6 +318,32 @@ type NodeStatsProcessResponse struct {
Memory NodeStatsProcessMemResponse `json:"mem"`
}

// https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-stats.html#cluster-nodes-stats-api-response-body-indexing-pressure
type NodestatsIndexingPressureMemoryResponse struct {
Current NodestatsIndexingPressureMemoryCurrentResponse `json:"current"`
Total NodestatsIndexingPressureMemoryTotalResponse `json:"total"`
LimitInBytes int64 `json:"limit_in_bytes"`
}

type NodestatsIndexingPressureMemoryCurrentResponse struct {
CombinedCoordinatingAndPrimaryInBytes int64 `json:"combined_coordinating_and_primary_in_bytes"`
CoordinatingInBytes int64 `json:"coordinating_in_bytes"`
PrimaryInBytes int64 `json:"primary_in_bytes"`
ReplicaInBytes int64 `json:"replica_in_bytes"`
AllInBytes int64 `json:"all_in_bytes"`
}

type NodestatsIndexingPressureMemoryTotalResponse struct {
CombinedCoordinatingAndPrimaryInBytes int64 `json:"combined_coordinating_and_primary_in_bytes"`
CoordinatingInBytes int64 `json:"coordinating_in_bytes"`
PrimaryInBytes int64 `json:"primary_in_bytes"`
ReplicaInBytes int64 `json:"replica_in_bytes"`
AllInBytes int64 `json:"all_in_bytes"`
CoordinatingRejections int64 `json:"coordinating_rejections"`
PrimaryRejections int64 `json:"primary_rejections"`
ReplicaRejections int64 `json:"replica_rejections"`
}

// NodeStatsProcessMemResponse defines node stats process memory usage structure
type NodeStatsProcessMemResponse struct {
Resident int64 `json:"resident_in_bytes"`
Expand Down

0 comments on commit c5cbb48

Please sign in to comment.