Skip to content

Commit a1546ad

Browse files
elekegonelbre
authored andcommitted
scope: add RawVal type for raw float64 values
Adds a new RawVal type to store raw float64 values without any aggregation (histogram, sum, etc). This provides a simpler alternative to FloatVal when only the most recent value is needed. With high cardinality data, it's important to use only minimal fields (especially as Prometheus/Victoria can calculate the sum/min/max aggregations)
1 parent b70ae73 commit a1546ad

File tree

3 files changed

+106
-0
lines changed

3 files changed

+106
-0
lines changed

examples/minimal/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ func ComputeThing(ctx context.Context, arg1, arg2 int) (res int, err error) {
5050
mon.Event("hit 3")
5151
}
5252

53+
mon.RawVal("raw").Observe(1.0)
54+
mon.RawValk(monkit.NewSeriesKey("rawk").WithTag("foo", "bar"), monkit.Sum, monkit.Count).Observe(1.0)
5355
mon.BoolVal("was-4").Observe(res == 4)
5456
mon.IntVal("res").Observe(int64(res))
5557
mon.DurationVal("took").Observe(time.Second + time.Duration(rand.Intn(int(10*time.Second))))

scope.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,29 @@ func (s *Scope) DurationVal(name string, tags ...SeriesTag) *DurationVal {
233233
return m
234234
}
235235

236+
// RawValk retrieves or creates a RawVal with a given key and aggregations.
237+
func (s *Scope) RawValk(key SeriesKey, aggregations ...Aggregate) *RawVal {
238+
source := s.newSource(key.String(), func() StatSource {
239+
return NewRawVal(key, aggregations...)
240+
})
241+
m, ok := source.(*RawVal)
242+
if !ok {
243+
panic(fmt.Sprintf("%s already used for another stats source: %#v", key, source))
244+
}
245+
return m
246+
}
247+
248+
// RawVal retrieves or creates a RawVal after the given name and tags.
249+
func (s *Scope) RawVal(name string, tags ...SeriesTag) *RawVal {
250+
return s.RawValk(NewSeriesKey(name).WithTags(tags...))
251+
}
252+
253+
// RawValf retrieves or creates a RawVal after the given printf-formatted
254+
// name.
255+
func (s *Scope) RawValf(template string, args ...interface{}) *RawVal {
256+
return s.RawVal(fmt.Sprintf(template, args...))
257+
}
258+
236259
// Timer retrieves or creates a Timer after the given name.
237260
func (s *Scope) Timer(name string, tags ...SeriesTag) *Timer {
238261
source := s.newSource(sourceName("", name, tags), func() StatSource {

val.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,3 +246,84 @@ func (v *DurationVal) Quantile(quantile float64) (rv time.Duration) {
246246
v.mtx.Unlock()
247247
return rv
248248
}
249+
250+
// Aggregate can implement additional aggregation for collected values.
251+
type Aggregate func() (observe func(val float64), stat func() (field string, val float64))
252+
253+
// RawVal is a simple wrapper around a float64 value without any aggregation
254+
// (histogram, sum, etc). Constructed using NewRawVal, though its expected usage is like:
255+
//
256+
// var mon = monkit.Package()
257+
//
258+
// func MyFunc() {
259+
// ...
260+
// mon.RawVal("value").Observe(val)
261+
// ...
262+
// }
263+
type RawVal struct {
264+
mtx sync.Mutex
265+
value float64
266+
key SeriesKey
267+
stats []func() (field string, val float64)
268+
observers []func(val float64)
269+
}
270+
271+
// NewRawVal creates a RawVal
272+
func NewRawVal(key SeriesKey, aggregations ...Aggregate) *RawVal {
273+
val := &RawVal{key: key}
274+
for _, agg := range aggregations {
275+
observe, stat := agg()
276+
val.stats = append(val.stats, stat)
277+
val.observers = append(val.observers, observe)
278+
}
279+
return val
280+
}
281+
282+
// Observe sets the current value
283+
func (v *RawVal) Observe(val float64) {
284+
v.mtx.Lock()
285+
v.value = val
286+
for _, o := range v.observers {
287+
o(val)
288+
}
289+
v.mtx.Unlock()
290+
}
291+
292+
// Stats implements the StatSource interface.
293+
func (v *RawVal) Stats(cb func(key SeriesKey, field string, val float64)) {
294+
v.mtx.Lock()
295+
cb(v.key, "recent", v.value)
296+
v.mtx.Unlock()
297+
for _, s := range v.stats {
298+
field, value := s()
299+
cb(v.key, field, value)
300+
}
301+
}
302+
303+
// Get returns the current value
304+
func (v *RawVal) Get() float64 {
305+
v.mtx.Lock()
306+
value := v.value
307+
v.mtx.Unlock()
308+
return value
309+
}
310+
311+
// Count is a value aggregator that counts the number of times the value is measured.
312+
func Count() (observe func(val float64), stat func() (field string, val float64)) {
313+
var counter int
314+
return func(val float64) {
315+
counter++
316+
}, func() (field string, val float64) {
317+
return "count", float64(counter)
318+
}
319+
}
320+
321+
// Sum is a value aggregator that summarizes the values measured.
322+
func Sum() (observe func(val float64), stat func() (field string, val float64)) {
323+
var sum int
324+
return func(val float64) {
325+
sum += int(val)
326+
}, func() (field string, val float64) {
327+
return "sum", float64(sum)
328+
}
329+
}

0 commit comments

Comments
 (0)