Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

contrib: skeleton app structure & Dogstatsd nsqd addon #909

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apps/nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/BurntSushi/toml"
"github.com/judwhite/go-svc/svc"
"github.com/mreiferson/go-options"
"github.com/nsqio/nsq/contrib"
"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/version"
"github.com/nsqio/nsq/nsqd"
Expand Down Expand Up @@ -144,6 +145,9 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)")
flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)")

optModulesOptions := app.StringArray{}
flagSet.Var(&optModulesOptions, "mod-opt", "optional module options, of form: --mod-opt={{moduleName}}={{moduleOpt}}={{moduleOptValue}}")

return flagSet
}

Expand Down Expand Up @@ -232,6 +236,10 @@ func (p *program) Start() error {
}
nsqd.Main()

// hook into addons
addons := contrib.NewEnabledNSQDAddons(opts.ModOpt, nsqd)
addons.Start()

p.nsqd = nsqd
return nil
}
Expand Down
83 changes: 83 additions & 0 deletions contrib/datadog_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package contrib

import (
"errors"
"fmt"
"net"
"strings"
"time"
)

type DataDogClient struct {
conn net.Conn
addr string
prefix string
}

type DataDogTag struct {
k string
v string
}

type DataDogTags struct {
tags []*DataDogTag
}

// returns dogstatd compatible string
// "#tag1:value1,tag2:value2
func (ddt *DataDogTags) String() string {
ts := []string{}
for _, tag := range ddt.tags {
ts = append(ts, fmt.Sprintf("%s:%s", tag.k, tag.v))
}
return "#" + strings.Join(ts, ",")
}

func NewDataDogClient(addr string, prefix string) *DataDogClient {
return &DataDogClient{
addr: addr,
prefix: prefix,
}
}

func (c *DataDogClient) String() string {
return c.addr
}

func (c *DataDogClient) CreateSocket() error {
conn, err := net.DialTimeout("udp", c.addr, time.Second)
if err != nil {
return err
}
c.conn = conn
return nil
}

func (c *DataDogClient) Close() error {
return c.conn.Close()
}

func (c *DataDogClient) Incr(stat string, count int64, tags *DataDogTags) error {
return c.send(stat, "%d|c", count, tags)
}

func (c *DataDogClient) Decr(stat string, count int64, tags *DataDogTags) error {
return c.send(stat, "%d|c", -count, tags)
}

func (c *DataDogClient) Timing(stat string, delta int64, tags *DataDogTags) error {
return c.send(stat, "%d|ms", delta, tags)
}

func (c *DataDogClient) Gauge(stat string, value int64, tags *DataDogTags) error {
return c.send(stat, "%d|g", value, tags)
}

func (c *DataDogClient) send(stat string, format string, value int64, tags *DataDogTags) error {
if c.conn == nil {
return errors.New("not connected")
}
format = fmt.Sprintf("%s%s:%s|%s", c.prefix, stat, format, tags.String())
_, err := fmt.Fprintf(c.conn, format, value)
return err
}
58 changes: 58 additions & 0 deletions contrib/datadog_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package contrib

import (
"testing"
"github.com/nsqio/nsq/internal/test"
"net"
)

func TestDDTagsStringNoTags(t *testing.T) {
test.Equal(
t,
(&DataDogTags{}).String(),
"#",
)
}

func TestDDTagsStringSingleString(t *testing.T) {
test.Equal(
t,
(&DataDogTags{
tags: []*DataDogTag{
{k: "topic_name", v: "test_topic"},
},
}).String(),
"#topic_name:test_topic",
)
}

func TestDDTagsStringMultipleStrings(t *testing.T) {
test.Equal(
t,
(&DataDogTags{
tags: []*DataDogTag{
{k: "topic_name", v: "test_topic"},
{k: "channel_name", v: "test_channel"},
},
}).String(),
"#topic_name:test_topic,channel_name:test_channel",
)
}

func TestDDCSend(t *testing.T) {
r, w := net.Pipe()
b := make([]byte, len("nsq.topic.depth:100|t|#"))

go func() {
ddc := &DataDogClient{
conn: w,
addr: "test",
prefix: "nsq.",
}
testValue := int64(100)
ddc.send("topic.depth", "%d|t", testValue, &DataDogTags{})
}()

r.Read(b)
test.Equal(t, string(b), "nsq.topic.depth:100|t|#")
}
Loading