diff --git a/collector/internal/telemetryapi/listener_test.go b/collector/internal/telemetryapi/listener_test.go index 230bd919cc..74d6c7ff31 100644 --- a/collector/internal/telemetryapi/listener_test.go +++ b/collector/internal/telemetryapi/listener_test.go @@ -45,11 +45,6 @@ func setupListener(t *testing.T) (*Listener, string) { address, err := listener.Start() require.NoError(t, err) - - t.Cleanup(func() { - listener.Shutdown() - }) - return listener, address } @@ -193,8 +188,8 @@ func TestListener_StartAndShutdown(t *testing.T) { } else { require.NoError(t, resp.Body.Close()) } - listener.Shutdown() + listener.Shutdown() require.Nil(t, listener.httpServer, "httpServer should be nil after Shutdown()") } @@ -241,6 +236,7 @@ func TestListener_httpHandler(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { listener, address := setupListener(t) + defer listener.Shutdown() submitEvents(t, address, test.events) require.EventuallyWithT(t, func(c *assert.CollectT) { require.Equal(c, test.expectedCount, listener.queue.Len()) @@ -302,6 +298,7 @@ func TestListener_Wait_Success(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { listener, address := setupListener(t) + defer listener.Shutdown() waitDone := make(chan error, 1) go func() { diff --git a/collector/internal/telemetryapi/types.go b/collector/internal/telemetryapi/types.go index 1cd4583c9b..d5bd0fe427 100644 --- a/collector/internal/telemetryapi/types.go +++ b/collector/internal/telemetryapi/types.go @@ -24,15 +24,29 @@ const ( PlatformInitStart EventType = Platform + ".initStart" // PlatformInitRuntimeDone is used when function initialization ended. PlatformInitRuntimeDone EventType = Platform + ".initRuntimeDone" - // PlatformReport is used when a report of function invocation is received. - PlatformReport EventType = Platform + ".report" - // Function invocation started. + // PlatformInitReport is used when a report of function initialization is received. + PlatformInitReport EventType = Platform + ".initReport" + // PlatformStart is used when function invocation started. PlatformStart EventType = Platform + ".start" - // The runtime finished processing an event with either success or failure. + // PlatformRuntimeDone is used when the runtime finished processing an event with either success or failure. PlatformRuntimeDone EventType = Platform + ".runtimeDone" + // PlatformReport is used when a report of function invocation is received. + PlatformReport EventType = Platform + ".report" + // PlatformRestoreStart is used when runtime restore started. + PlatformRestoreStart EventType = Platform + ".restoreStart" + // PlatformRestoreRuntimeDone is used when runtime restore completed. + PlatformRestoreRuntimeDone EventType = Platform + ".restoreRuntimeDone" + // PlatformRestoreReport is used when a report of runtime restore is received. + PlatformRestoreReport EventType = Platform + ".restoreReport" + // PlatformExtension is used for extension state events. + PlatformExtension EventType = Platform + ".extension" + // PlatformTelemetrySubscription is used when the extension subscribed to the Telemetry API. + PlatformTelemetrySubscription EventType = Platform + ".telemetrySubscription" + // PlatformLogsDropped is used when Lambda dropped log entries. + PlatformLogsDropped EventType = Platform + ".logsDropped" // Function is used to receive log events emitted by the function Function EventType = "function" - // Extension is used is to receive log events emitted by the extension + // Extension is used to receive log events emitted by the extension Extension EventType = "extension" ) diff --git a/collector/lambdacomponents/receiver/telemetryapi.go b/collector/lambdacomponents/receiver/telemetryapi.go index d0ffe066fd..ed01a8daf1 100644 --- a/collector/lambdacomponents/receiver/telemetryapi.go +++ b/collector/lambdacomponents/receiver/telemetryapi.go @@ -17,8 +17,9 @@ package receiver import ( - "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" "go.opentelemetry.io/collector/receiver" + + "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapireceiver" ) func init() { diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index df5d08a3a1..be49616bd2 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -42,9 +42,20 @@ import ( ) const ( - initialQueueSize = 5 - scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" - logReportFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB" + initialQueueSize = 5 + scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" + platformReportLogFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB" + platformStartLogFmt = "START RequestId: %s Version: %s" + platformRuntimeDoneLogFmt = "END RequestId: %s Version: %s" + platformInitStartLogFmt = "INIT_START Runtime Version: %s Runtime Version ARN: %s" + platformInitRuntimeDoneLogFmt = "INIT_RUNTIME_DONE Status: %s" + platformInitReportLogFmt = "INIT_REPORT Initialization Type: %s Phase: %s Status: %s Duration: %.2f ms" + platformRestoreStartLogFmt = "RESTORE_START Runtime Version: %s Runtime Version ARN: %s" + platformRestoreRuntimeDoneLogFmt = "RESTORE_RUNTIME_DONE Status: %s" + platformRestoreReportLogFmt = "RESTORE_REPORT Status: %s Duration: %.2f ms" + platformTelemetrySubscriptionLogFmt = "TELEMETRY: %s Subscribed Types: %v" + platformExtensionLogFmt = "EXTENSION Name: %s State: %s Events: %v" + platformLogsDroppedLogFmt = "LOGS_DROPPED DroppedRecords: %d DroppedBytes: %d Reason: %s" ) type telemetryAPIReceiver struct { @@ -59,6 +70,7 @@ type telemetryAPIReceiver struct { port int types []telemetryapi.EventType resource pcommon.Resource + faasFunctionVersion string currentFaasInvocationID string logReport bool } @@ -188,6 +200,15 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ slice = nil } +func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{}) string { + if requestId, ok := record["requestId"].(string); ok { + return requestId + } else if r.currentFaasInvocationID != "" { + return r.currentFaasInvocationID + } + return "" +} + func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { log := plog.NewLogs() resourceLog := log.ResourceLogs().AppendEmpty() @@ -195,93 +216,172 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { scopeLog := resourceLog.ScopeLogs().AppendEmpty() scopeLog.Scope().SetName(scopeName) for _, el := range slice { + if !r.logReport && el.Type == string(telemetryapi.PlatformReport) { + continue + } r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) - if el.Type == string(telemetryapi.Function) || el.Type == string(telemetryapi.Extension) { - logRecord := scopeLog.LogRecords().AppendEmpty() - logRecord.Attributes().PutStr("type", el.Type) - if t, err := time.Parse(time.RFC3339, el.Time); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) - } else { - r.logger.Error("error parsing time", zap.Error(err)) - return plog.Logs{}, err - } - if record, ok := el.Record.(map[string]interface{}); ok { - // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function - if timestamp, ok := record["timestamp"].(string); ok { - if t, err := time.Parse(time.RFC3339, timestamp); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) - } else { - r.logger.Error("error parsing time", zap.Error(err)) - return plog.Logs{}, err - } - } - if level, ok := record["level"].(string); ok { - logRecord.SetSeverityNumber(severityTextToNumber(strings.ToUpper(level))) - logRecord.SetSeverityText(logRecord.SeverityNumber().String()) - } - if requestId, ok := record["requestId"].(string); ok { - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) - } else if r.currentFaasInvocationID != "" { - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID) - } - if line, ok := record["message"].(string); ok { - logRecord.Body().SetStr(line) - } - } else { - if r.currentFaasInvocationID != "" { - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID) + logRecord := scopeLog.LogRecords().AppendEmpty() + logRecord.Attributes().PutStr("type", el.Type) + if t, err := time.Parse(time.RFC3339, el.Time); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err + } + if record, ok := el.Record.(map[string]interface{}); ok { + requestId := r.getRecordRequestId(record) + if requestId != "" { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) + + // If this is the first event in the invocation with a request id (i.e. the "platform.start" event), + // set the current invocation id to this request id. + if el.Type == string(telemetryapi.PlatformStart) { + r.currentFaasInvocationID = requestId } - // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function - if line, ok := el.Record.(string); ok { - logRecord.Body().SetStr(line) + } + + // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if timestamp, ok := record["timestamp"].(string); ok { + if t, err := time.Parse(time.RFC3339, timestamp); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + } else { + r.logger.Error("error parsing time", zap.Error(err)) + return plog.Logs{}, err } } - } else { // platform events, if subscribed to - if el.Type == string(telemetryapi.PlatformStart) { - if record, ok := el.Record.(map[string]interface{}); ok { - if requestId, ok := record["requestId"].(string); ok { - r.currentFaasInvocationID = requestId + if level, ok := record["level"].(string); ok { + logRecord.SetSeverityNumber(severityTextToNumber(strings.ToUpper(level))) + logRecord.SetSeverityText(logRecord.SeverityNumber().String()) + } + + if strings.HasPrefix(el.Type, platform) { + if el.Type == string(telemetryapi.PlatformInitStart) { + functionVersion, _ := record["functionVersion"].(string) + if functionVersion != "" { + r.faasFunctionVersion = functionVersion } } - } else if el.Type == string(telemetryapi.PlatformRuntimeDone) { - r.currentFaasInvocationID = "" - } else if el.Type == string(telemetryapi.PlatformReport) && r.logReport { - if record, ok := el.Record.(map[string]interface{}); ok { - if logRecord := createReportLogRecord(&scopeLog, record); logRecord != nil { - logRecord.Attributes().PutStr("type", el.Type) - if t, err := time.Parse(time.RFC3339, el.Time); err == nil { - logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) - logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) - } - } + + message := createPlatformMessage(requestId, r.faasFunctionVersion, el.Type, record) + if message != "" { + logRecord.Body().SetStr(message) } + } else if line, ok := record["message"].(string); ok { + logRecord.Body().SetStr(line) + } + } else { + if r.currentFaasInvocationID != "" { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID) + } + // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function + if line, ok := el.Record.(string); ok { + logRecord.Body().SetStr(line) } } + if el.Type == string(telemetryapi.PlatformRuntimeDone) { + r.currentFaasInvocationID = "" + } } return log, nil } -// createReportLogRecord creates a log record for the platform.report event -// returns the log record if successful, otherwise nil -func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface{}) *plog.LogRecord { +func createPlatformMessage(requestId string, functionVersion string, eventType string, record map[string]interface{}) string { + switch eventType { + case string(telemetryapi.PlatformStart): + if requestId != "" && functionVersion != "" { + return fmt.Sprintf(platformStartLogFmt, requestId, functionVersion) + } + case string(telemetryapi.PlatformRuntimeDone): + if requestId != "" && functionVersion != "" { + return fmt.Sprintf(platformRuntimeDoneLogFmt, requestId, functionVersion) + } + case string(telemetryapi.PlatformReport): + return createPlatformReportMessage(requestId, record) + case string(telemetryapi.PlatformInitStart): + runtimeVersion, _ := record["runtimeVersion"].(string) + runtimeVersionArn, _ := record["runtimeVersionArn"].(string) + if runtimeVersion != "" || runtimeVersionArn != "" { + return fmt.Sprintf(platformInitStartLogFmt, runtimeVersion, runtimeVersionArn) + } + case string(telemetryapi.PlatformInitRuntimeDone): + status, _ := record["status"].(string) + if status != "" { + return fmt.Sprintf(platformInitRuntimeDoneLogFmt, status) + } + case string(telemetryapi.PlatformInitReport): + initType, _ := record["initializationType"].(string) + phase, _ := record["phase"].(string) + status, _ := record["status"].(string) + var durationMs float64 + if metrics, ok := record["metrics"].(map[string]interface{}); ok { + durationMs, _ = metrics["durationMs"].(float64) + } + if initType != "" || phase != "" || status != "" || durationMs != 0 { + return fmt.Sprintf(platformInitReportLogFmt, initType, phase, status, durationMs) + } + case string(telemetryapi.PlatformRestoreStart): + runtimeVersion, _ := record["runtimeVersion"].(string) + runtimeVersionArn, _ := record["runtimeVersionArn"].(string) + if runtimeVersion != "" || runtimeVersionArn != "" { + return fmt.Sprintf(platformRestoreStartLogFmt, runtimeVersion, runtimeVersionArn) + } + case string(telemetryapi.PlatformRestoreRuntimeDone): + status, _ := record["status"].(string) + if status != "" { + return fmt.Sprintf(platformRestoreRuntimeDoneLogFmt, status) + } + case string(telemetryapi.PlatformRestoreReport): + status, _ := record["status"].(string) + var durationMs float64 + if metrics, ok := record["metrics"].(map[string]interface{}); ok { + durationMs, _ = metrics["durationMs"].(float64) + } + if status != "" && durationMs != 0 { + return fmt.Sprintf(platformRestoreReportLogFmt, status, durationMs) + } + case string(telemetryapi.PlatformTelemetrySubscription): + name, _ := record["name"].(string) + types, _ := record["types"].([]interface{}) + if name != "" { + return fmt.Sprintf(platformTelemetrySubscriptionLogFmt, name, types) + } + case string(telemetryapi.PlatformExtension): + name, _ := record["name"].(string) + state, _ := record["state"].(string) + events, _ := record["events"].([]interface{}) + if name != "" { + return fmt.Sprintf(platformExtensionLogFmt, name, state, events) + } + case string(telemetryapi.PlatformLogsDropped): + droppedRecords, _ := record["droppedRecords"].(float64) + droppedBytes, _ := record["droppedBytes"].(float64) + reason, _ := record["reason"].(string) + if reason != "" { + return fmt.Sprintf(platformLogsDroppedLogFmt, int(droppedRecords), int(droppedBytes), reason) + } + } + return "" +} + +func createPlatformReportMessage(requestId string, record map[string]interface{}) string { // gathering metrics metrics, ok := record["metrics"].(map[string]interface{}) if !ok { - return nil + return "" } var durationMs, billedDurationMs, memorySizeMB, maxMemoryUsedMB float64 if durationMs, ok = metrics[string(telemetryapi.MetricDurationMs)].(float64); !ok { - return nil + return "" } if billedDurationMs, ok = metrics[string(telemetryapi.MetricBilledDurationMs)].(float64); !ok { - return nil + return "" } if memorySizeMB, ok = metrics[string(telemetryapi.MetricMemorySizeMB)].(float64); !ok { - return nil + return "" } if maxMemoryUsedMB, ok = metrics[string(telemetryapi.MetricMaxMemoryUsedMB)].(float64); !ok { - return nil + return "" } // optionally gather information about cold start time @@ -292,19 +392,8 @@ func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface } } - // gathering requestId - requestId := "" - if requestId, ok = record["requestId"].(string); !ok { - return nil - } - - // we have all information available, we can create the log record - logRecord := scopeLog.LogRecords().AppendEmpty() - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) - - // building the body of the log record, optionally adding the init duration - body := fmt.Sprintf( - logReportFmt, + message := fmt.Sprintf( + platformReportLogFmt, requestId, durationMs, billedDurationMs, @@ -312,11 +401,10 @@ func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface maxMemoryUsedMB, ) if initDurationMs > 0 { - body += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs) + message += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs) } - logRecord.Body().SetStr(body) - return &logRecord + return message } func severityTextToNumber(severityText string) plog.SeverityNumber { @@ -418,7 +506,7 @@ func newTelemetryAPIReceiver( } } - subscribedTypes := []telemetryapi.EventType{} + var subscribedTypes []telemetryapi.EventType for _, val := range cfg.Types { switch val { case "platform": diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 346ec2fc2b..30944d5fbc 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -174,23 +174,26 @@ func TestCreatePlatformInitSpan(t *testing.T) { func TestCreateLogs(t *testing.T) { t.Parallel() + type logInfo struct { + logType string + timestamp string + body string + severityText string + containsRequestId bool + requestId string + severityNumber plog.SeverityNumber + } + testCases := []struct { - desc string - slice []event - expectedLogRecords int - expectedType string - expectedTimestamp string - expectedBody string - expectedSeverityText string - expectedContainsRequestId bool - expectedRequestId string - expectedSeverityNumber plog.SeverityNumber - expectError bool + desc string + slice []event + expectedLogs []logInfo + expectError bool }{ { - desc: "no slice", - expectedLogRecords: 0, - expectError: false, + desc: "no slice", + expectedLogs: []logInfo{}, + expectError: false, }, { desc: "Invalid Timestamp", @@ -212,14 +215,16 @@ func TestCreateLogs(t *testing.T) { Record: "[INFO] Hello world, I am an extension!", }, }, - expectedLogRecords: 1, - expectedType: "function", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "[INFO] Hello world, I am an extension!", - expectedContainsRequestId: false, - expectedSeverityText: "", - expectedSeverityNumber: plog.SeverityNumberUnspecified, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "function", + timestamp: "2022-10-12T00:03:50.000Z", + body: "[INFO] Hello world, I am an extension!", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { desc: "function text with requestId", @@ -242,15 +247,33 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 1, - expectedType: "function", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "[INFO] Hello world, I am an extension!", - expectedContainsRequestId: true, - expectedRequestId: "34472c47-5ff0-4df5-a9ad-03776afa5473", - expectedSeverityText: "", - expectedSeverityNumber: plog.SeverityNumberUnspecified, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.start", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: true, + requestId: "34472c47-5ff0-4df5-a9ad-03776afa5473", + severityNumber: plog.SeverityNumberUnspecified, + }, + { + logType: "function", + timestamp: "2022-10-12T00:03:50.000Z", + body: "[INFO] Hello world, I am an extension!", + containsRequestId: true, + requestId: "34472c47-5ff0-4df5-a9ad-03776afa5473", + severityNumber: plog.SeverityNumberUnspecified, + }, + { + logType: "platform.runtimeDone", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: true, + requestId: "34472c47-5ff0-4df5-a9ad-03776afa5473", + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { desc: "function json", @@ -266,15 +289,18 @@ func TestCreateLogs(t *testing.T) { }, }, }, - expectedLogRecords: 1, - expectedType: "function", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "Hello world, I am a function!", - expectedContainsRequestId: true, - expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68189", - expectedSeverityText: "Info", - expectedSeverityNumber: plog.SeverityNumberInfo, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "function", + timestamp: "2022-10-12T00:03:50.000Z", + body: "Hello world, I am a function!", + containsRequestId: true, + requestId: "79b4f56e-95b1-4643-9700-2807f4e68189", + severityText: "Info", + severityNumber: plog.SeverityNumberInfo, + }, + }, + expectError: false, }, { desc: "extension text", @@ -285,14 +311,16 @@ func TestCreateLogs(t *testing.T) { Record: "[INFO] Hello world, I am an extension!", }, }, - expectedLogRecords: 1, - expectedType: "extension", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "[INFO] Hello world, I am an extension!", - expectedContainsRequestId: false, - expectedSeverityText: "", - expectedSeverityNumber: plog.SeverityNumberUnspecified, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "extension", + timestamp: "2022-10-12T00:03:50.000Z", + body: "[INFO] Hello world, I am an extension!", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { desc: "extension json", @@ -308,15 +336,18 @@ func TestCreateLogs(t *testing.T) { }, }, }, - expectedLogRecords: 1, - expectedType: "extension", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "Hello world, I am an extension!", - expectedContainsRequestId: true, - expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", - expectedSeverityText: "Info", - expectedSeverityNumber: plog.SeverityNumberInfo, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "extension", + timestamp: "2022-10-12T00:03:50.000Z", + body: "Hello world, I am an extension!", + containsRequestId: true, + requestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + severityText: "Info", + severityNumber: plog.SeverityNumberInfo, + }, + }, + expectError: false, }, { desc: "extension json anything", @@ -332,18 +363,21 @@ func TestCreateLogs(t *testing.T) { }, }, }, - expectedLogRecords: 1, - expectedType: "extension", - expectedTimestamp: "2022-10-12T00:03:50.000Z", - expectedBody: "Hello world, I am an extension!", - expectedContainsRequestId: true, - expectedRequestId: "79b4f56e-95b1-4643-9700-2807f4e68689", - expectedSeverityText: "Unspecified", - expectedSeverityNumber: plog.SeverityNumberUnspecified, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "extension", + timestamp: "2022-10-12T00:03:50.000Z", + body: "Hello world, I am an extension!", + containsRequestId: true, + requestId: "79b4f56e-95b1-4643-9700-2807f4e68689", + severityText: "Unspecified", + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.initStart anything", + desc: "platform.initStart", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -351,11 +385,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.initStart", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.initRuntimeDone anything", + desc: "platform.initRuntimeDone", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -363,11 +405,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.initRuntimeDone", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.initReport anything", + desc: "platform.initReport", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -375,11 +425,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.initReport", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.start anything", + desc: "platform.start", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -389,11 +447,20 @@ func TestCreateLogs(t *testing.T) { }, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.start", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: true, + requestId: "test-id", + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.runtimeDone anything", + desc: "platform.runtimeDone", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -401,11 +468,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.runtimeDone", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.report anything", + desc: "platform.report", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -413,11 +488,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.report", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.restoreStart anything", + desc: "platform.restoreStart", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -425,11 +508,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.restoreStart", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.restoreRuntimeDone anything", + desc: "platform.restoreRuntimeDone", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -437,23 +528,39 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.restoreRuntimeDone", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.restoreReport anything", + desc: "platform.restoreReport", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", - Type: "platform.restoreStart", + Type: "platform.restoreReport", Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.restoreReport", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.telemetrySubscription anything", + desc: "platform.telemetrySubscription", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -461,11 +568,19 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.telemetrySubscription", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, { - desc: "platform.logsDropped anything", + desc: "platform.logsDropped", slice: []event{ { Time: "2022-10-12T00:03:50.000Z", @@ -473,44 +588,61 @@ func TestCreateLogs(t *testing.T) { Record: map[string]any{}, }, }, - expectedLogRecords: 0, - expectError: false, + expectedLogs: []logInfo{ + { + logType: "platform.logsDropped", + timestamp: "2022-10-12T00:03:50.000Z", + body: "", + containsRequestId: false, + severityNumber: plog.SeverityNumberUnspecified, + }, + }, + expectError: false, }, } + for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { r, err := newTelemetryAPIReceiver( - &Config{}, + &Config{ + LogReport: true, + }, receivertest.NewNopSettings(Type), ) require.NoError(t, err) log, err := r.createLogs(tc.slice) if tc.expectError { require.Error(t, err) - } else { - require.Equal(t, 1, log.ResourceLogs().Len()) - resourceLog := log.ResourceLogs().At(0) - require.Equal(t, 1, resourceLog.ScopeLogs().Len()) - scopeLog := resourceLog.ScopeLogs().At(0) - require.Equal(t, scopeName, scopeLog.Scope().Name()) - require.Equal(t, tc.expectedLogRecords, scopeLog.LogRecords().Len()) - if scopeLog.LogRecords().Len() > 0 { - logRecord := scopeLog.LogRecords().At(0) - attr, ok := logRecord.Attributes().Get("type") - require.True(t, ok) - require.Equal(t, tc.expectedType, attr.Str()) - expectedTime, err := time.Parse(time.RFC3339, tc.expectedTimestamp) - require.NoError(t, err) - require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) - requestId, ok := logRecord.Attributes().Get(semconv.AttributeFaaSInvocationID) - require.Equal(t, tc.expectedContainsRequestId, ok) - if ok { - require.Equal(t, tc.expectedRequestId, requestId.Str()) - } - require.Equal(t, tc.expectedSeverityText, logRecord.SeverityText()) - require.Equal(t, tc.expectedSeverityNumber, logRecord.SeverityNumber()) - require.Equal(t, tc.expectedBody, logRecord.Body().Str()) + return + } + require.NoError(t, err) + require.Equal(t, 1, log.ResourceLogs().Len()) + resourceLog := log.ResourceLogs().At(0) + require.Equal(t, 1, resourceLog.ScopeLogs().Len()) + scopeLog := resourceLog.ScopeLogs().At(0) + require.Equal(t, scopeName, scopeLog.Scope().Name()) + require.Equal(t, len(tc.expectedLogs), scopeLog.LogRecords().Len()) + + for i, expected := range tc.expectedLogs { + logRecord := scopeLog.LogRecords().At(i) + + attr, ok := logRecord.Attributes().Get("type") + require.True(t, ok) + require.Equal(t, expected.logType, attr.Str()) + + expectedTime, err := time.Parse(time.RFC3339, expected.timestamp) + require.NoError(t, err) + require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) + + requestId, ok := logRecord.Attributes().Get(semconv.AttributeFaaSInvocationID) + require.Equal(t, expected.containsRequestId, ok) + if ok { + require.Equal(t, expected.requestId, requestId.Str()) } + + require.Equal(t, expected.severityText, logRecord.SeverityText()) + require.Equal(t, expected.severityNumber, logRecord.SeverityNumber()) + require.Equal(t, expected.body, logRecord.Body().Str()) } }) } @@ -520,15 +652,15 @@ func TestCreateLogsWithLogReport(t *testing.T) { t.Parallel() testCases := []struct { - desc string - slice []event - logReport bool - expectedLogRecords int - expectedType string - expectedTimestamp string - expectedBody string - expectedAttributes map[string]interface{} - expectError bool + desc string + slice []event + logReport bool + expectedLogRecords int + expectedType string + expectedTimestamp string + expectedBody string + expectedAttributes map[string]interface{} + expectError bool }{ { desc: "platform.report with logReport enabled - valid metrics", @@ -662,8 +794,11 @@ func TestCreateLogsWithLogReport(t *testing.T) { }, }, logReport: true, - expectedLogRecords: 0, + expectedLogRecords: 1, expectError: false, + expectedType: "platform.report", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "invalid record format", }, { desc: "platform.report with logReport enabled - with initDurationMs", @@ -754,6 +889,387 @@ func TestCreateLogsWithLogReport(t *testing.T) { } } +func TestCreatePlatformMessage(t *testing.T) { + t.Parallel() + + testCases := []struct { + desc string + requestId string + functionVersion string + eventType string + record map[string]interface{} + expected string + }{ + { + desc: "platform.start with requestId and functionVersion", + requestId: "test-request-id", + functionVersion: "$LATEST", + eventType: "platform.start", + record: map[string]interface{}{}, + expected: "START RequestId: test-request-id Version: $LATEST", + }, + { + desc: "platform.start with empty requestId", + requestId: "", + functionVersion: "$LATEST", + eventType: "platform.start", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.start with empty functionVersion", + requestId: "test-request-id", + functionVersion: "", + eventType: "platform.start", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.runtimeDone with requestId and functionVersion", + requestId: "test-request-id", + functionVersion: "v1.0.0", + eventType: "platform.runtimeDone", + record: map[string]interface{}{}, + expected: "END RequestId: test-request-id Version: v1.0.0", + }, + { + desc: "platform.runtimeDone with empty requestId", + requestId: "", + functionVersion: "v1.0.0", + eventType: "platform.runtimeDone", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.runtimeDone with empty functionVersion", + requestId: "test-request-id", + functionVersion: "", + eventType: "platform.runtimeDone", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.report with valid metrics", + requestId: "test-request-id", + functionVersion: "$LATEST", + eventType: "platform.report", + record: map[string]interface{}{ + "metrics": map[string]interface{}{ + "durationMs": 100.5, + "billedDurationMs": 101.0, + "memorySizeMB": 128.0, + "maxMemoryUsedMB": 64.0, + }, + }, + expected: "REPORT RequestId: test-request-id Duration: 100.50 ms Billed Duration: 101 ms Memory Size: 128 MB Max Memory Used: 64 MB", + }, + { + desc: "platform.report with missing metrics", + requestId: "test-request-id", + functionVersion: "$LATEST", + eventType: "platform.report", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.initStart with runtimeVersion and runtimeVersionArn", + requestId: "", + functionVersion: "", + eventType: "platform.initStart", + record: map[string]interface{}{ + "runtimeVersion": "python:3.9", + "runtimeVersionArn": "arn:aws:lambda:us-east-1::runtime:python:3.9", + }, + expected: "INIT_START Runtime Version: python:3.9 Runtime Version ARN: arn:aws:lambda:us-east-1::runtime:python:3.9", + }, + { + desc: "platform.initStart with only runtimeVersion", + requestId: "", + functionVersion: "", + eventType: "platform.initStart", + record: map[string]interface{}{ + "runtimeVersion": "nodejs:18", + }, + expected: "INIT_START Runtime Version: nodejs:18 Runtime Version ARN: ", + }, + { + desc: "platform.initStart with only runtimeVersionArn", + requestId: "", + functionVersion: "", + eventType: "platform.initStart", + record: map[string]interface{}{ + "runtimeVersionArn": "arn:aws:lambda:us-east-1::runtime:go:1.x", + }, + expected: "INIT_START Runtime Version: Runtime Version ARN: arn:aws:lambda:us-east-1::runtime:go:1.x", + }, + { + desc: "platform.initStart with empty record", + requestId: "", + functionVersion: "", + eventType: "platform.initStart", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.initRuntimeDone with status success", + requestId: "", + functionVersion: "", + eventType: "platform.initRuntimeDone", + record: map[string]interface{}{ + "status": "success", + }, + expected: "INIT_RUNTIME_DONE Status: success", + }, + { + desc: "platform.initRuntimeDone with status failure", + requestId: "", + functionVersion: "", + eventType: "platform.initRuntimeDone", + record: map[string]interface{}{ + "status": "failure", + }, + expected: "INIT_RUNTIME_DONE Status: failure", + }, + { + desc: "platform.initRuntimeDone with empty status", + requestId: "", + functionVersion: "", + eventType: "platform.initRuntimeDone", + record: map[string]interface{}{ + "status": "", + }, + expected: "", + }, + { + desc: "platform.initRuntimeDone with missing status", + requestId: "", + functionVersion: "", + eventType: "platform.initRuntimeDone", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.initReport with all fields", + requestId: "", + functionVersion: "", + eventType: "platform.initReport", + record: map[string]interface{}{ + "initializationType": "on-demand", + "phase": "init", + "status": "success", + "metrics": map[string]interface{}{ + "durationMs": 250.75, + }, + }, + expected: "INIT_REPORT Initialization Type: on-demand Phase: init Status: success Duration: 250.75 ms", + }, + { + desc: "platform.initReport with provisioned-concurrency", + requestId: "", + functionVersion: "", + eventType: "platform.initReport", + record: map[string]interface{}{ + "initializationType": "provisioned-concurrency", + "phase": "init", + "status": "success", + "metrics": map[string]interface{}{ + "durationMs": 100.0, + }, + }, + expected: "INIT_REPORT Initialization Type: provisioned-concurrency Phase: init Status: success Duration: 100.00 ms", + }, + { + desc: "platform.initReport with empty record", + requestId: "", + functionVersion: "", + eventType: "platform.initReport", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.initReport with only initType", + requestId: "", + functionVersion: "", + eventType: "platform.initReport", + record: map[string]interface{}{ + "initializationType": "on-demand", + }, + expected: "INIT_REPORT Initialization Type: on-demand Phase: Status: Duration: 0.00 ms", + }, + { + desc: "platform.restoreStart with runtimeVersion and runtimeVersionArn", + requestId: "", + functionVersion: "", + eventType: "platform.restoreStart", + record: map[string]interface{}{ + "runtimeVersion": "python:3.9", + "runtimeVersionArn": "arn:aws:lambda:us-east-1::runtime:python:3.9", + }, + expected: "RESTORE_START Runtime Version: python:3.9 Runtime Version ARN: arn:aws:lambda:us-east-1::runtime:python:3.9", + }, + { + desc: "platform.restoreStart with empty record", + requestId: "", + functionVersion: "", + eventType: "platform.restoreStart", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "platform.restoreRuntimeDone with status", + requestId: "", + functionVersion: "", + eventType: "platform.restoreRuntimeDone", + record: map[string]interface{}{ + "status": "success", + }, + expected: "RESTORE_RUNTIME_DONE Status: success", + }, + { + desc: "platform.restoreRuntimeDone with empty status", + requestId: "", + functionVersion: "", + eventType: "platform.restoreRuntimeDone", + record: map[string]interface{}{ + "status": "", + }, + expected: "", + }, + { + desc: "platform.restoreReport with status and duration", + requestId: "", + functionVersion: "", + eventType: "platform.restoreReport", + record: map[string]interface{}{ + "status": "success", + "metrics": map[string]interface{}{ + "durationMs": 50.25, + }, + }, + expected: "RESTORE_REPORT Status: success Duration: 50.25 ms", + }, + { + desc: "platform.restoreReport with empty status", + requestId: "", + functionVersion: "", + eventType: "platform.restoreReport", + record: map[string]interface{}{ + "status": "", + "metrics": map[string]interface{}{ + "durationMs": 50.25, + }, + }, + expected: "", + }, + { + desc: "platform.restoreReport with zero duration", + requestId: "", + functionVersion: "", + eventType: "platform.restoreReport", + record: map[string]interface{}{ + "status": "success", + "metrics": map[string]interface{}{ + "durationMs": 0.0, + }, + }, + expected: "", + }, + { + desc: "platform.telemetrySubscription with name and types", + requestId: "", + functionVersion: "", + eventType: "platform.telemetrySubscription", + record: map[string]interface{}{ + "name": "my-extension", + "types": []interface{}{"platform", "function"}, + }, + expected: "TELEMETRY: my-extension Subscribed Types: [platform function]", + }, + { + desc: "platform.telemetrySubscription with empty name", + requestId: "", + functionVersion: "", + eventType: "platform.telemetrySubscription", + record: map[string]interface{}{ + "name": "", + "types": []interface{}{"platform"}, + }, + expected: "", + }, + { + desc: "platform.extension with all fields", + requestId: "", + functionVersion: "", + eventType: "platform.extension", + record: map[string]interface{}{ + "name": "my-extension", + "state": "Ready", + "events": []interface{}{"INVOKE", "SHUTDOWN"}, + }, + expected: "EXTENSION Name: my-extension State: Ready Events: [INVOKE SHUTDOWN]", + }, + { + desc: "platform.extension with empty name", + requestId: "", + functionVersion: "", + eventType: "platform.extension", + record: map[string]interface{}{ + "name": "", + "state": "Ready", + "events": []interface{}{"INVOKE"}, + }, + expected: "", + }, + { + desc: "platform.logsDropped with all fields", + requestId: "", + functionVersion: "", + eventType: "platform.logsDropped", + record: map[string]interface{}{ + "droppedRecords": float64(10), + "droppedBytes": float64(1024), + "reason": "Consumer is too slow", + }, + expected: "LOGS_DROPPED DroppedRecords: 10 DroppedBytes: 1024 Reason: Consumer is too slow", + }, + { + desc: "platform.logsDropped with empty reason", + requestId: "", + functionVersion: "", + eventType: "platform.logsDropped", + record: map[string]interface{}{ + "droppedRecords": float64(10), + "droppedBytes": float64(1024), + "reason": "", + }, + expected: "", + }, + { + desc: "unknown event type", + requestId: "test-id", + functionVersion: "v1", + eventType: "platform.unknown", + record: map[string]interface{}{}, + expected: "", + }, + { + desc: "function event type", + requestId: "test-id", + functionVersion: "v1", + eventType: "function", + record: map[string]interface{}{}, + expected: "", + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + result := createPlatformMessage(tc.requestId, tc.functionVersion, tc.eventType, tc.record) + require.Equal(t, tc.expected, result) + }) + } +} + func TestSeverityTextToNumber(t *testing.T) { t.Parallel()