diff --git a/monitoring/exporter/stackdriver/data_test.go b/monitoring/exporter/stackdriver/data_test.go new file mode 100644 index 000000000..4612329ec --- /dev/null +++ b/monitoring/exporter/stackdriver/data_test.go @@ -0,0 +1,172 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver + +import ( + "context" + "errors" + "fmt" + "time" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + mrpb "google.golang.org/genproto/googleapis/api/monitoredres" +) + +// This file defines various data needed for testing. + +const ( + label1name = "key_1" + label2name = "key_2" + label3name = "key_3" + label4name = "key_4" + label5name = "key_5" + + value1 = "value_1" + value2 = "value_2" + value3 = "value_3" + value4 = "value_4" + value5 = "value_5" + value6 = "value_6" + value7 = "value_7" + value8 = "value_8" + + metric1name = "metric_1" + metric1desc = "this is metric 1" + metric2name = "metric_2" + metric2desc = "this is metric 2" + metric3name = "metric_3" + metric3desc = "this is metric 3" + + project1 = "project-1" + project2 = "project-2" +) + +var ( + ctx = context.Background() + + invalidDataStr = "invalid data" + // This error is used for test to catch some error happpened. + errInvalidData = errors.New(invalidDataStr) + // This error is used for unexpected error. + errUnrecognizedData = errors.New("unrecognized data") + + key1 = getKey(label1name) + key2 = getKey(label2name) + key3 = getKey(label3name) + projectKey = getKey(ProjectKeyName) + + view1 = &view.View{ + Name: metric1name, + Description: metric1desc, + TagKeys: nil, + Measure: stats.Int64(metric1name, metric1desc, stats.UnitDimensionless), + Aggregation: view.Sum(), + } + view2 = &view.View{ + Name: metric2name, + Description: metric2desc, + TagKeys: []tag.Key{key1, key2, key3}, + Measure: stats.Int64(metric2name, metric2desc, stats.UnitDimensionless), + Aggregation: view.Sum(), + } + view3 = &view.View{ + Name: metric3name, + Description: metric3desc, + TagKeys: []tag.Key{projectKey}, + Measure: stats.Int64(metric3name, metric3desc, stats.UnitDimensionless), + Aggregation: view.Sum(), + } + + // To make verification easy, we require all valid rows should have int64 values and all of + // them must be distinct. + view1row1 = &view.Row{ + Tags: nil, + Data: &view.SumData{Value: 1}, + } + view1row2 = &view.Row{ + Tags: nil, + Data: &view.SumData{Value: 2}, + } + view1row3 = &view.Row{ + Tags: nil, + Data: &view.SumData{Value: 3}, + } + view2row1 = &view.Row{ + Tags: []tag.Tag{{key1, value1}, {key2, value2}, {key3, value3}}, + Data: &view.SumData{Value: 4}, + } + view2row2 = &view.Row{ + Tags: []tag.Tag{{key1, value4}, {key2, value5}, {key3, value6}}, + Data: &view.SumData{Value: 5}, + } + view3row1 = &view.Row{ + Tags: []tag.Tag{{projectKey, project1}}, + Data: &view.SumData{Value: 6}, + } + view3row2 = &view.Row{ + Tags: []tag.Tag{{projectKey, project2}}, + Data: &view.SumData{Value: 7}, + } + view3row3 = &view.Row{ + Tags: []tag.Tag{{projectKey, project1}}, + Data: &view.SumData{Value: 8}, + } + // This Row does not have valid Data field, so is invalid. + invalidRow = &view.Row{Data: nil} + + resource1 = &mrpb.MonitoredResource{ + Type: "cloudsql_database", + Labels: map[string]string{ + "project_id": project1, + "region": "us-central1", + "database_id": "cloud-SQL-instance-1", + }, + } + resource2 = &mrpb.MonitoredResource{ + Type: "gce_instance", + Labels: map[string]string{ + "project_id": project2, + "zone": "us-east1", + "database_id": "GCE-instance-1", + }, + } +) + +// Timestamps. We make sure that all time stamps are strictly increasing. +var ( + startTime1, endTime1, startTime2, endTime2 time.Time + startTime3, endTime3, startTime4, endTime4 time.Time +) + +func init() { + ts := time.Now() + for _, t := range []*time.Time{ + &startTime1, &endTime1, &startTime2, &endTime2, + &startTime3, &endTime3, &startTime4, &endTime4, + } { + *t = ts + ts = ts.Add(time.Second) + } +} + +func getKey(name string) tag.Key { + key, err := tag.NewKey(name) + if err != nil { + panic(fmt.Errorf("key creation failed for key name: %s", name)) + } + return key +} diff --git a/monitoring/exporter/stackdriver/mock_check_test.go b/monitoring/exporter/stackdriver/mock_check_test.go new file mode 100644 index 000000000..3aa60e5d8 --- /dev/null +++ b/monitoring/exporter/stackdriver/mock_check_test.go @@ -0,0 +1,314 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver + +import ( + "context" + "fmt" + "strings" + + monitoring "cloud.google.com/go/monitoring/apiv3" + "google.golang.org/api/option" + "google.golang.org/api/support/bundler" + mpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +// This file defines various mocks for testing, and checking functions for mocked data. We mock +// metric client and bundler because their actions involves RPC calls or non-deterministic behavior. + +// Following data are used to store various data generated by exporters' activity. They are used by +// each test to verify intended behavior. Each test should call testDataInit() to clear these data. +var ( + // errStorage records all errors and associated RowData objects reported by exporter. + errStorage []errRowData + // projDataMap is a copy of projDataMap used by each tests. + projDataMap map[string]*projectData + // projRds saves all RowData objects passed to addToBundler call by project ID. Since a + // value of a map is not addressable, we save the pointer to the slice. + projRds map[string]*[]*RowData + // timeSeriesReqs saves all incoming requests for creating time series. + timeSeriesReqs []*mpb.CreateTimeSeriesRequest + // timeSeriesResults holds predefined error values to be returned by mockCreateTimeSerie() + // calls. Each errors in timeSeriesResults are returned per each mockCreateTimeSeries() + // call. If all errors in timeSeriesResults are used, all other mockCreateTimeSeries calls + // will return nil. + timeSeriesResults []error +) + +func init() { + // Mock functions. + newMetricClient = mockNewMetricClient + createTimeSeries = mockCreateTimeSeries + newBundler = mockNewBundler + addToBundler = mockAddToBundler +} + +// testDataInit() initializes all data needed for each test. This function must be called at the +// beginning of each test. +func testDataInit() { + projDataMap = nil + projRds = map[string]*[]*RowData{} + timeSeriesReqs = nil + timeSeriesResults = nil + errStorage = nil +} + +// Mocked functions. + +func mockNewMetricClient(_ context.Context, _ ...option.ClientOption) (*monitoring.MetricClient, error) { + return nil, nil +} + +func mockCreateTimeSeries(_ context.Context, _ *monitoring.MetricClient, req *mpb.CreateTimeSeriesRequest) error { + timeSeriesReqs = append(timeSeriesReqs, req) + // Check timeSeriesResults and if not empty, return the first error from it. + if len(timeSeriesResults) == 0 { + return nil + } + err := timeSeriesResults[0] + // Delete the returning error. + timeSeriesResults = timeSeriesResults[1:] + return err +} + +func mockNewBundler(_ interface{}, _ func(interface{})) *bundler.Bundler { + // We do not return nil but create an empty Bundler object because + // 1. Exporter.newProjectData() is setting fields of Bundler. + // 2. mockAddToBundler needs to get the project ID of the bundler. To do that we need + // different address for each bundler. + return &bundler.Bundler{} +} + +func mockAddToBundler(bndler *bundler.Bundler, item interface{}, _ int) error { + // Get the project ID of the bndler by inspecting projDataMap. + var projID string + projIDfound := false + for tempProjID, pd := range projDataMap { + if pd.bndler == bndler { + projID = tempProjID + projIDfound = true + break + } + } + if !projIDfound { + return errUnrecognizedData + } + + rds, ok := projRds[projID] + if !ok { + // For new project ID, create the actual slice and save its pointer. + var rdsSlice []*RowData + rds = &rdsSlice + projRds[projID] = rds + } + *rds = append(*rds, item.(*RowData)) + return nil +} + +// newTest*() functions create exporters and project data used for testing. Each test should call +// One of these functions once and only once, and never call NewExporter() directly. + +// newTestExp creates an exporter which saves error to errStorage. Caller should not set +// opts.OnError and opts.BundleCountThreshold. +func newTestExp(opts Options) (*Exporter, error) { + opts.OnError = testOnError + // For testing convenience, we reduce the number of timeseris in one upload monitoring API + // call. + opts.BundleCountThreshold = 3 + exp, err := NewExporter(ctx, opts) + if err != nil { + return nil, fmt.Errorf("creating exporter failed: %v", err) + } + // Expose projDataMap so that mockAddToBundler() can use it. + projDataMap = exp.projDataMap + return exp, nil +} + +// newTestProjData creates a projectData object to test behavior of projectData.uploadRowData. Other +// uses are not recommended. As newTestExp, all errors are saved to errStorage. +func newTestProjData(opts Options) (*projectData, error) { + exp, err := newTestExp(opts) + if err != nil { + return nil, err + } + return exp.newProjectData(project1), nil +} + +// We define a storage for all errors happened in export operation. + +type errRowData struct { + err error + rds []*RowData +} + +// testOnError records any incoming error and accompanying RowData array. This function is passed to +// the exporter to record errors. +func testOnError(err error, rds ...*RowData) { + errStorage = append(errStorage, errRowData{err, rds}) +} + +// multiError stores a sequence of errors. To convert it to an actual error, call toError(). +type multiError struct { + errs []error +} + +func (me *multiError) addf(format string, args ...interface{}) { + me.errs = append(me.errs, fmt.Errorf(format, args...)) +} + +func (me *multiError) toError() error { + switch len(me.errs) { + case 0: + return nil + case 1: + return me.errs[0] + default: + return fmt.Errorf("multiple errors: %q", me.errs) + } +} + +// checkMetricClient checks all recorded requests to the metric client. We only compare int64 +// values of the time series. To make this work, we assigned different int64 values for all valid +// rows in the test. +func checkMetricClient(wantReqsValues [][]int64) error { + reqsLen, wantReqsLen := len(timeSeriesReqs), len(wantReqsValues) + if reqsLen != wantReqsLen { + return fmt.Errorf("number of requests got: %d, want %d", reqsLen, wantReqsLen) + } + var errs multiError + for i := 0; i < reqsLen; i++ { + prefix := fmt.Sprintf("%d-th request mismatch", i+1) + tsArr := timeSeriesReqs[i].TimeSeries + wantTsValues := wantReqsValues[i] + tsArrLen, wantTsArrLen := len(tsArr), len(wantTsValues) + if tsArrLen != wantTsArrLen { + errs.addf("%s: number of time series got: %d, want: %d", prefix, tsArrLen, wantTsArrLen) + continue + } + for j := 0; j < tsArrLen; j++ { + // This is how monitoring API stores the int64 value. + tsVal := tsArr[j].Points[0].Value.Value.(*mpb.TypedValue_Int64Value).Int64Value + wantTsVal := wantTsValues[j] + if tsVal != wantTsVal { + errs.addf("%s: Value got: %d, want: %d", prefix, tsVal, wantTsVal) + } + } + } + return errs.toError() +} + +// errRowDataCheck contains data for checking content of error storage. +type errRowDataCheck struct { + errPrefix, errSuffix string + rds []*RowData +} + +// checkErrStorage checks content of error storage. For returned errors, we check prefix and suffix. +func checkErrStorage(wantErrRdCheck []errRowDataCheck) error { + gotLen, wantLen := len(errStorage), len(wantErrRdCheck) + if gotLen != wantLen { + return fmt.Errorf("number of reported errors: %d, want: %d", gotLen, wantLen) + } + var errs multiError + for i := 0; i < gotLen; i++ { + prefix := fmt.Sprintf("%d-th reported error mismatch", i+1) + errRd, wantErrRd := errStorage[i], wantErrRdCheck[i] + errStr := errRd.err.Error() + if errPrefix := wantErrRd.errPrefix; !strings.HasPrefix(errStr, errPrefix) { + errs.addf("%s: error got: %q, want: prefixed by %q", prefix, errStr, errPrefix) + } + if errSuffix := wantErrRd.errSuffix; !strings.HasSuffix(errStr, errSuffix) { + errs.addf("%s: error got: %q, want: suffiexd by %q", prefix, errStr, errSuffix) + } + if err := checkRowDataArr(errRd.rds, wantErrRd.rds); err != nil { + errs.addf("%s: RowData array mismatch: %v", prefix, err) + } + } + return errs.toError() +} + +func checkRowDataArr(rds, wantRds []*RowData) error { + rdLen, wantRdLen := len(rds), len(wantRds) + if rdLen != wantRdLen { + return fmt.Errorf("number row data got: %d, want: %d", rdLen, wantRdLen) + } + var errs multiError + for i := 0; i < rdLen; i++ { + if err := checkRowData(rds[i], wantRds[i]); err != nil { + errs.addf("%d-th row data mismatch: %v", i+1, err) + } + } + return errs.toError() +} + +func checkRowData(rd, wantRd *RowData) error { + var errs multiError + if rd.View != wantRd.View { + errs.addf("View got: %s, want: %s", rd.View.Name, wantRd.View.Name) + } + if rd.Start != wantRd.Start { + errs.addf("Start got: %v, want: %v", rd.Start, wantRd.Start) + } + if rd.End != wantRd.End { + errs.addf("End got: %v, want: %v", rd.End, wantRd.End) + } + if rd.Row != wantRd.Row { + errs.addf("Row got: %v, want: %v", rd.Row, wantRd.Row) + } + return errs.toError() +} + +// checkProjData checks all data passed to the bundler by bundler.Add(). +func checkProjData(wantProjData map[string][]*RowData) error { + var errs multiError + for proj := range projRds { + if _, ok := wantProjData[proj]; !ok { + errs.addf("project in exporter's project data not wanted: %s", proj) + } + } + + for proj, wantRds := range wantProjData { + rds, ok := projRds[proj] + if !ok { + errs.addf("wanted project not found in exporter's project data: %v", proj) + continue + } + if err := checkRowDataArr(*rds, wantRds); err != nil { + errs.addf("RowData array mismatch for project %s: %v", proj, err) + } + } + return errs.toError() +} + +// checkLabels checks data in labels. +func checkLabels(prefix string, labels, wantLabels map[string]string) error { + var errs multiError + for labelName, value := range labels { + wantValue, ok := wantLabels[labelName] + if !ok { + errs.addf("%s: label name in time series not wanted: %s", prefix, labelName) + continue + } + if value != wantValue { + errs.addf("%s: value for label name %s got: %s, want: %s", prefix, labelName, value, wantValue) + } + } + for wantLabelName := range wantLabels { + if _, ok := labels[wantLabelName]; !ok { + errs.addf("%s: wanted label name not found in time series: %s", prefix, wantLabelName) + } + } + return errs.toError() +} diff --git a/monitoring/exporter/stackdriver/project_data.go b/monitoring/exporter/stackdriver/project_data.go new file mode 100644 index 000000000..9e4a0cc71 --- /dev/null +++ b/monitoring/exporter/stackdriver/project_data.go @@ -0,0 +1,79 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver + +import ( + "fmt" + + "google.golang.org/api/support/bundler" + mpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +// projectData contain per-project data in exporter. It should be created by newProjectData() +type projectData struct { + parent *Exporter + projectID string + // We make bundler for each project because call to monitoring API can be grouped only in + // project level + bndler *bundler.Bundler +} + +// uploadRowData is called by bundler to upload row data, and report any error happened meanwhile. +func (pd *projectData) uploadRowData(bundle interface{}) { + exp := pd.parent + rds := bundle.([]*RowData) + + // uploadTs contains TimeSeries objects that needs to be uploaded. + var uploadTs []*mpb.TimeSeries + // uploadRds contains RowData objects corresponds to uploadTs. It's used for error reporting + // when upload operation fails. + var uploadRds []*RowData + + for _, rd := range rds { + ts, err := exp.makeTS(rd) + if err != nil { + exp.opts.OnError(err, rd) + continue + } + // Time series created. We update both uploadTs and uploadRds. + uploadTs = append(uploadTs, ts) + uploadRds = append(uploadRds, rd) + if len(uploadTs) == exp.opts.BundleCountThreshold { + pd.uploadTimeSeries(uploadTs, uploadRds) + uploadTs = nil + uploadRds = nil + } + } + // Upload any remaining time series. + if len(uploadTs) != 0 { + pd.uploadTimeSeries(uploadTs, uploadRds) + } +} + +// uploadTimeSeries uploads timeSeries. ts and rds must contain matching data, and ts must not be +// empty. When uploading fails, this function calls exporter's OnError() directly, not propagating +// errors to the caller. +func (pd *projectData) uploadTimeSeries(ts []*mpb.TimeSeries, rds []*RowData) { + exp := pd.parent + req := &mpb.CreateTimeSeriesRequest{ + Name: fmt.Sprintf("projects/%s", pd.projectID), + TimeSeries: ts, + } + if err := createTimeSeries(exp.ctx, exp.client, req); err != nil { + newErr := fmt.Errorf("monitoring API call to create time series failed for project %s: %v", pd.projectID, err) + // We pass all row data not successfully uploaded. + exp.opts.OnError(newErr, rds...) + } +} diff --git a/monitoring/exporter/stackdriver/row_data_to_point.go b/monitoring/exporter/stackdriver/row_data_to_point.go new file mode 100644 index 000000000..bdbbbca0d --- /dev/null +++ b/monitoring/exporter/stackdriver/row_data_to_point.go @@ -0,0 +1,138 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver + +import ( + "time" + + tspb "github.com/golang/protobuf/ptypes/timestamp" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + dspb "google.golang.org/genproto/googleapis/api/distribution" + mpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +// Functions in this file is used to convert RowData to monitoring point that are used by uploading +// monitoring API calls. All functions except newStringPoint in this file are copied from +// contrib.go.opencensus.io/exporter/stackdriver. + +func newPoint(v *view.View, row *view.Row, start, end time.Time) *mpb.Point { + switch v.Aggregation.Type { + case view.AggTypeLastValue: + return newGaugePoint(v, row, end) + default: + return newCumulativePoint(v, row, start, end) + } +} + +// newStringPoint returns a metric point with string value. +func newStringPoint(val string, end time.Time) *mpb.Point { + gaugeTime := &tspb.Timestamp{ + Seconds: end.Unix(), + Nanos: int32(end.Nanosecond()), + } + return &mpb.Point{ + Interval: &mpb.TimeInterval{ + EndTime: gaugeTime, + }, + Value: &mpb.TypedValue{ + Value: &mpb.TypedValue_StringValue{ + StringValue: val, + }, + }, + } +} + +func newCumulativePoint(v *view.View, row *view.Row, start, end time.Time) *mpb.Point { + return &mpb.Point{ + Interval: &mpb.TimeInterval{ + StartTime: &tspb.Timestamp{ + Seconds: start.Unix(), + Nanos: int32(start.Nanosecond()), + }, + EndTime: &tspb.Timestamp{ + Seconds: end.Unix(), + Nanos: int32(end.Nanosecond()), + }, + }, + Value: newTypedValue(v, row), + } +} + +func newGaugePoint(v *view.View, row *view.Row, end time.Time) *mpb.Point { + gaugeTime := &tspb.Timestamp{ + Seconds: end.Unix(), + Nanos: int32(end.Nanosecond()), + } + return &mpb.Point{ + Interval: &mpb.TimeInterval{ + EndTime: gaugeTime, + }, + Value: newTypedValue(v, row), + } +} + +func newTypedValue(vd *view.View, r *view.Row) *mpb.TypedValue { + switch v := r.Data.(type) { + case *view.CountData: + return &mpb.TypedValue{Value: &mpb.TypedValue_Int64Value{ + Int64Value: v.Value, + }} + case *view.SumData: + switch vd.Measure.(type) { + case *stats.Int64Measure: + return &mpb.TypedValue{Value: &mpb.TypedValue_Int64Value{ + Int64Value: int64(v.Value), + }} + case *stats.Float64Measure: + return &mpb.TypedValue{Value: &mpb.TypedValue_DoubleValue{ + DoubleValue: v.Value, + }} + } + case *view.DistributionData: + return &mpb.TypedValue{Value: &mpb.TypedValue_DistributionValue{ + DistributionValue: &dspb.Distribution{ + Count: v.Count, + Mean: v.Mean, + SumOfSquaredDeviation: v.SumOfSquaredDev, + // TODO(songya): uncomment this once Stackdriver supports min/max. + // Range: &dspb.Distribution_Range{ + // Min: v.Min, + // Max: v.Max, + // }, + BucketOptions: &dspb.Distribution_BucketOptions{ + Options: &dspb.Distribution_BucketOptions_ExplicitBuckets{ + ExplicitBuckets: &dspb.Distribution_BucketOptions_Explicit{ + Bounds: vd.Aggregation.Buckets, + }, + }, + }, + BucketCounts: v.CountPerBucket, + }, + }} + case *view.LastValueData: + switch vd.Measure.(type) { + case *stats.Int64Measure: + return &mpb.TypedValue{Value: &mpb.TypedValue_Int64Value{ + Int64Value: int64(v.Value), + }} + case *stats.Float64Measure: + return &mpb.TypedValue{Value: &mpb.TypedValue_DoubleValue{ + DoubleValue: v.Value, + }} + } + } + return nil +} diff --git a/monitoring/exporter/stackdriver/stackdriver.go b/monitoring/exporter/stackdriver/stackdriver.go new file mode 100644 index 000000000..035574822 --- /dev/null +++ b/monitoring/exporter/stackdriver/stackdriver.go @@ -0,0 +1,332 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package stackdriver provides an exporter that uploads data from opencensus to stackdriver +// metrics of multiple GCP projects. +// +// General assumptions or requirements when using this exporter. +// 1. The basic unit of data is a view.Data with only a single view.Row. We define it as a separate +// type called RowData. +// 2. We can inspect each RowData to tell whether this RowData is applicable for this exporter. +// 3. For RowData that is applicable to this exporter, we require that +// 3.1. Any view associated to RowData corresponds to a stackdriver metric, and it is already +// defined for all GCP projects. +// 3.2. RowData has correcponding GCP projects, and we can determine its project ID. +// 3.3. After trimming labels and tags, configuration of all view data matches that of corresponding +// stackdriver metric +package stackdriver + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + monitoring "cloud.google.com/go/monitoring/apiv3" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "google.golang.org/api/option" + "google.golang.org/api/support/bundler" + metricpb "google.golang.org/genproto/googleapis/api/metric" + mrpb "google.golang.org/genproto/googleapis/api/monitoredres" + mpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +// MaxTimeSeriesPerUpload is the maximum number of timeseries objects that will be uploaded to +// Stackdriver in one API call. +const MaxTimeSeriesPerUpload = 100 + +// Exporter is the exporter that can be registered to opencensus. An Exporter object must be +// created by NewExporter(). +type Exporter struct { + // TODO(lawrencechung): If possible, find a way to not storing ctx in the struct. + ctx context.Context + client *monitoring.MetricClient + opts Options + + // mu protects access to projDataMap + mu sync.Mutex + // per-project data of exporter + projDataMap map[string]*projectData +} + +// Options designates various parameters used by stats exporter. Default value of fields in Options +// are valid for use. +type Options struct { + // ClientOptions designates options for creating metric client, especially credentials for + // monitoring API calls. + ClientOptions []option.ClientOption + + // Options for bundles amortizing export requests. Note that a bundle is created for each + // project. + + // BundleDelayThreshold determines the max amount of time the exporter can wait before + // uploading data to the stackdriver. If this value is not positive, the default value in + // the bundle package is used. + BundleDelayThreshold time.Duration + // BundleCountThreshold determines how many RowData objects can be buffered before batch + // uploading them to the backend. If this value is not between 1 and MaxTimeSeriesPerUpload, + // MaxTimeSeriesPerUpload is used. + BundleCountThreshold int + + // Callback functions provided by user. + + // GetProjectID is used to filter whether given row data can be applicable to this exporter + // and if so, it also determines the projectID of given row data. If + // RowDataNotApplicableError is returned, then the row data is not applicable to this + // exporter, and it will be silently ignored. Though not recommended, other errors can be + // returned, and in that case the error is reported to callers via OnError and the row data + // will not be uploaded to stackdriver. When GetProjectID is not set, for any row data with + // tag key name "project_id" (it's defined as ProjectKeyName), the value of the tag will be + // it's project ID. All other row data will be silently ignored. + GetProjectID func(*RowData) (projectID string, err error) + // OnError is used to report any error happened while exporting view data fails. Whenever + // this function is called, it's guaranteed that at least one row data is also passed to + // OnError. Row data passed to OnError must not be modified and OnError must be + // non-blocking. When OnError is not set, all errors happened on exporting are ignored. + OnError func(error, ...*RowData) + // MakeResource creates monitored resource from RowData. It is guaranteed that only RowData + // that passes GetProjectID will be given to this function. Though not recommended, error + // can be returned, and in that case the error is reported to callers via OnError and the + // row data will not be uploaded to stackdriver. When MakeResource is not set, global + // resource is used for all RowData objects. + MakeResource func(rd *RowData) (*mrpb.MonitoredResource, error) + // IsValueString tells that whether the value in the row data is a string, and return the + // string value if it is. This function circumvents the restriction that opencensus metrics + // do not support string value. When IsValueString is not set, it always returns false. + IsValueString func(rd *RowData) (string, bool, error) + + // Options concerning labels. + + // DefaultLabels store default value of some labels. Labels in DefaultLabels need not be + // specified in tags of view data. Default labels and tags of view may have overlapping + // label keys. In this case, values in tag are used. Default labels are used for labels + // those are constant throughout export operation, like version number of the calling + // program. + DefaultLabels map[string]string + // UnexportedLabels contains key of labels that will not be exported stackdriver. Typical + // uses of unexported labels will be either that marks project ID, or that's used only for + // constructing resource. + UnexportedLabels []string +} + +// ProjectKeyName is used by defaultGetProjectID to get the project ID of a given row data. +const ProjectKeyName = "project_id" + +// Default values for options. Their semantics are described in Options. + +func defaultGetProjectID(rd *RowData) (string, error) { + for _, tag := range rd.Row.Tags { + if tag.Key.Name() == ProjectKeyName { + return tag.Value, nil + } + } + return "", ErrRowDataNotApplicable +} + +func defaultOnError(err error, rds ...*RowData) {} + +func defaultMakeResource(rd *RowData) (*mrpb.MonitoredResource, error) { + return &mrpb.MonitoredResource{Type: "global"}, nil +} + +func defaultIsValueString(rd *RowData) (string, bool, error) { + return "", false, nil +} + +// Following functions are wrapper of functions those will be mocked by tests. Only tests can modify +// these functions. +var ( + newMetricClient = monitoring.NewMetricClient + createTimeSeries = func(ctx context.Context, c *monitoring.MetricClient, ts *mpb.CreateTimeSeriesRequest) error { + return c.CreateTimeSeries(ctx, ts) + } + newBundler = bundler.NewBundler + addToBundler = (*bundler.Bundler).Add +) + +// NewExporter creates an Exporter object. Once a call to NewExporter is made, any fields in opts +// must not be modified at all. ctx will also be used throughout entire exporter operation when +// making monitoring API call. +func NewExporter(ctx context.Context, opts Options) (*Exporter, error) { + client, err := newMetricClient(ctx, opts.ClientOptions...) + if err != nil { + return nil, fmt.Errorf("failed to create a metric client: %v", err) + } + + e := &Exporter{ + ctx: ctx, + client: client, + opts: opts, + projDataMap: make(map[string]*projectData), + } + + if !(0 < e.opts.BundleDelayThreshold) { + e.opts.BundleDelayThreshold = bundler.DefaultDelayThreshold + } + if !(0 < e.opts.BundleCountThreshold && e.opts.BundleCountThreshold <= MaxTimeSeriesPerUpload) { + e.opts.BundleCountThreshold = MaxTimeSeriesPerUpload + } + if e.opts.GetProjectID == nil { + e.opts.GetProjectID = defaultGetProjectID + } + if e.opts.OnError == nil { + e.opts.OnError = defaultOnError + } + if e.opts.MakeResource == nil { + e.opts.MakeResource = defaultMakeResource + } + if e.opts.IsValueString == nil { + e.opts.IsValueString = defaultIsValueString + } + + return e, nil +} + +// RowData represents a single row in view data. This is our unit of computation. We use a single +// row instead of view data because a view data consists of multiple rows, and each row may belong +// to different projects. +type RowData struct { + View *view.View + Start, End time.Time + Row *view.Row +} + +// ExportView is the method called by opencensus to export view data. It constructs RowData out of +// view.Data objects. +func (e *Exporter) ExportView(vd *view.Data) { + for _, row := range vd.Rows { + rd := &RowData{ + View: vd.View, + Start: vd.Start, + End: vd.End, + Row: row, + } + e.exportRowData(rd) + } +} + +// ErrRowDataNotApplicable is used to tell that given row data is not applicable to the exporter. +// See GetProjectID of Options for more detail. +var ErrRowDataNotApplicable = errors.New("row data is not applicable to the exporter, so it will be ignored") + +// exportRowData exports a single row data. +func (e *Exporter) exportRowData(rd *RowData) { + projID, err := e.opts.GetProjectID(rd) + if err != nil { + // We ignore non-applicable RowData. + if err != ErrRowDataNotApplicable { + newErr := fmt.Errorf("failed to get project ID on row data with view %s: %v", rd.View.Name, err) + e.opts.OnError(newErr, rd) + } + return + } + pd := e.getProjectData(projID) + switch err := addToBundler(pd.bndler, rd, 1); err { + case nil: + case bundler.ErrOversizedItem: + go pd.uploadRowData(rd) + default: + newErr := fmt.Errorf("failed to add row data with view %s to bundle for project %s: %v", rd.View.Name, projID, err) + e.opts.OnError(newErr, rd) + } +} + +func (e *Exporter) getProjectData(projectID string) *projectData { + e.mu.Lock() + defer e.mu.Unlock() + if pd, ok := e.projDataMap[projectID]; ok { + return pd + } + + pd := e.newProjectData(projectID) + e.projDataMap[projectID] = pd + return pd +} + +func (e *Exporter) newProjectData(projectID string) *projectData { + pd := &projectData{ + parent: e, + projectID: projectID, + } + + pd.bndler = newBundler((*RowData)(nil), pd.uploadRowData) + pd.bndler.DelayThreshold = e.opts.BundleDelayThreshold + pd.bndler.BundleCountThreshold = e.opts.BundleCountThreshold + return pd +} + +// Close flushes and closes the exporter. Close must be called after the exporter is unregistered +// and no further calls to ExportView() are made. Once Close() is returned no further access to the +// exporter is allowed in any way. +func (e *Exporter) Close() error { + e.mu.Lock() + for _, pd := range e.projDataMap { + pd.bndler.Flush() + } + e.mu.Unlock() + + if err := e.client.Close(); err != nil { + return fmt.Errorf("failed to close the metric client: %v", err) + } + return nil +} + +// makeTS constructs a time series from a row data. +func (e *Exporter) makeTS(rd *RowData) (*mpb.TimeSeries, error) { + var pt *mpb.Point + if strVal, ok, err := e.opts.IsValueString(rd); err != nil { + return nil, fmt.Errorf("failed to check whether row data is string valued or not in view: %v", rd.View.Name) + } else if ok { + pt = newStringPoint(strVal, rd.End) + } else { + pt = newPoint(rd.View, rd.Row, rd.Start, rd.End) + if pt.Value == nil { + return nil, fmt.Errorf("inconsistent data found in view %s", rd.View.Name) + } + } + + resource, err := e.opts.MakeResource(rd) + if err != nil { + return nil, fmt.Errorf("failed to construct resource of view %s: %v", rd.View.Name, err) + } + ts := &mpb.TimeSeries{ + Metric: &metricpb.Metric{ + Type: rd.View.Name, + Labels: e.makeLabels(rd.Row.Tags), + }, + Resource: resource, + Points: []*mpb.Point{pt}, + } + return ts, nil +} + +// makeLabels constructs label that's ready for being uploaded to stackdriver. +func (e *Exporter) makeLabels(tags []tag.Tag) map[string]string { + opts := e.opts + labels := make(map[string]string, len(opts.DefaultLabels)+len(tags)) + for key, val := range opts.DefaultLabels { + labels[key] = val + } + // If there's overlap When combining exporter's default label and tags, values in tags win. + for _, tag := range tags { + labels[tag.Key.Name()] = tag.Value + } + // Some labels are not for exporting. + for _, key := range opts.UnexportedLabels { + delete(labels, key) + } + return labels +} diff --git a/monitoring/exporter/stackdriver/stackdriver_test.go b/monitoring/exporter/stackdriver/stackdriver_test.go new file mode 100644 index 000000000..c4dcde6af --- /dev/null +++ b/monitoring/exporter/stackdriver/stackdriver_test.go @@ -0,0 +1,458 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver + +import ( + "fmt" + "testing" + + "go.opencensus.io/stats/view" + mrpb "google.golang.org/genproto/googleapis/api/monitoredres" + mpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +// This file contains actual tests. + +// TestAll runs all tests defined in this file. +func TestAll(t *testing.T) { + testData := []struct { + name string + test func(t *testing.T) + }{ + {"ProjectClassifyNoError", testProjectClassifyNoError}, + {"ProjectClassifyError", testProjectClassifyError}, + {"DefaultProjectClassify", testDefaultProjectClassify}, + {"UploadNoError", testUploadNoError}, + {"UploadTimeSeriesMakeError", testUploadTimeSeriesMakeError}, + {"UploadWithMetricClientError", testUploadWithMetricClientError}, + {"StringValueMetric", testStringValueMetric}, + {"MakeResource", testMakeResource}, + {"MakeLabel", testMakeLabel}, + } + + for _, data := range testData { + run := func(t *testing.T) { + testDataInit() + data.test(t) + } + t.Run(data.name, run) + } +} + +// testProjectClassifyNoError tests that exporter can recognize and distribute incoming data by +// its project. +func testProjectClassifyNoError(t *testing.T) { + viewData1 := &view.Data{ + View: view1, + Start: startTime1, + End: endTime1, + Rows: []*view.Row{view1row1, view1row2}, + } + viewData2 := &view.Data{ + View: view2, + Start: startTime2, + End: endTime2, + Rows: []*view.Row{view2row1}, + } + + getProjectID := func(rd *RowData) (string, error) { + switch rd.Row { + case view1row1, view2row1: + return project1, nil + case view1row2: + return project2, nil + default: + return "", errUnrecognizedData + } + } + + exp, err := newTestExp(Options{GetProjectID: getProjectID}) + if err != nil { + t.Fatal(err) + } + exp.ExportView(viewData1) + exp.ExportView(viewData2) + + wantRowData := map[string][]*RowData{ + project1: []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view2, startTime2, endTime2, view2row1}, + }, + project2: []*RowData{ + {view1, startTime1, endTime1, view1row2}, + }, + } + if err := checkErrStorage(nil); err != nil { + t.Error(err) + } + if err := checkProjData(wantRowData); err != nil { + t.Error(err) + } +} + +// testProjectClassifyError tests that exporter can properly handle errors while classifying +// incoming data by its project. +func testProjectClassifyError(t *testing.T) { + viewData1 := &view.Data{ + View: view1, + Start: startTime1, + End: endTime1, + Rows: []*view.Row{view1row1, view1row2}, + } + viewData2 := &view.Data{ + View: view2, + Start: startTime2, + End: endTime2, + Rows: []*view.Row{view2row1, view2row2}, + } + + getProjectID := func(rd *RowData) (string, error) { + switch rd.Row { + case view1row1, view2row2: + return project1, nil + case view1row2: + return "", ErrRowDataNotApplicable + case view2row1: + return "", errInvalidData + default: + return "", errUnrecognizedData + } + } + + exp, err := newTestExp(Options{GetProjectID: getProjectID}) + if err != nil { + t.Fatal(err) + } + exp.ExportView(viewData1) + exp.ExportView(viewData2) + + wantErrRdCheck := []errRowDataCheck{ + { + errPrefix: "failed to get project ID", + errSuffix: invalidDataStr, + rds: []*RowData{{view2, startTime2, endTime2, view2row1}}, + }, + } + wantRowData := map[string][]*RowData{ + project1: []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view2, startTime2, endTime2, view2row2}, + }, + } + if err := checkErrStorage(wantErrRdCheck); err != nil { + t.Error(err) + } + if err := checkProjData(wantRowData); err != nil { + t.Error(err) + } +} + +// testDefaultProjectClassify tests that defaultGetProjectID classifies RowData by tag with key name +// "project_id". +func testDefaultProjectClassify(t *testing.T) { + viewData1 := &view.Data{ + View: view1, + Start: startTime1, + End: endTime1, + Rows: []*view.Row{view1row1}, + } + viewData2 := &view.Data{ + View: view3, + Start: startTime2, + End: endTime2, + Rows: []*view.Row{view3row1, view3row2}, + } + viewData3 := &view.Data{ + View: view2, + Start: startTime3, + End: endTime3, + Rows: []*view.Row{view2row1, view2row2}, + } + viewData4 := &view.Data{ + View: view3, + Start: startTime4, + End: endTime4, + Rows: []*view.Row{view3row3}, + } + + exp, err := newTestExp(Options{}) + if err != nil { + t.Fatal(err) + } + exp.ExportView(viewData1) + exp.ExportView(viewData2) + exp.ExportView(viewData3) + exp.ExportView(viewData4) + + if err := checkErrStorage(nil); err != nil { + t.Error(err) + } + // RowData in viewData1 and viewData3 has no project ID tag, so ignored. + wantRowData := map[string][]*RowData{ + project1: []*RowData{ + {view3, startTime2, endTime2, view3row1}, + {view3, startTime4, endTime4, view3row3}, + }, + project2: []*RowData{ + {view3, startTime2, endTime2, view3row2}, + }, + } + if err := checkProjData(wantRowData); err != nil { + t.Error(err) + } +} + +// testUploadNoError tests that all RowData objects passed to uploadRowData() are grouped by +// slice of length MaxTimeSeriesPerUpload, and passed to createTimeSeries(). +func testUploadNoError(t *testing.T) { + pd, err := newTestProjData(Options{}) + if err != nil { + t.Fatal(err) + } + rd := []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view1, startTime1, endTime1, view1row2}, + {view1, startTime1, endTime1, view1row3}, + {view2, startTime2, endTime2, view2row1}, + {view2, startTime2, endTime2, view2row2}, + } + pd.uploadRowData(rd) + + if err := checkErrStorage(nil); err != nil { + t.Error(err) + } + wantClData := [][]int64{ + {1, 2, 3}, + {4, 5}, + } + if err := checkMetricClient(wantClData); err != nil { + t.Error(err) + } +} + +// testUploadTimeSeriesMakeError tests that errors while creating time series are properly handled. +func testUploadTimeSeriesMakeError(t *testing.T) { + makeResource := func(rd *RowData) (*mrpb.MonitoredResource, error) { + if rd.Row == view1row2 { + return nil, errInvalidData + } + return defaultMakeResource(rd) + } + pd, err := newTestProjData(Options{MakeResource: makeResource}) + if err != nil { + t.Fatal(err) + } + rd := []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view1, startTime1, endTime1, view1row2}, + {view1, startTime1, endTime1, view1row3}, + // This row data is invalid, so it will trigger inconsistent data error. + {view2, startTime2, endTime2, invalidRow}, + {view2, startTime2, endTime2, view2row1}, + {view2, startTime2, endTime2, view2row2}, + } + pd.uploadRowData(rd) + + wantErrRdCheck := []errRowDataCheck{ + { + errPrefix: "failed to construct resource", + errSuffix: invalidDataStr, + rds: []*RowData{{view1, startTime1, endTime1, view1row2}}, + }, { + errPrefix: "inconsistent data found in view", + errSuffix: metric2name, + rds: []*RowData{{view2, startTime2, endTime2, invalidRow}}, + }, + } + if err := checkErrStorage(wantErrRdCheck); err != nil { + t.Error(err) + } + + wantClData := [][]int64{ + {1, 3, 4}, + {5}, + } + if err := checkMetricClient(wantClData); err != nil { + t.Error(err) + } +} + +// testUploadTimeSeriesMakeError tests that exporter can handle error on metric client's time +// series create monitoing API call. +func testUploadWithMetricClientError(t *testing.T) { + pd, err := newTestProjData(Options{}) + if err != nil { + t.Fatal(err) + } + timeSeriesResults = append(timeSeriesResults, nil, errInvalidData) + rd := []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view1, startTime1, endTime1, view1row2}, + {view1, startTime1, endTime1, view1row3}, + {view2, startTime2, endTime2, view2row1}, + {view2, startTime2, endTime2, view2row2}, + } + pd.uploadRowData(rd) + + wantErrRdCheck := []errRowDataCheck{ + { + errPrefix: "monitoring API call to create time series failed", + errSuffix: invalidDataStr, + rds: []*RowData{ + {view2, startTime2, endTime2, view2row1}, + {view2, startTime2, endTime2, view2row2}, + }, + }, + } + if err := checkErrStorage(wantErrRdCheck); err != nil { + t.Error(err) + } + + wantClData := [][]int64{ + {1, 2, 3}, + {4, 5}, + } + if err := checkMetricClient(wantClData); err != nil { + t.Error(err) + } +} + +// testStringValueMetric tests that exporter can detect string valued metric by IsValueString given +// in option. +func testStringValueMetric(t *testing.T) { + isValueString := func(rd *RowData) (string, bool, error) { + switch rd.Row { + case view1row1: + return "string_value", true, nil + case view1row2: + return "", false, nil + default: + return "", false, errUnrecognizedData + } + } + pd, err := newTestProjData(Options{IsValueString: isValueString}) + if err != nil { + t.Fatal(err) + } + rd := []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view1, startTime1, endTime1, view1row2}, + {view1, startTime1, endTime1, view1row3}, + } + pd.uploadRowData(rd) + wantErrRdCheck := []errRowDataCheck{ + { + errPrefix: "failed to check whether row data is string valued or not", + errSuffix: metric1name, + rds: []*RowData{{view1, startTime1, endTime1, view1row3}}, + }, + } + if err := checkErrStorage(wantErrRdCheck); err != nil { + t.Error(err) + } + wantVals := []interface{}{"string_value", int64(2)} + tsArr := timeSeriesReqs[0].TimeSeries + tsVals := []interface{}{ + tsArr[0].Points[0].Value.Value.(*mpb.TypedValue_StringValue).StringValue, + tsArr[1].Points[0].Value.Value.(*mpb.TypedValue_Int64Value).Int64Value, + } + for i := 0; i < len(tsVals); i++ { + if tsVal, wantVal := tsVals[i], wantVals[i]; tsVal != wantVal { + t.Errorf("%d-th time series value mismatch: got %v, want %v", i+1, tsVal, wantVal) + } + } +} + +// testMakeResource tests that exporter can create monitored resource dynamically. +func testMakeResource(t *testing.T) { + makeResource := func(rd *RowData) (*mrpb.MonitoredResource, error) { + switch rd.Row { + case view1row1: + return resource1, nil + case view1row2: + return resource2, nil + default: + return nil, errUnrecognizedData + } + } + pd, err := newTestProjData(Options{MakeResource: makeResource}) + if err != nil { + t.Fatal(err) + } + rd := []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view1, startTime1, endTime1, view1row2}, + } + pd.uploadRowData(rd) + if err := checkErrStorage(nil); err != nil { + t.Error(err) + } + if err := checkMetricClient([][]int64{{1, 2}}); err != nil { + t.Error(err) + } + + tsArr := timeSeriesReqs[0].TimeSeries + for i, wantResource := range []*mrpb.MonitoredResource{resource1, resource2} { + if resource := tsArr[i].Resource; resource != wantResource { + t.Errorf("%d-th time series resource got: %#v, want: %#v", i+1, resource, wantResource) + } + } +} + +// testMakeLabel tests that exporter can correctly handle label manipulation process, including +// merging default label with tags, and removing unexported labels. +func testMakeLabel(t *testing.T) { + opts := Options{ + DefaultLabels: map[string]string{ + label1name: value7, + label4name: value8, + }, + UnexportedLabels: []string{label3name, label5name}, + } + pd, err := newTestProjData(opts) + if err != nil { + t.Fatal(err) + } + rd := []*RowData{ + {view1, startTime1, endTime1, view1row1}, + {view2, startTime2, endTime2, view2row1}, + } + pd.uploadRowData(rd) + if err := checkErrStorage(nil); err != nil { + t.Error(err) + } + if err := checkMetricClient([][]int64{{1, 4}}); err != nil { + t.Error(err) + } + + wantLabels1 := map[string]string{ + label1name: value7, + label4name: value8, + } + wantLabels2 := map[string]string{ + // Default value for key1 is suppressed, and value defined in tag of view2row1 is + // used. + label1name: value1, + label2name: value2, + label4name: value8, + } + tsArr := timeSeriesReqs[0].TimeSeries + for i, wantLabels := range []map[string]string{wantLabels1, wantLabels2} { + prefix := fmt.Sprintf("%d-th time series labels mismatch", i+1) + if err := checkLabels(prefix, tsArr[i].Metric.Labels, wantLabels); err != nil { + t.Error(err) + } + } +} diff --git a/monitoring/exporter/stackdriver/timeseries_fake.go b/monitoring/exporter/stackdriver/timeseries_fake.go new file mode 100644 index 000000000..4a3ff1d64 --- /dev/null +++ b/monitoring/exporter/stackdriver/timeseries_fake.go @@ -0,0 +1,130 @@ +// Copyright 2018 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stackdriver + +import ( + "context" + "fmt" + "os" + "strings" + + monitoring "cloud.google.com/go/monitoring/apiv3" + metricpb "google.golang.org/genproto/googleapis/api/metric" + mrpb "google.golang.org/genproto/googleapis/api/monitoredres" + mpb "google.golang.org/genproto/googleapis/monitoring/v3" +) + +var warningMsg = ` +********************************************************************************* +* DEBUG LOG: THIS PROGRAM IS USING FAKE MONITORING API FOR DEVELOPMENT PURPOSE. * +********************************************************************************* +`[1:] + +// WARNING: this file contains development purpose fake version of monitoring API. This code should +// not be used in any production environment at all. + +func init() { + if os.Getenv("CLOUD_SQL_PROXY_DEV") != "" { + fmt.Printf(warningMsg) + createTimeSeries = fakeCreateTimeSeries + } +} + +func fakeCreateTimeSeries(ctx context.Context, cl *monitoring.MetricClient, req *mpb.CreateTimeSeriesRequest) error { + if req == nil { + return fmt.Errorf("request is nil") + } + + var fakeTsArr []*mpb.TimeSeries + for i, ts := range req.TimeSeries { + errPrefix := fmt.Sprintf("timeseries[%d]", i) + name := ts.Metric.Type + sepIndex := strings.IndexByte(name, '/') + if sepIndex == -1 { + return fmt.Errorf("%s: metric name does not have '/': %s", errPrefix, name) + } + fakeName := fmt.Sprintf("custom.googleapis.com/cloudsql%s", name[sepIndex:]) + + fakeRscLabels := make(map[string]string) + var uuid string + for key, val := range ts.Resource.Labels { + var fakeKey string + fakeVal := val + switch key { + case "uuid": + uuid = val + continue + case "project_id": + fakeKey = key + case "region": + fakeKey = "zone" + if val == "UNSPECIFIED" { + fakeVal = "global" + } + case "database_id": + fakeKey = "instance_id" + default: + return fmt.Errorf("unknown label key of monitored resource: %s", key) + } + fakeRscLabels[fakeKey] = fakeVal + } + fakeResource := &mrpb.MonitoredResource{ + Type: "gce_instance", + Labels: fakeRscLabels, + } + + isValueString := false + var strVal string + + fakeVal := ts.Points[0].Value.Value + if strTypeVal, ok := fakeVal.(*mpb.TypedValue_StringValue); ok { + isValueString = true + strVal = strTypeVal.StringValue + fakeVal = &mpb.TypedValue_Int64Value{ + Int64Value: 0, + } + } + fakePt := &mpb.Point{ + Interval: ts.Points[0].Interval, + Value: &mpb.TypedValue{ + Value: fakeVal, + }, + } + + fakeLabels := map[string]string{"uuid": uuid} + for key, val := range ts.Metric.Labels { + fakeLabels[key] = val + } + if isValueString { + fakeLabels["string_metric_value"] = strVal + } + + fakeTs := &mpb.TimeSeries{ + Metric: &metricpb.Metric{ + Type: fakeName, + Labels: fakeLabels, + }, + Resource: fakeResource, + Points: []*mpb.Point{fakePt}, + } + fakeTsArr = append(fakeTsArr, fakeTs) + } + + fakeReq := &mpb.CreateTimeSeriesRequest{ + Name: req.Name, + TimeSeries: fakeTsArr, + } + return cl.CreateTimeSeries(ctx, fakeReq) +}