Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial framework from ipfixlookupprocessor #30194

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions processor/ipfixlookupprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
149 changes: 149 additions & 0 deletions processor/ipfixlookupprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# IPFIX Lookup Processor
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Distributions | [contrib]|
| Issues |[![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aconnector%2Fipfix%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aconnector%2Fipfix) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aconnector%2Fipfix%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aconnector%2Fipfix) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib

## Supported Pipeline Types

| [Exporter Pipeline Type] | [Receiver Pipeline Type] | [Stability Level] |
| ------------------------ | ------------------------ | ----------------- |
| traces | traces | [development] |

[Exporter Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type
[Receiver Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type
[Stability Level]: https://github.com/open-telemetry/opentelemetry-collector#stability-levels
<!-- end autogenerated section -->

[ElasticSearch]: https://www.elastic.co/elasticsearch/

The `ipfixlookup` processor can be used to inject IPFIX spans into existing traces.

## Prerequisits

You need to store your Netflow/IPFIX logs in [ElasticSearch] to use this processor. Please check the [Netflow Integration](https://docs.elastic.co/en/integrations/netflow) for more information.

### Default Configuration

The `ipfixlookup` processorr will require the following minimum configuration.


For example, in the following configuration, the processor will connect to the specified [ElasticSearch] instance and search for IPFIX/Netflow events in the specified time window.
```yaml
processors:
groupbytrace:
wait_duration: 100s
num_traces: 1000
num_workers: 2
ipfix_lookup:
elastic_search:
connection:
addresses:
- https://<elastic-search-address>:9200/
username: elastic
password: <password>
certificate_fingerprint: <certificate fingerprint>
timing:
lookup_window: 25

service:
pipelines:
traces:
receivers: [otlp]
processors: [groupbytrace, ipfix_lookup]
exporters: [otlp/jaeger, debug]
```

### Custom lookup fields



Optionally, you can specify the fields the processor will look up in [ElasticSearch] and match within the spans

| Full Path Configuration | Description |
| -------------------------------------------- | --------------------------------------------------------------------------------------------- |
| `ipfix_lookup.query_parameters` | Parameters used for querying the IPFIX lookup processor. |
| `ipfix_lookup.base_query.field_name` | The name of the field used in the base query. |
| `ipfix_lookup.base_query.field_value` | The value of the field used in the base query. |
| `ipfix_lookup.device_identifier` | The field used to identify the device in IPFIX records. |
| `ipfix_lookup.lookup_fields.source_ip` | Field representing the source IP address. |
| `ipfix_lookup.lookup_fields.source_port` | Field representing the source port. |
| `ipfix_lookup.lookup_fields.destination_ip` | Field representing the destination IP address. |
| `ipfix_lookup.lookup_fields.destination_port`| Field representing the destination port. |
| `ipfix_lookup.span_attribute_fields` | Fields to be added as attributes to the new span. In [gjson](https://gjson.dev/) format. |
| `ipfix_lookup.spans.source_ips` | Fields representing the source IP address of the span. |
| `ipfix_lookup.spans.source_ports` | Fields representing the source port of the span. |
| `ipfix_lookup.spans.destination_ip_and_port` | Fields representing the destination IP and port of the span. Like: `192.168.10.10:443` |
| `ipfix_lookup.spans.destination_ips` | Fields representing the destination IP address of the span. |
| `ipfix_lookup.spans.destination_ports` | Fields representing the destination port of the span. |

The configuration below shows the default values:


```yaml
processors:
groupbytrace:
wait_duration: 10s
num_traces: 1000
num_workers: 2
ipfix_lookup:
query_parameters:
base_query:
field_name: input.type
field_value: netflow
device_identifier: "fields.observer\\.ip.0"
lookup_fields:
source_ip: source.ip
source_port: source.port
destination_ip: destination.ip
destination_port: destination.port
span_attribute_fields:
- "@this"
- "fields.event\\.duration.0"
- "fields.observer\\.ip.0"
- "fields.source\\.ip.0"
- "fields.source\\.port.0"
- "fields.destination\\.ip.0"
- "fields.destination\\.port.0"
- "fields.netflow\\.ip_next_hop_ipv4_address"
spans:
span_fields:
source_ips:
- net.peer.ip
- net.peer.name
- src.ip
source_ports:
- net.peer.port
- src.port
destination_ip_and_port:
- http.host
destination_ips:
- dst.ip
- net.peer.name
destination_ports:
- dst.port
```

### Timings:

This processor is responsible for looking through the spans in each trace. If the IP and port quartet (`source.ip, source.port, destination.ip, destination.port`) are found in a span, the corresponding flow is looked up in ElasticSearch. When flows are found, a new span is added to the trace, and the trace is exported.

![CorrelationUnitv3 drawio](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/15678530/5bb8e7de-c254-4991-871d-05c9c6d6f3f6)


The timing configuration is needed because there is an ingest delay in any large distributed search engine. Because of this, the processor must wait a bit before the search can be started. This delay can be defined in the `processors.groupbytrace.wait_duration` value. Afterwards, the search can be started. The time window that will be searched can be configured in the` processors.ipfix_lookup.timing.lookup_window`. To keep the processor simple, the lookup_window is added before the start timestamp and after the end timestamp. This way, the chance that the Netflow/IPFIX records leading or being caused by this span is found is maximized.


# Example screenshot
Example of a working implementation:
![finnal-implementation](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/15678530/37036d33-07f1-4c9e-bdea-7834a5e01015)
(The network was intentionally slowed down for this screenshot)




13 changes: 13 additions & 0 deletions processor/ipfixlookupprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ipfixlookupprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/ipfixlookupprocessor"

// Config for the processor
type Config struct {
// TODO
}

func (c *Config) Validate() error {
return nil
}
4 changes: 4 additions & 0 deletions processor/ipfixlookupprocessor/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ipfixlookupprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/ipfixlookupprocessor"
35 changes: 35 additions & 0 deletions processor/ipfixlookupprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ipfixlookupprocessor

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
)

const (
// this is the name used to refer to the processor in the config.yaml
typeStr = "ipfixLookup"
)

func NewFactory() processor.Factory {

return processor.NewFactory(
typeStr,
createDefaultConfig,
processor.WithTraces(createTracesToTracesProcessor, component.StabilityLevelAlpha))
}

func createDefaultConfig() component.Config {
return &Config{}
}

func createTracesToTracesProcessor(_ context.Context, params processor.CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) {
c := newProcessor(params.Logger, cfg)
c.tracesConsumer = nextConsumer
return c, nil
}
40 changes: 40 additions & 0 deletions processor/ipfixlookupprocessor/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ipfixlookupprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"
)

func TestFactory_Type(t *testing.T) {
factory := NewFactory()
assert.Equal(t, factory.Type(), component.Type(typeStr))
}

func TestFactory_CreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.Equal(t, cfg, &Config{})
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}

func TestNewFactory(t *testing.T) {
factory := NewFactory()
conn, err := factory.CreateTracesProcessor(
context.Background(),
processortest.NewNopCreateSettings(),
factory.CreateDefaultConfig(),
consumertest.NewNop(),
)

assert.NoError(t, err)
assert.NotNil(t, conn)
}
66 changes: 66 additions & 0 deletions processor/ipfixlookupprocessor/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/processor/ipfixlookupprocessor

go 1.20

require (
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.88.1-0.20231026220224-6405e152a2d9
go.opentelemetry.io/collector/consumer v0.88.1-0.20231026220224-6405e152a2d9
go.opentelemetry.io/collector/pdata v1.0.0
go.opentelemetry.io/collector/processor v0.88.0
go.uber.org/zap v1.26.0
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.91.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector v0.88.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.0.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.0.0-20231106091314-ad6fa27ad929
go.opentelemetry.io/collector/config/configtelemetry v0.88.1-0.20231026220224-6405e152a2d9 // indirect
go.opentelemetry.io/collector/confmap v0.88.1-0.20231026220224-6405e152a2d9 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0017.0.20231026220224-6405e152a2d9 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter => ../../internal/filter

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl => ../../pkg/ottl

retract (
v0.76.2
v0.76.1
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
Loading
Loading