diff --git a/src/carnot/planner/ir/otel_export_sink_ir.cc b/src/carnot/planner/ir/otel_export_sink_ir.cc index 672ca2c5767..206c77bf77a 100644 --- a/src/carnot/planner/ir/otel_export_sink_ir.cc +++ b/src/carnot/planner/ir/otel_export_sink_ir.cc @@ -160,6 +160,29 @@ Status OTelExportSinkIR::ProcessConfig(const OTelData& data) { new_span.span_kind = span.span_kind; data_.spans.push_back(std::move(new_span)); } + for (const auto& log : data.logs) { + OTelLog new_log; + + PX_ASSIGN_OR_RETURN(new_log.time_column, AddColumn(log.time_column)); + PX_ASSIGN_OR_RETURN(new_log.body_column, AddColumn(log.body_column)); + if (log.observed_time_column != nullptr) { + PX_ASSIGN_OR_RETURN(new_log.observed_time_column, AddColumn(log.observed_time_column)); + } + + new_log.severity_text = log.severity_text; + new_log.severity_number = log.severity_number; + + for (const auto& attr : log.attributes) { + if (attr.column_reference == nullptr) { + new_log.attributes.push_back({attr.name, nullptr, attr.string_value}); + continue; + } + PX_ASSIGN_OR_RETURN(auto column, AddColumn(attr.column_reference)); + new_log.attributes.push_back({attr.name, column, ""}); + } + + data_.logs.push_back(std::move(new_log)); + } return Status::OK(); } @@ -330,6 +353,46 @@ Status OTelExportSinkIR::ToProto(planpb::Operator* op) const { } span_pb->set_kind_value(span.span_kind); } + for (const auto& log : data_.logs) { + auto log_pb = otel_op->add_logs(); + + if (log.time_column->EvaluatedDataType() != types::TIME64NS) { + return log.time_column->CreateIRNodeError( + "Expected time column '$0' to be TIME64NS, received $1", log.time_column->col_name(), + types::ToString(log.time_column->EvaluatedDataType())); + } + PX_ASSIGN_OR_RETURN(auto time_column_index, log.time_column->GetColumnIndex()); + log_pb->set_time_column_index(time_column_index); + + if (log.observed_time_column != nullptr) { + if (log.observed_time_column->EvaluatedDataType() != types::TIME64NS) { + return log.observed_time_column->CreateIRNodeError( + "Expected observed_time column '$0' to be TIME64NS, received $1", + log.observed_time_column->col_name(), + types::ToString(log.observed_time_column->EvaluatedDataType())); + } + PX_ASSIGN_OR_RETURN(auto observed_time_column_index, + log.observed_time_column->GetColumnIndex()); + log_pb->set_observed_time_column_index(observed_time_column_index); + } else { + log_pb->set_observed_time_column_index(-1); + } + + log_pb->set_severity_text(log.severity_text); + log_pb->set_severity_number(log.severity_number); + + if (log.body_column->EvaluatedDataType() != types::STRING) { + return log.body_column->CreateIRNodeError( + "Expected body column '$0' to be STRING, received $1", log.body_column->col_name(), + types::ToString(log.body_column->EvaluatedDataType())); + } + PX_ASSIGN_OR_RETURN(auto body_column_index, log.body_column->GetColumnIndex()); + log_pb->set_body_column_index(body_column_index); + + for (const auto& attribute : log.attributes) { + PX_RETURN_IF_ERROR(attribute.ToProto(log_pb->add_attributes())); + } + } return Status::OK(); } diff --git a/src/carnot/planner/ir/otel_export_sink_ir.h b/src/carnot/planner/ir/otel_export_sink_ir.h index 2caad972498..76e963ae0d6 100644 --- a/src/carnot/planner/ir/otel_export_sink_ir.h +++ b/src/carnot/planner/ir/otel_export_sink_ir.h @@ -127,11 +127,23 @@ struct OTelSpan { int64_t span_kind; }; +struct OTelLog { + std::vector attributes; + + ColumnIR* time_column; + ColumnIR* observed_time_column = nullptr; + ColumnIR* body_column; + + int64_t severity_number; + std::string severity_text; +}; + struct OTelData { planpb::OTelEndpointConfig endpoint_config; std::vector resource_attributes; std::vector metrics; std::vector spans; + std::vector logs; }; /** diff --git a/src/carnot/planner/ir/otel_export_sink_ir_test.cc b/src/carnot/planner/ir/otel_export_sink_ir_test.cc index b508b2d8afb..286f0e8d700 100644 --- a/src/carnot/planner/ir/otel_export_sink_ir_test.cc +++ b/src/carnot/planner/ir/otel_export_sink_ir_test.cc @@ -443,6 +443,84 @@ INSTANTIATE_TEST_SUITE_P( .ConsumeValueOrDie(); }, }, + { + "logs_basic", + table_store::schema::Relation{{types::TIME64NS, types::STRING, types::STRING}, + {"start_time", "attribute_str", "log_message"}, + {types::ST_NONE, types::ST_NONE, types::ST_NONE}}, + R"pb( + endpoint_config {} + resource {} + logs { + attributes { + name: "service.name" + column { + column_type: STRING + column_index: 1 + } + } + time_column_index: 0 + observed_time_column_index: -1 + severity_number: 4 + severity_text: "INFO" + body_column_index: 2 + } + )pb", + [](IR* graph, OperatorIR* parent, table_store::schema::Relation* relation) { + OTelData data; + + auto& log = data.logs.emplace_back(); + log.time_column = CreateTypedColumn(graph, "start_time", relation); + log.attributes.push_back( + {"service.name", CreateTypedColumn(graph, "attribute_str", relation), ""}); + log.severity_number = 4; + log.severity_text = "INFO"; + log.body_column = CreateTypedColumn(graph, "log_message", relation); + + return graph->CreateNode(parent->ast(), parent, data) + .ConsumeValueOrDie(); + }, + }, + { + "logs_with_observed_time_col", + table_store::schema::Relation{ + {types::TIME64NS, types::TIME64NS, types::STRING, types::STRING}, + {"start_time", "observed_time", "attribute_str", "log_message"}, + {types::ST_NONE, types::ST_NONE, types::ST_NONE, types::ST_NONE}}, + R"pb( + endpoint_config {} + resource {} + logs { + attributes { + name: "service.name" + column { + column_type: STRING + column_index: 2 + } + } + time_column_index: 0 + observed_time_column_index: 1 + severity_number: 4 + severity_text: "INFO" + body_column_index: 3 + } + )pb", + [](IR* graph, OperatorIR* parent, table_store::schema::Relation* relation) { + OTelData data; + + auto& log = data.logs.emplace_back(); + log.time_column = CreateTypedColumn(graph, "start_time", relation); + log.observed_time_column = CreateTypedColumn(graph, "observed_time", relation); + log.attributes.push_back( + {"service.name", CreateTypedColumn(graph, "attribute_str", relation), ""}); + log.severity_number = 4; + log.severity_text = "INFO"; + log.body_column = CreateTypedColumn(graph, "log_message", relation); + + return graph->CreateNode(parent->ast(), parent, data) + .ConsumeValueOrDie(); + }, + }, { "string_value_attributes", table_store::schema::Relation{{types::TIME64NS, types::INT64}, @@ -557,6 +635,33 @@ OTelExportSinkIR* CreateSpanWithNameString(IR* graph, OperatorIR* parent, return graph->CreateNode(parent->ast(), parent, data).ConsumeValueOrDie(); } +OTelExportSinkIR* CreateLog(IR* graph, OperatorIR* parent, + table_store::schema::Relation* relation) { + OTelData data; + + auto& log = data.logs.emplace_back(); + log.time_column = CreateTypedColumn(graph, "start_time", relation); + log.body_column = CreateTypedColumn(graph, "log_message", relation); + log.severity_number = 4; + log.severity_text = "INFO"; + + return graph->CreateNode(parent->ast(), parent, data).ConsumeValueOrDie(); +} + +OTelExportSinkIR* CreateLogWithObservedTime(IR* graph, OperatorIR* parent, + table_store::schema::Relation* relation) { + OTelData data; + + auto& log = data.logs.emplace_back(); + log.time_column = CreateTypedColumn(graph, "start_time", relation); + log.observed_time_column = CreateTypedColumn(graph, "observed_time", relation); + log.body_column = CreateTypedColumn(graph, "log_message", relation); + log.severity_number = 4; + log.severity_text = "INFO"; + + return graph->CreateNode(parent->ast(), parent, data).ConsumeValueOrDie(); +} + INSTANTIATE_TEST_SUITE_P( ErrorTests, WrongColumnTypesTest, ::testing::ValuesIn(std::vector{ @@ -723,6 +828,31 @@ INSTANTIATE_TEST_SUITE_P( .ConsumeValueOrDie(); }, }, + { + "log_time_column_wrong", + table_store::schema::Relation{{types::INT64, types::STRING, types::STRING}, + {"start_time", "attribute_str", "log_message"}, + {types::ST_NONE, types::ST_NONE, types::ST_NONE}}, + "Expected time column 'start_time' to be TIME64NS, received INT64", + &CreateLog, + }, + { + "log_body_column_wrong", + table_store::schema::Relation{{types::TIME64NS, types::STRING, types::TIME64NS}, + {"start_time", "attribute_str", "log_message"}, + {types::ST_NONE, types::ST_NONE, types::ST_NONE}}, + "Expected body column 'log_message' to be STRING, received TIME64NS", + &CreateLog, + }, + { + "log_observed_time_column_wrong", + table_store::schema::Relation{ + {types::TIME64NS, types::INT64, types::STRING, types::STRING}, + {"start_time", "observed_time", "attribute_str", "log_message"}, + {types::ST_NONE, types::ST_NONE, types::ST_NONE, types::ST_NONE}}, + "Expected observed_time column 'observed_time' to be TIME64NS, received INT64", + &CreateLogWithObservedTime, + }, }), [](const ::testing::TestParamInfo& info) { return info.param.name; }); } // namespace planner