Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implementation of ipfixlookupprocessor #30195

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions processor/ipfixlookupprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
149 changes: 149 additions & 0 deletions processor/ipfixlookupprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# IPFIX Lookup Processor
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Distributions | [contrib]|
| Issues |[![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aconnector%2Fipfix%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aconnector%2Fipfix) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aconnector%2Fipfix%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aconnector%2Fipfix) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@fizzers123](https://www.github.com/fizzers123), [@SuniAve](https://www.github.com/SuniAve) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib

## Supported Pipeline Types

| [Exporter Pipeline Type] | [Receiver Pipeline Type] | [Stability Level] |
| ------------------------ | ------------------------ | ----------------- |
| traces | traces | [development] |

[Exporter Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type
[Receiver Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type
[Stability Level]: https://github.com/open-telemetry/opentelemetry-collector#stability-levels
<!-- end autogenerated section -->

[ElasticSearch]: https://www.elastic.co/elasticsearch/

The `ipfixlookup` processor can be used to inject IPFIX spans into existing traces.

## Prerequisits

You need to store your Netflow/IPFIX logs in [ElasticSearch] to use this processor. Please check the [Netflow Integration](https://docs.elastic.co/en/integrations/netflow) for more information.

### Default Configuration

The `ipfixlookup` processorr will require the following minimum configuration.


For example, in the following configuration, the processor will connect to the specified [ElasticSearch] instance and search for IPFIX/Netflow events in the specified time window.
```yaml
processors:
groupbytrace:
wait_duration: 100s
num_traces: 1000
num_workers: 2
ipfix_lookup:
elastic_search:
connection:
addresses:
- https://<elastic-search-address>:9200/
username: elastic
password: <password>
certificate_fingerprint: <certificate fingerprint>
timing:
lookup_window: 25

service:
pipelines:
traces:
receivers: [otlp]
processors: [groupbytrace, ipfix_lookup]
exporters: [otlp/jaeger, debug]
```

### Custom lookup fields



Optionally, you can specify the fields the processor will look up in [ElasticSearch] and match within the spans

| Full Path Configuration | Description |
| -------------------------------------------- | --------------------------------------------------------------------------------------------- |
| `ipfix_lookup.query_parameters` | Parameters used for querying the IPFIX lookup processor. |
| `ipfix_lookup.base_query.field_name` | The name of the field used in the base query. |
| `ipfix_lookup.base_query.field_value` | The value of the field used in the base query. |
| `ipfix_lookup.device_identifier` | The field used to identify the device in IPFIX records. |
| `ipfix_lookup.lookup_fields.source_ip` | Field representing the source IP address. |
| `ipfix_lookup.lookup_fields.source_port` | Field representing the source port. |
| `ipfix_lookup.lookup_fields.destination_ip` | Field representing the destination IP address. |
| `ipfix_lookup.lookup_fields.destination_port`| Field representing the destination port. |
| `ipfix_lookup.span_attribute_fields` | Fields to be added as attributes to the new span. In [gjson](https://gjson.dev/) format. |
| `ipfix_lookup.spans.source_ips` | Fields representing the source IP address of the span. |
| `ipfix_lookup.spans.source_ports` | Fields representing the source port of the span. |
| `ipfix_lookup.spans.destination_ip_and_port` | Fields representing the destination IP and port of the span. Like: `192.168.10.10:443` |
| `ipfix_lookup.spans.destination_ips` | Fields representing the destination IP address of the span. |
| `ipfix_lookup.spans.destination_ports` | Fields representing the destination port of the span. |

The configuration below shows the default values:


```yaml
processors:
groupbytrace:
wait_duration: 10s
num_traces: 1000
num_workers: 2
ipfix_lookup:
query_parameters:
base_query:
field_name: input.type
field_value: netflow
device_identifier: "fields.observer\\.ip.0"
lookup_fields:
source_ip: source.ip
source_port: source.port
destination_ip: destination.ip
destination_port: destination.port
span_attribute_fields:
- "@this"
- "fields.event\\.duration.0"
- "fields.observer\\.ip.0"
- "fields.source\\.ip.0"
- "fields.source\\.port.0"
- "fields.destination\\.ip.0"
- "fields.destination\\.port.0"
- "fields.netflow\\.ip_next_hop_ipv4_address"
spans:
span_fields:
source_ips:
- net.peer.ip
- net.peer.name
- src.ip
source_ports:
- net.peer.port
- src.port
destination_ip_and_port:
- http.host
destination_ips:
- dst.ip
- net.peer.name
destination_ports:
- dst.port
```

### Timings:

This processor is responsible for looking through the spans in each trace. If the IP and port quartet (`source.ip, source.port, destination.ip, destination.port`) are found in a span, the corresponding flow is looked up in ElasticSearch. When flows are found, a new span is added to the trace, and the trace is exported.

![CorrelationUnitv3 drawio](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/15678530/5bb8e7de-c254-4991-871d-05c9c6d6f3f6)


The timing configuration is needed because there is an ingest delay in any large distributed search engine. Because of this, the processor must wait a bit before the search can be started. This delay can be defined in the `processors.groupbytrace.wait_duration` value. Afterwards, the search can be started. The time window that will be searched can be configured in the` processors.ipfix_lookup.timing.lookup_window`. To keep the processor simple, the lookup_window is added before the start timestamp and after the end timestamp. This way, the chance that the Netflow/IPFIX records leading or being caused by this span is found is maximized.


# Example screenshot
Example of a working implementation:
![finnal-implementation](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/15678530/37036d33-07f1-4c9e-bdea-7834a5e01015)
(The network was intentionally slowed down for this screenshot)




100 changes: 100 additions & 0 deletions processor/ipfixlookupprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ipfixlookupprocessor

import "fmt"

type ElasticsearchConnection struct {
Addresses []string `mapstructure:"addresses"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
CertificateFingerprint string `mapstructure:"certificate_fingerprint"`
}

type ElasticsearchConfig struct {
Connection ElasticsearchConnection `mapstructure:"connection"`
}

type LookupFields struct {
SourceIP string `mapstructure:"source_ip"`
SourcePort string `mapstructure:"source_port"`
DestinationIP string `mapstructure:"destination_ip"`
DestinationPort string `mapstructure:"destination_port"`
}

type BaseQuery struct {
FieldName string `mapstructure:"field_name"`
FieldValue string `mapstructure:"field_value"`
LookupFields LookupFields `mapstructure:"lookup_fields"`
}

type QueryParameters struct {
BaseQuery BaseQuery `mapstructure:"base_query"`
DeviceIdentifier string `mapstructure:"device_identifier"`
LookupFields LookupFields `mapstructure:"lookup_fields"`
}

type TimingConfig struct {
LookupWindow int `mapstructure:"lookup_window"`
}

type SpanFields struct {
SourceIPs []string `mapstructure:"source_ips"`
SourcePorts []string `mapstructure:"source_ports"`
DestinationIPandPort []string `mapstructure:"destination_ip_and_port"`
DestinationIPs []string `mapstructure:"destination_ips"`
DestinationPorts []string `mapstructure:"destination_ports"`
}

type Spans struct {
SpanFields SpanFields `mapstructure:"span_fields"`
}

type Config struct {
Elasticsearch ElasticsearchConfig `mapstructure:"elastic_search"`
QueryParameters QueryParameters `mapstructure:"query_parameters"`
SpanAttributeFields []string `mapstructure:"span_attribute_fields"`
Timing TimingConfig `mapstructure:"timing"`
Spans Spans `mapstructure:"spans"`
}

func (c *Config) Validate() error {
// Validate Elasticsearch fields
if len(c.Elasticsearch.Connection.Addresses) == 0 {
return fmt.Errorf("elasticsearch addresses must not be empty")
}
if c.Elasticsearch.Connection.Username == "" {
return fmt.Errorf("elasticsearch username must not be empty")
}
if c.Elasticsearch.Connection.Password == "" {
return fmt.Errorf("elasticsearch password must not be empty")
}
if c.Elasticsearch.Connection.CertificateFingerprint == "" {
return fmt.Errorf("elasticsearch certificateFingerprint must not be empty")
}

// Validate QueryParameters fields
if c.QueryParameters.DeviceIdentifier == "" {
return fmt.Errorf("queryParameters deviceIdentifier must not be empty")
}
if c.QueryParameters.BaseQuery.FieldName == "" || c.QueryParameters.BaseQuery.FieldValue == "" {
return fmt.Errorf("queryParameters baseQuery fieldName and fieldValue must not be empty")
}

// Validate SpanAttributeFields
if len(c.SpanAttributeFields) == 0 {
return fmt.Errorf("spanAttributeFields must not be empty")
}

// Validate Spans fields
if len(c.Spans.SpanFields.SourceIPs) == 0 || len(c.Spans.SpanFields.DestinationIPs) == 0 {
return fmt.Errorf("spans sourceIPs and destinationIPs must not be empty")
}

// Validate timing fields
if c.Timing.LookupWindow < 0 {
return fmt.Errorf("lookupWindow must be greater than 0")
}
return nil
}
4 changes: 4 additions & 0 deletions processor/ipfixlookupprocessor/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ipfixlookupprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/ipfixlookupprocessor"
81 changes: 81 additions & 0 deletions processor/ipfixlookupprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ipfixlookupprocessor

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
)

const (
// this is the name used to refer to the processor in the config.yaml
typeStr = "ipfix_lookup"
)

func NewFactory() processor.Factory {

return processor.NewFactory(
typeStr,
createDefaultConfig,
processor.WithTraces(createTracesToTracesProcessor, component.StabilityLevelAlpha))
}

func createDefaultConfig() component.Config {
return &Config{
QueryParameters: QueryParameters{
BaseQuery: BaseQuery{
FieldName: "input.type",
FieldValue: "netflow",
},
DeviceIdentifier: "fields.observer\\.ip.0",
LookupFields: LookupFields{
SourceIP: "source.ip",
SourcePort: "source.port",
DestinationIP: "destination.ip",
DestinationPort: "destination.port",
},
},
SpanAttributeFields: []string{
"@this",
"fields.event\\.duration.0",
"fields.observer\\.ip.0",
"fields.source\\.ip.0",
"fields.source\\.port.0",
"fields.destination\\.ip.0",
"fields.destination\\.port.0",
"fields.netflow\\.ip_next_hop_ipv4_address",
},

Spans: Spans{
SpanFields: SpanFields{
SourceIPs: []string{
"net.peer.ip",
"src.ip",
},
SourcePorts: []string{
"net.peer.port",
"src.port",
},
DestinationIPandPort: []string{
"http.host",
},
DestinationIPs: []string{
"dst.ip",
},
DestinationPorts: []string{
"dst.port",
},
},
},
}
}

func createTracesToTracesProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) {
c := newIPFIXLookupProcessor(params.Logger, cfg)
c.tracesConsumer = nextConsumer
return c, nil
}
54 changes: 54 additions & 0 deletions processor/ipfixlookupprocessor/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ipfixlookupprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"
)

func TestFactory_Type(t *testing.T) {
factory := NewFactory()
assert.Equal(t, factory.Type(), component.Type(typeStr))
}

func TestFactory_CreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NotEqual(t, cfg, &Config{})
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}

func TestFactory_ValidateConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.EqualError(t, component.ValidateConfig(cfg), "elasticsearch addresses must not be empty")
cfg.(*Config).Elasticsearch.Connection.Addresses = []string{"http://localhost:9200"}
assert.EqualError(t, component.ValidateConfig(cfg), "elasticsearch username must not be empty")
cfg.(*Config).Elasticsearch.Connection.Username = "elastic"
assert.EqualError(t, component.ValidateConfig(cfg), "elasticsearch password must not be empty")
cfg.(*Config).Elasticsearch.Connection.Password = "changeme"
assert.EqualError(t, component.ValidateConfig(cfg), "elasticsearch certificateFingerprint must not be empty")
cfg.(*Config).Elasticsearch.Connection.CertificateFingerprint = "xxxx"
assert.NoError(t, component.ValidateConfig(cfg), "elasticsearch addresses must not be empty")
}

func TestNewFactory(t *testing.T) {
factory := NewFactory()
conn, err := factory.CreateTracesProcessor(
context.Background(),
processortest.NewNopCreateSettings(),
factory.CreateDefaultConfig(),
consumertest.NewNop(),
)

assert.NoError(t, err)
assert.NotNil(t, conn)
}
Loading