Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,9 @@ type Statsd struct {
TCPlistener *net.TCPListener

// track current connections so we can close them in Stop()
conns map[string]*net.TCPConn
graphiteParser *graphite.Parser
acc telegraf.Accumulator
bufPool sync.Pool // pool of byte slices to handle parsing
conns map[string]*net.TCPConn
acc telegraf.Accumulator
bufPool sync.Pool // pool of byte slices to handle parsing

lastGatherTime time.Time

Expand Down Expand Up @@ -568,6 +567,10 @@ func (s *Statsd) udpListen(conn *net.UDPConn) error {
// packet into statsd strings and then calls parseStatsdLine, which parses a
// single statsd metric into a struct.
func (s *Statsd) parser() error {
p, err := s.newGraphiteParser()
if err != nil {
return err
}
for {
select {
case <-s.done:
Expand All @@ -590,7 +593,7 @@ func (s *Statsd) parser() error {
s.Log.Debugf(" line was: %s", line)
}
default:
if err := s.parseStatsdLine(line); err != nil {
if err := s.parseStatsdLine(p, line); err != nil {
if !errors.Is(err, errParsing) {
// Ignore parsing errors but error out on
// everything else...
Expand All @@ -607,7 +610,7 @@ func (s *Statsd) parser() error {

// parseStatsdLine will parse the given statsd line, validating it as it goes.
// If the line is valid, it will be cached for the next call to Gather()
func (s *Statsd) parseStatsdLine(line string) error {
func (s *Statsd) parseStatsdLine(p *graphite.Parser, line string) error {
lineTags := make(map[string]string)
if s.DataDogExtensions {
recombinedSegments := make([]string, 0)
Expand Down Expand Up @@ -718,7 +721,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
}

// Parse the name & tags from bucket
m.name, m.field, m.tags = s.parseName(m.bucket)
m.name, m.field, m.tags = s.parseName(p, m.bucket)
switch m.mtype {
case "c":
m.tags["metric_type"] = "counter"
Expand Down Expand Up @@ -766,9 +769,7 @@ func (s *Statsd) parseStatsdLine(line string) error {
// config file. If there is a match, it will parse the name of the metric and
// map of tags.
// Return values are (<name>, <field>, <tags>)
func (s *Statsd) parseName(bucket string) (name, field string, tags map[string]string) {
s.Lock()
defer s.Unlock()
func (s *Statsd) parseName(p *graphite.Parser, bucket string) (name, field string, tags map[string]string) {
tags = make(map[string]string)

bucketparts := strings.Split(bucket, ",")
Expand All @@ -795,20 +796,9 @@ func (s *Statsd) parseName(bucket string) (name, field string, tags map[string]s
s.Log.Errorf("Unknown sanitizae name method: %s", s.SanitizeNamesMethod)
}

p := s.graphiteParser
var err error

if p == nil || s.graphiteParser.Separator != s.MetricSeparator {
p = &graphite.Parser{Separator: s.MetricSeparator, Templates: s.Templates}
err = p.Init()
s.graphiteParser = p
}

if err == nil {
p.DefaultTags = tags
//nolint:errcheck // unable to propagate
name, tags, field, _ = p.ApplyTemplate(name)
}
p.DefaultTags = tags
//nolint:errcheck // unable to propagate
name, tags, field, _ = p.ApplyTemplate(name)

if s.ConvertNames {
name = strings.ReplaceAll(name, ".", "_")
Expand Down Expand Up @@ -1064,6 +1054,17 @@ func (s *Statsd) expireCachedMetrics() {
}
}

// newGraphiteParser initializes and returns a new graphite.Parser. graphite.Parser returned is not safe to be used in
// multiple goroutines.
func (s *Statsd) newGraphiteParser() (*graphite.Parser, error) {
p := &graphite.Parser{Separator: s.MetricSeparator, Templates: s.Templates}
err := p.Init()
if err != nil {
return nil, err
}
return p, nil
}

func init() {
inputs.Add("statsd", func() telegraf.Input {
return &Statsd{
Expand Down
Loading
Loading