Skip to content

Commit

Permalink
Suport websockets as a ProxyMode for Envoy (#62)
Browse files Browse the repository at this point in the history
Co-authored-by: Tom Patterer <[email protected]>
  • Loading branch information
relistan and patoms authored Mar 2, 2021
1 parent ff3e175 commit b418716
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 44 deletions.
1 change: 0 additions & 1 deletion .ignore

This file was deleted.

11 changes: 9 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ how to handle a service it has discovered. It uses these to:
2. How to name the service. `ServiceName=`
3. How to health check the service. `HealthCheck` and `HealthCheckArgs`
4. Whether or not the service is a receiver of Sidecar change events. `SidecarListener`
5. Wether or not Sidecar should entirely ignore this service. `SidecarDiscovery`
6. HAproxy proxy behavior. `ProxyMode`
5. Whether or not Sidecar should entirely ignore this service. `SidecarDiscovery`
6. Envoy or HAproxy proxy behavior. `ProxyMode`

**Service Ports**
Services may be started with one or more `ServicePort_xxx` labels that help
Expand Down Expand Up @@ -334,6 +334,13 @@ setting the following Docker label:
ProxyMode=tcp
```

You may also enable Websocket support where it's available (e.g. in Envoy) by
setting:

```
ProxyMode=ws
```

**Templating In Labels**
You sometimes need to pass information in the Docker labels which
is not available to you at the time of container creation. One example of this
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type HAproxyConfig struct {
PidFile string `envconfig:"PID_FILE" default:"/var/run/haproxy.pid"`
Disable bool `envconfig:"DISABLE"`
User string `envconfig:"USER" default:"haproxy"`
Group string `envconfig:"GROUP" default:"haproxy"`
Group string `envconfig:"GROUP" default:""`
UseHostnames bool `envconfig:"USE_HOSTNAMES"`
}

Expand Down
99 changes: 78 additions & 21 deletions envoy/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/golang/protobuf/ptypes/duration"
"github.com/golang/protobuf/ptypes/wrappers"
log "github.com/sirupsen/logrus"
anypb "google.golang.org/protobuf/types/known/anypb"
)

const (
Expand Down Expand Up @@ -160,17 +161,14 @@ func EnvoyResourcesFromState(state *catalog.ServicesState, bindIP string,
}
}

// envoyListenerFromService creates an Envoy listener from a service instance
func envoyListenerFromService(svc *service.Service, envoyServiceName string,
servicePort int64, bindIP string) (cache_types.Resource, error) {

var connectionManagerName string
var connectionManager proto.Message
// connectionManagerForService returns a ConnectionManager configured
// appropriately for the Sidecar service
func connectionManagerForService(svc *service.Service, envoyServiceName string) (managerName string, manager proto.Message, err error) {
switch svc.ProxyMode {
case "http":
connectionManagerName = wellknown.HTTPConnectionManager
managerName = wellknown.HTTPConnectionManager

connectionManager = &hcm.HttpConnectionManager{
manager = &hcm.HttpConnectionManager{
StatPrefix: "ingress_http",
HttpFilters: []*hcm.HttpFilter{{
Name: wellknown.Router,
Expand Down Expand Up @@ -201,23 +199,89 @@ func envoyListenerFromService(svc *service.Service, envoyServiceName string,
},
}
case "tcp":
connectionManagerName = wellknown.TCPProxy
managerName = wellknown.TCPProxy

connectionManager = &tcpp.TcpProxy{
manager = &tcpp.TcpProxy{
StatPrefix: "ingress_tcp",
ClusterSpecifier: &tcpp.TcpProxy_Cluster{
Cluster: envoyServiceName,
},
}
case "ws":
managerName = wellknown.HTTPConnectionManager

manager = &hcm.HttpConnectionManager{
StatPrefix: "ingress_http",
HttpFilters: []*hcm.HttpFilter{{
Name: wellknown.Router,
}},
RouteSpecifier: &hcm.HttpConnectionManager_RouteConfig{
RouteConfig: &api.RouteConfiguration{
ValidateClusters: &wrappers.BoolValue{Value: false},
VirtualHosts: []*route.VirtualHost{{
Name: svc.Name,
Domains: []string{"*"},
Routes: []*route.Route{{
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: "/",
},
},
Action: &route.Route_Route{
Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: envoyServiceName,
},
Timeout: &duration.Duration{},
},
},
}},
}},
},
},
UpgradeConfigs: []*hcm.HttpConnectionManager_UpgradeConfig{
{
UpgradeType: "websocket",
},
},
}
default:
return nil, fmt.Errorf("unrecognised proxy mode: %s", svc.ProxyMode)
return "", nil, fmt.Errorf("unrecognised proxy mode: %s", svc.ProxyMode)
}

// If it was a supported type, return the result
return managerName, manager, nil
}

// filterChainsForService returns a filter chain configured appropriately for
// the Sidecar service
func filterChainsForService(svc *service.Service, managerName string, serializedManager *anypb.Any) []*listener.FilterChain {
return []*listener.FilterChain{{
Filters: []*listener.Filter{{
Name: managerName,
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: serializedManager,
},
}},
}}
}

// envoyListenerFromService creates an Envoy listener from a service instance
func envoyListenerFromService(svc *service.Service, envoyServiceName string,
servicePort int64, bindIP string) (cache_types.Resource, error) {

managerName, manager, err := connectionManagerForService(svc, envoyServiceName)
if err != nil {
return nil, fmt.Errorf("failed to create the connection manager: %w", err)
}

serialisedConnectionManager, err := ptypes.MarshalAny(connectionManager)
serializedManager, err := ptypes.MarshalAny(manager)
if err != nil {
return nil, fmt.Errorf("failed to create the connection manager: %s", err)
return nil, fmt.Errorf("failed to create the connection manager: %w", err)
}

filterChains := filterChainsForService(svc, managerName, serializedManager)

return &api.Listener{
Name: envoyServiceName,
Address: &core.Address{
Expand All @@ -230,14 +294,7 @@ func envoyListenerFromService(svc *service.Service, envoyServiceName string,
},
},
},
FilterChains: []*listener.FilterChain{{
Filters: []*listener.Filter{{
Name: connectionManagerName,
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: serialisedConnectionManager,
},
}},
}},
FilterChains: filterChains,
}, nil
}

Expand Down
70 changes: 60 additions & 10 deletions envoy/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"net"
"sort"
"testing"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/Nitro/sidecar/config"
"github.com/Nitro/sidecar/envoy/adapter"
"github.com/Nitro/sidecar/service"

api "github.com/envoyproxy/go-control-plane/envoy/api/v2"
core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
hcm "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
Expand All @@ -22,11 +24,15 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/resource/v2"
xds "github.com/envoyproxy/go-control-plane/pkg/server/v2"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"

"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"google.golang.org/grpc"

"github.com/relistan/go-director"
log "github.com/sirupsen/logrus"

. "github.com/smartystreets/goconvey/convey"
"google.golang.org/grpc"
)

const (
Expand All @@ -53,7 +59,8 @@ func validateListener(serialisedListener *any.Any, svc service.Service) {
filters := filterChains[0].GetFilters()
So(filters, ShouldHaveLength, 1)

if svc.ProxyMode == "http" {
switch svc.ProxyMode {
case "http":
So(filters[0].GetName(), ShouldEqual, wellknown.HTTPConnectionManager)
connectionManager := &hcm.HttpConnectionManager{}
err = ptypes.UnmarshalAny(filters[0].GetTypedConfig(), connectionManager)
Expand All @@ -68,13 +75,33 @@ func validateListener(serialisedListener *any.Any, svc service.Service) {
So(route, ShouldNotBeNil)
So(route.GetCluster(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort))
So(route.GetTimeout(), ShouldNotBeNil)
} else { // tcp
case "tcp":
So(filters[0].GetName(), ShouldEqual, wellknown.TCPProxy)
connectionManager := &tcpp.TcpProxy{}
err = ptypes.UnmarshalAny(filters[0].GetTypedConfig(), connectionManager)
So(err, ShouldBeNil)
So(connectionManager.GetStatPrefix(), ShouldEqual, "ingress_tcp")
So(connectionManager.GetCluster(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort))
case "ws":
So(filters[0].GetName(), ShouldEqual, wellknown.HTTPConnectionManager)
connectionManager := &hcm.HttpConnectionManager{}
err = ptypes.UnmarshalAny(filters[0].GetTypedConfig(), connectionManager)
So(err, ShouldBeNil)
So(connectionManager.GetStatPrefix(), ShouldEqual, "ingress_http")
So(connectionManager.GetRouteConfig(), ShouldNotBeNil)
So(connectionManager.GetRouteConfig().GetVirtualHosts(), ShouldHaveLength, 1)
virtualHost := connectionManager.GetRouteConfig().GetVirtualHosts()[0]
So(virtualHost.GetName(), ShouldEqual, svc.Name)
So(virtualHost.GetRoutes(), ShouldHaveLength, 1)
route := virtualHost.GetRoutes()[0].GetRoute()
So(route, ShouldNotBeNil)
So(route.GetCluster(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort))
So(route.GetTimeout(), ShouldNotBeNil)

// websocket stuff
upgradeConfigs := connectionManager.GetUpgradeConfigs()
So(len(upgradeConfigs), ShouldEqual, 1)
So(upgradeConfigs[0].UpgradeType, ShouldEqual, "websocket")
}
}

Expand Down Expand Up @@ -137,7 +164,8 @@ func (sv *EnvoyMock) GetResource(stream envoy_discovery.AggregatedDiscoveryServi
So(err, ShouldBeNil)
}

// Recv() blocks until the stream ctx expires if the message sent via Send() is not recognised / valid
// Recv() blocks until the stream ctx expires if the message sent via
// Send() is not recognised / valid
response, err := stream.Recv()

So(err, ShouldBeNil)
Expand All @@ -155,8 +183,8 @@ func (sv *EnvoyMock) ValidateResources(stream envoy_discovery.AggregatedDiscover
}
}

// SnapshotCache is a light wrapper around cache.SnapshotCache which lets
// us get a notification after calling SetSnapshot via the Waiter chan
// SnapshotCache is a light wrapper around cache.SnapshotCache which lets us
// get a notification after calling SetSnapshot via the Waiter chan
type SnapshotCache struct {
cache.SnapshotCache
Waiter chan struct{}
Expand Down Expand Up @@ -184,6 +212,8 @@ func Test_PortForServicePort(t *testing.T) {
BindIP: bindIP,
}

log.SetOutput(ioutil.Discard)

state := catalog.NewServicesState()

dummyHostname := "carcasone"
Expand Down Expand Up @@ -227,6 +257,19 @@ func Test_PortForServicePort(t *testing.T) {
},
}

wsSvc := service.Service{
ID: "deadbeef666",
Name: "kafka",
Created: baseTime,
Hostname: dummyHostname,
Updated: baseTime,
Status: service.ALIVE,
ProxyMode: "ws",
Ports: []service.Port{
{IP: "127.0.0.1", Port: 6666, ServicePort: 10102},
},
}

ctx, cancel := context.WithCancel(context.Background())
Reset(func() {
cancel()
Expand All @@ -242,14 +285,14 @@ func Test_PortForServicePort(t *testing.T) {
xdsServer: xds.NewServer(ctx, snapshotCache, &xdsCallbacks{}),
}

// The gRPC listener will be assigned a random port and will be owned and managed
// by the gRPC server
// The gRPC listener will be assigned a random port and will be owned
// and managed by the gRPC server
lis, err := net.Listen("tcp", ":0")
So(err, ShouldBeNil)
So(lis.Addr(), ShouldHaveSameTypeAs, &net.TCPAddr{})

// Using a FreeLooper instead would make it run too often, triggering spurious
// locking on the state, which can cause the tests to time out
// Using a FreeLooper instead would make it run too often, triggering
// spurious locking on the state, which can cause the tests to time out
go server.Run(ctx, director.NewTimedLooper(director.FOREVER, 10*time.Millisecond, make(chan error)), lis)

Convey("sends the Envoy state via gRPC", func() {
Expand Down Expand Up @@ -317,6 +360,13 @@ func Test_PortForServicePort(t *testing.T) {
envoyMock.ValidateResources(stream, tcpSvc, state.Hostname)
})

Convey("for a Websocket service", func() {
state.AddServiceEntry(wsSvc)
<-snapshotCache.Waiter

envoyMock.ValidateResources(stream, wsSvc, state.Hostname)
})

Convey("and skips tombstones", func() {
httpSvc.Tombstone()
state.AddServiceEntry(httpSvc)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 // indirect
golang.org/x/text v0.3.2 // indirect
google.golang.org/grpc v1.27.0
google.golang.org/protobuf v1.23.0
gopkg.in/alecthomas/kingpin.v2 v2.2.5
gopkg.in/jarcoal/httpmock.v1 v1.0.0-20170412085702-cf52904a3cf0
gopkg.in/relistan/rubberneck.v1 v1.0.1
Expand Down
9 changes: 7 additions & 2 deletions haproxy/haproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type HAproxy struct {

// Constructs a properly configured HAProxy and returns a pointer to it
func New(configFile string, pidFile string) *HAproxy {
reloadCmd := "haproxy -f " + configFile + " -p " + pidFile + " `[[ -f " + pidFile + " ]] && echo \"-sf $(cat " + pidFile + ")\"]]`"
reloadCmd := "haproxy -f " + configFile + " -p " + pidFile + " `[[ -f " + pidFile + " ]] && echo \"-sf $(cat " + pidFile + ")\"`"
verifyCmd := "haproxy -c -f " + configFile

proxy := HAproxy{
Expand Down Expand Up @@ -337,7 +337,12 @@ func getModes(state *catalog.ServicesState) map[string]string {
modeMap := make(map[string]string)
state.EachService(
func(hostname *string, serviceId *string, svc *service.Service) {
modeMap[svc.Name] = svc.ProxyMode
mode := svc.ProxyMode
// Treat websockets like HTTP
if mode == "ws" {
mode = "http"
}
modeMap[svc.Name] = mode
},
)
return modeMap
Expand Down
Loading

0 comments on commit b418716

Please sign in to comment.