Skip to content

Commit 5eb3861

Browse files
committed
review round 2 answer 1
1 parent 51b24c1 commit 5eb3861

File tree

2 files changed

+94
-101
lines changed

2 files changed

+94
-101
lines changed

monitoring/exporter/stackdriver/stackdriver.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,14 @@ type Options struct {
6767
// RPC calls.
6868
ClientOptions []option.ClientOption
6969

70-
// options for bundles amortizing export requests. Note that a bundle is created for each
70+
// Options for bundles amortizing export requests. Note that a bundle is created for each
7171
// project. When not provided, default values in bundle package are used.
72+
73+
// BundleDelayThreshold determines the max amount of time the exporter can wait before
74+
// uploading data to the stackdriver.
7275
BundleDelayThreshold time.Duration
76+
// BundleCountThreshold determines how many RowData objects can be buffered before batch
77+
// uploading them to the backend.
7378
BundleCountThreshold int
7479

7580
// Callback functions provided by user.
@@ -85,8 +90,8 @@ type Options struct {
8590
GetProjectID func(*RowData) (projectID string, err error)
8691
// OnError is used to report any error happened while exporting view data fails. Whenever
8792
// this function is called, it's guaranteed that at least one row data is also passed to
88-
// OnError. Row data passed to OnError must not be modified. When OnError is not set, all
89-
// errors happened on exporting are ignored.
93+
// OnError. Row data passed to OnError must not be modified and OnError must be
94+
// non-blocking. When OnError is not set, all errors happened on exporting are ignored.
9095
OnError func(error, ...*RowData)
9196
// MakeResource creates monitored resource from RowData. It is guaranteed that only RowData
9297
// that passes GetProjectID will be given to this function. Though not recommended, error
@@ -221,6 +226,15 @@ func (e *Exporter) exportRowData(rd *RowData) {
221226
}
222227
}
223228

229+
// projectData contain per-project data in exporter. It should be created by newProjectData()
230+
type projectData struct {
231+
parent *Exporter
232+
projectID string
233+
// We make bundler for each project because call to monitoring RPC can be grouped only in
234+
// project level
235+
bndler *bundler.Bundler
236+
}
237+
224238
func (e *Exporter) getProjectData(projectID string) *projectData {
225239
e.mu.Lock()
226240
defer e.mu.Unlock()
@@ -233,6 +247,23 @@ func (e *Exporter) getProjectData(projectID string) *projectData {
233247
return pd
234248
}
235249

250+
func (e *Exporter) newProjectData(projectID string) *projectData {
251+
pd := &projectData{
252+
parent: e,
253+
projectID: projectID,
254+
}
255+
256+
pd.bndler = newBundler((*RowData)(nil), pd.uploadRowData)
257+
// Set options for bundler if they are provided by users.
258+
if 0 < e.opts.BundleDelayThreshold {
259+
pd.bndler.DelayThreshold = e.opts.BundleDelayThreshold
260+
}
261+
if 0 < e.opts.BundleCountThreshold {
262+
pd.bndler.BundleCountThreshold = e.opts.BundleCountThreshold
263+
}
264+
return pd
265+
}
266+
236267
// Close flushes and closes the exporter. Close must be called after the exporter is unregistered
237268
// and no further calls to ExportView() are made. Once Close() is returned no further access to the
238269
// exporter is allowed in any way.

monitoring/exporter/stackdriver/upload.go

Lines changed: 60 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -18,123 +18,69 @@ import (
1818
"fmt"
1919

2020
"go.opencensus.io/tag"
21-
"google.golang.org/api/support/bundler"
2221
metricpb "google.golang.org/genproto/googleapis/api/metric"
2322
mpb "google.golang.org/genproto/googleapis/monitoring/v3"
2423
)
2524

26-
// MaxTimeSeriePerUpload is the maximum number of time series that stackdriver accepts. Only test
27-
// may change this value.
28-
var MaxTimeSeriesPerUpload = 200
25+
// This file contains features used for uploading row data to stackdriver when bundler makes an
26+
// upload request by calling projectData.uploadRowData().
2927

30-
// projectData contain per-project data in exporter. It should be created by newProjectData()
31-
type projectData struct {
32-
parent *Exporter
33-
projectID string
34-
// We make bundler for each project because call to monitoring RPC can be grouped only in
35-
// project level
36-
bndler *bundler.Bundler
37-
}
38-
39-
func (e *Exporter) newProjectData(projectID string) *projectData {
40-
pd := &projectData{
41-
parent: e,
42-
projectID: projectID,
43-
}
44-
45-
pd.bndler = newBundler((*RowData)(nil), pd.uploadRowData)
46-
// Set options for bundler if they are provided by users.
47-
if 0 < e.opts.BundleDelayThreshold {
48-
pd.bndler.DelayThreshold = e.opts.BundleDelayThreshold
49-
}
50-
if 0 < e.opts.BundleCountThreshold {
51-
pd.bndler.BundleCountThreshold = e.opts.BundleCountThreshold
52-
}
53-
return pd
54-
}
28+
// MaxTimeSeriePerUpload is the maximum number of time series that's uploaded to the stackdriver
29+
// at once. Consumer may change this value, but note that stackdriver may reject upload request if
30+
// the number of time series is too large.
31+
var MaxTimeSeriesPerUpload = 100
5532

5633
// uploadRowData is called by bundler to upload row data, and report any error happened meanwhile.
5734
func (pd *projectData) uploadRowData(bundle interface{}) {
5835
exp := pd.parent
5936
rds := bundle.([]*RowData)
6037

61-
// reqRds contains RowData objects those are uploaded to stackdriver at given iteration.
62-
// It's main usage is for error reporting. For actual uploading operation, we use req.
63-
// remainingRds are RowData that has not been processed at all.
64-
var reqRds, remainingRds []*RowData
65-
for ; len(rds) != 0; rds = remainingRds {
66-
var req *mpb.CreateTimeSeriesRequest
67-
req, reqRds, remainingRds = pd.makeReq(rds)
68-
if req == nil {
69-
// No need to perform RPC call for empty set of requests.
70-
continue
71-
}
72-
if err := createTimeSeries(exp.client, exp.ctx, req); err != nil {
73-
newErr := fmt.Errorf("RPC call to create time series failed for project %s: %v", pd.projectID, err)
74-
// We pass all row data not successfully uploaded.
75-
exp.onError(newErr, reqRds...)
76-
}
77-
}
78-
}
79-
80-
// makeReq creates a request that's suitable to be passed to create time series RPC call.
81-
//
82-
// reqRds contains rows those are contained in req. Main use of reqRds is to be returned to users if
83-
// creating time series failed. (We don't want users to investigate structure of timeseries.)
84-
// remainingRds contains rows those are not used at all in makeReq because of the length limitation
85-
// or request. Another call of makeReq() with remainigRds will handle (some) rows in them. When req
86-
// is nil, then there's nothing to request and reqRds will also contain nothing.
87-
//
88-
// Some rows in rds may fail while converting them to time series, and in that case makeReq() calls
89-
// exporter's onError() directly, not propagating errors to the caller.
90-
func (pd *projectData) makeReq(rds []*RowData) (req *mpb.CreateTimeSeriesRequest, reqRds, remainingRds []*RowData) {
91-
exp := pd.parent
92-
timeSeries := []*mpb.TimeSeries{}
38+
// uploadTs contains TimeSeries objects that needs to be uploaded.
39+
var uploadTs []*mpb.TimeSeries = nil
40+
// uploadRds contains RowData objects corresponds to uploadTs. It's used for error reporting
41+
// when upload operation fails.
42+
var uploadRds []*RowData = nil
9343

94-
var i int
95-
var rd *RowData
96-
for i, rd = range rds {
97-
pt := newPoint(rd.View, rd.Row, rd.Start, rd.End)
98-
if pt.Value == nil {
99-
err := fmt.Errorf("inconsistent data found in view %s", rd.View.Name)
100-
pd.parent.onError(err, rd)
101-
continue
102-
}
103-
resource, err := exp.makeResource(rd)
44+
for _, rd := range rds {
45+
ts, err := exp.makeTS(rd)
10446
if err != nil {
105-
newErr := fmt.Errorf("failed to construct resource of view %s: %v", rd.View.Name, err)
106-
pd.parent.onError(newErr, rd)
47+
exp.onError(err, rd)
10748
continue
10849
}
109-
110-
ts := &mpb.TimeSeries{
111-
Metric: &metricpb.Metric{
112-
Type: rd.View.Name,
113-
Labels: exp.makeLabels(rd.Row.Tags),
114-
},
115-
Resource: resource,
116-
Points: []*mpb.Point{pt},
117-
}
118-
// Growing timeseries and reqRds are done at same time.
119-
timeSeries = append(timeSeries, ts)
120-
reqRds = append(reqRds, rd)
121-
// Don't grow timeseries over the limit.
122-
if len(timeSeries) == MaxTimeSeriesPerUpload {
123-
break
50+
// Time series created. We update both uploadTs and uploadRds.
51+
uploadTs = append(uploadTs, ts)
52+
uploadRds = append(uploadRds, rd)
53+
if len(uploadTs) == MaxTimeSeriesPerUpload {
54+
pd.uploadTimeSeries(uploadTs, uploadRds)
55+
uploadTs = nil
56+
uploadRds = nil
12457
}
12558
}
59+
// Upload any remaining time series.
60+
if len(uploadTs) != 0 {
61+
pd.uploadTimeSeries(uploadTs, uploadRds)
62+
}
63+
}
12664

127-
// Since i is the last index processed, remainingRds should start from i+1.
128-
remainingRds = rds[i+1:]
129-
if len(timeSeries) == 0 {
130-
req = nil
131-
} else {
132-
req = &mpb.CreateTimeSeriesRequest{
133-
Name: fmt.Sprintf("projects/%s", pd.projectID),
134-
TimeSeries: timeSeries,
135-
}
65+
// makeTS constructs a time series from a row data.
66+
func (e *Exporter) makeTS(rd *RowData) (*mpb.TimeSeries, error) {
67+
pt := newPoint(rd.View, rd.Row, rd.Start, rd.End)
68+
if pt.Value == nil {
69+
return nil, fmt.Errorf("inconsistent data found in view %s", rd.View.Name)
13670
}
137-
return req, reqRds, remainingRds
71+
resource, err := e.makeResource(rd)
72+
if err != nil {
73+
return nil, fmt.Errorf("failed to construct resource of view %s: %v", rd.View.Name, err)
74+
}
75+
ts := &mpb.TimeSeries{
76+
Metric: &metricpb.Metric{
77+
Type: rd.View.Name,
78+
Labels: e.makeLabels(rd.Row.Tags),
79+
},
80+
Resource: resource,
81+
Points: []*mpb.Point{pt},
82+
}
83+
return ts, nil
13884
}
13985

14086
// makeLables constructs label that's ready for being uploaded to stackdriver.
@@ -154,3 +100,19 @@ func (e *Exporter) makeLabels(tags []tag.Tag) map[string]string {
154100
}
155101
return labels
156102
}
103+
104+
// uploadTimeSeries uploads timeSeries. ts and rds must contain matching data, and ts must not be
105+
// empty. When uploading fails, this function calls exporter's onError() directly, not propagating
106+
// errors to the caller.
107+
func (pd *projectData) uploadTimeSeries(ts []*mpb.TimeSeries, rds []*RowData) {
108+
exp := pd.parent
109+
req := &mpb.CreateTimeSeriesRequest{
110+
Name: fmt.Sprintf("projects/%s", pd.projectID),
111+
TimeSeries: ts,
112+
}
113+
if err := createTimeSeries(exp.client, exp.ctx, req); err != nil {
114+
newErr := fmt.Errorf("RPC call to create time series failed for project %s: %v", pd.projectID, err)
115+
// We pass all row data not successfully uploaded.
116+
exp.onError(newErr, rds...)
117+
}
118+
}

0 commit comments

Comments
 (0)