Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions internal/elasticsearch/ml/datafeed_state/acc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,81 @@ func TestAccResourceMLDatafeedState_withTimes(t *testing.T) {
},
})
}

func TestAccResourceMLDatafeedState_multiStep(t *testing.T) {
jobID := fmt.Sprintf("test-job-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum))
datafeedID := fmt.Sprintf("test-datafeed-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum))
indexName := fmt.Sprintf("test-datafeed-index-%s", sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum))

resource.Test(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
Steps: []resource.TestStep{
{
ProtoV6ProviderFactories: acctest.Providers,
ConfigDirectory: acctest.NamedTestCaseDirectory("closed_stopped"),
ConfigVariables: config.Variables{
"job_id": config.StringVariable(jobID),
"datafeed_id": config.StringVariable(datafeedID),
"index_name": config.StringVariable(indexName),
},
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.nginx", "state", "stopped"),
resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.nginx", "force", "true"),
),
},
{
ProtoV6ProviderFactories: acctest.Providers,
ConfigDirectory: acctest.NamedTestCaseDirectory("job_opened"),
ConfigVariables: config.Variables{
"job_id": config.StringVariable(jobID),
"datafeed_id": config.StringVariable(datafeedID),
"index_name": config.StringVariable(indexName),
},
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.nginx", "state", "stopped"),
resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.nginx", "force", "false"),
),
},
{
ProtoV6ProviderFactories: acctest.Providers,
ConfigDirectory: acctest.NamedTestCaseDirectory("started_no_time"),
ConfigVariables: config.Variables{
"job_id": config.StringVariable(jobID),
"datafeed_id": config.StringVariable(datafeedID),
"index_name": config.StringVariable(indexName),
},
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.nginx", "state", "started"),
resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.nginx", "force", "false"),
),
},
{
ProtoV6ProviderFactories: acctest.Providers,
ConfigDirectory: acctest.NamedTestCaseDirectory("stopped_job_open"),
ConfigVariables: config.Variables{
"job_id": config.StringVariable(jobID),
"datafeed_id": config.StringVariable(datafeedID),
"index_name": config.StringVariable(indexName),
},
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.nginx", "state", "stopped"),
resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.nginx", "force", "false"),
),
},
{
ProtoV6ProviderFactories: acctest.Providers,
ConfigDirectory: acctest.NamedTestCaseDirectory("started_with_time"),
ConfigVariables: config.Variables{
"job_id": config.StringVariable(jobID),
"datafeed_id": config.StringVariable(datafeedID),
"index_name": config.StringVariable(indexName),
},
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.nginx", "state", "started"),
resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.nginx", "force", "false"),
resource.TestCheckResourceAttr("elasticstack_elasticsearch_ml_datafeed_state.nginx", "start", "2025-12-01T00:00:00+01:00"),
),
},
},
})
}
61 changes: 37 additions & 24 deletions internal/elasticsearch/ml/datafeed_state/models.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package datafeed_state

import (
"strconv"
"time"

"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed"
"github.com/elastic/terraform-provider-elasticstack/internal/models"
"github.com/elastic/terraform-provider-elasticstack/internal/utils"
"github.com/elastic/terraform-provider-elasticstack/internal/utils/customtypes"
Expand All @@ -25,42 +25,55 @@ type MLDatafeedStateData struct {
Timeouts timeouts.Value `tfsdk:"timeouts"`
}

func (d *MLDatafeedStateData) GetStartAsString() (string, diag.Diagnostics) {
return d.getTimeAttributeAsString(d.Start)
}

func (d *MLDatafeedStateData) GetEndAsString() (string, diag.Diagnostics) {
return d.getTimeAttributeAsString(d.End)
}

func (d *MLDatafeedStateData) getTimeAttributeAsString(val timetypes.RFC3339) (string, diag.Diagnostics) {
if !utils.IsKnown(val) {
return "", nil
func timeInSameLocation(ms int64, source timetypes.RFC3339) (time.Time, diag.Diagnostics) {
t := time.UnixMilli(ms)
if !utils.IsKnown(source) {
return t, nil
}

valTime, diags := val.ValueRFC3339Time()
sourceTime, diags := source.ValueRFC3339Time()
if diags.HasError() {
return "", diags
return t, diags
}
return strconv.FormatInt(valTime.Unix(), 10), nil

t = t.In(sourceTime.Location())
return t, nil
}

func (d *MLDatafeedStateData) SetStartAndEndFromAPI(datafeedStats *models.DatafeedStats) diag.Diagnostics {
var diags diag.Diagnostics
if datafeedStats.RunningState == nil {
diags.AddWarning("Running state was empty for a started datafeed", "The Elasticsearch API returned an empty running state for a Datafeed which was successfully started. Ignoring start and end response values.")
return diags

if datafeed.State(datafeedStats.State) == datafeed.StateStarted {
if datafeedStats.RunningState == nil {
diags.AddWarning("Running state was empty for a started datafeed", "The Elasticsearch API returned an empty running state for a Datafeed which was successfully started. Ignoring start and end response values.")
return diags
}

if datafeedStats.RunningState.SearchInterval != nil {
start, diags := timeInSameLocation(datafeedStats.RunningState.SearchInterval.StartMS, d.Start)
if diags.HasError() {
return diags
}

end, diags := timeInSameLocation(datafeedStats.RunningState.SearchInterval.EndMS, d.End)
if diags.HasError() {
return diags
}
Comment on lines +53 to +61
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable diags is reused for both start and end conversions, shadowing the outer diags variable declared at line 44. This makes it unclear which diagnostics are being accumulated. Use distinct variable names like startDiags and endDiags to avoid confusion.

Copilot uses AI. Check for mistakes.

d.Start = timetypes.NewRFC3339TimeValue(start)
d.End = timetypes.NewRFC3339TimeValue(end)
}

if datafeedStats.RunningState.RealTimeConfigured {
d.End = timetypes.NewRFC3339Null()
}
}

if datafeedStats.RunningState.SearchInterval != nil {
d.Start = timetypes.NewRFC3339TimeValue(time.UnixMilli(datafeedStats.RunningState.SearchInterval.StartMS))
d.End = timetypes.NewRFC3339TimeValue(time.UnixMilli(datafeedStats.RunningState.SearchInterval.EndMS))
} else {
if d.Start.IsUnknown() {
d.Start = timetypes.NewRFC3339Null()
d.End = timetypes.NewRFC3339Null()
}

if datafeedStats.RunningState.RealTimeConfigured {
if d.End.IsUnknown() {
d.End = timetypes.NewRFC3339Null()
}

Expand Down
5 changes: 1 addition & 4 deletions internal/elasticsearch/ml/datafeed_state/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/elastic/terraform-provider-elasticstack/internal/clients"
"github.com/elastic/terraform-provider-elasticstack/internal/clients/elasticsearch"
"github.com/elastic/terraform-provider-elasticstack/internal/diagutil"
"github.com/elastic/terraform-provider-elasticstack/internal/elasticsearch/ml/datafeed"
"github.com/hashicorp/terraform-plugin-framework/diag"
"github.com/hashicorp/terraform-plugin-framework/resource"
"github.com/hashicorp/terraform-plugin-framework/types"
Expand Down Expand Up @@ -64,9 +63,7 @@ func (r *mlDatafeedStateResource) read(ctx context.Context, data MLDatafeedState

data.Id = types.StringValue(compId.String())

if datafeed.State(datafeedStats.State) == datafeed.StateStarted {
diags.Append(data.SetStartAndEndFromAPI(datafeedStats)...)
}
diags.Append(data.SetStartAndEndFromAPI(datafeedStats)...)

return &data, diags
}
1 change: 1 addition & 0 deletions internal/elasticsearch/ml/datafeed_state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func GetSchema() schema.Schema {
Computed: true,
PlanModifiers: []planmodifier.String{
stringplanmodifier.UseStateForUnknown(),
SetUnknownIfStateHasChanges(),
},
},
"end": schema.StringAttribute{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we treat it similarly to “start”? (e.g. use SetUnknownIfStateHasChanges)?

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package datafeed_state

import (
"context"

"github.com/elastic/terraform-provider-elasticstack/internal/utils"
"github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
"github.com/hashicorp/terraform-plugin-framework/types"
)

// SetUnknownIfStateHasChanges returns a plan modifier that sets the current attribute to unknown
// if the state attribute has changed between state and config.
func SetUnknownIfStateHasChanges() planmodifier.String {
return setUnknownIfStateHasChanges{}
}

type setUnknownIfStateHasChanges struct{}

func (s setUnknownIfStateHasChanges) Description(ctx context.Context) string {
return "Sets the attribute value to unknown if the state attribute has changed"
}

func (s setUnknownIfStateHasChanges) MarkdownDescription(ctx context.Context) string {
return s.Description(ctx)
}

func (s setUnknownIfStateHasChanges) PlanModifyString(ctx context.Context, req planmodifier.StringRequest, resp *planmodifier.StringResponse) {
// Only apply this modifier if we have both state and config
if req.State.Raw.IsNull() || req.Config.Raw.IsNull() {
return
}

// Continue using the config value if it's explicitly set
if utils.IsKnown(req.ConfigValue) {
return
}

// Get the state attribute from state and config to check if it has changed
var stateValue, configValue types.String
resp.Diagnostics.Append(req.State.GetAttribute(ctx, path.Root("state"), &stateValue)...)
resp.Diagnostics.Append(req.Config.GetAttribute(ctx, path.Root("state"), &configValue)...)
if resp.Diagnostics.HasError() {
return
}

// If the state attribute has changed between state and config, set the current attribute to Unknown
if !stateValue.Equal(configValue) {
resp.PlanValue = types.StringUnknown()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
variable "job_id" {
description = "The ML job ID"
type = string
}

variable "datafeed_id" {
description = "The ML datafeed ID"
type = string
}

variable "index_name" {
description = "The index name"
type = string
}

provider "elasticstack" {
elasticsearch {}
}

resource "elasticstack_elasticsearch_index" "test" {
name = var.index_name
deletion_protection = false
mappings = jsonencode({
properties = {
"@timestamp" = {
type = "date"
}
nginx = {
properties = {
access = {
properties = {
body_sent = {
properties = {
bytes = {
type = "long"
}
}
}
geoip = {
properties = {
city_name = {
type = "keyword"
}
}
}
user_agent = {
properties = {
build = {
type = "keyword"
}
}
}
}
}
}
}
}
})
}

resource "elasticstack_elasticsearch_ml_anomaly_detection_job" "nginx" {
job_id = var.job_id
description = "Anomaly detection for network traffic"
analysis_config = {
bucket_span = "15m"
detectors = [
{
function = "count"
detector_description = "count"
},
{
function = "mean"
field_name = "nginx.access.body_sent.bytes"
detector_description = "mean(\"nginx.access.body_sent.bytes\")"
}
]
influencers = ["nginx.access.geoip.city_name", "nginx.access.user_agent.build"]
model_prune_window = "30d"
}
analysis_limits = {
model_memory_limit = "10MB"
categorization_examples_limit = 4
}
data_description = {
time_field = "@timestamp"
time_format = "epoch_ms"
}
model_snapshot_retention_days = 10
daily_model_snapshot_retention_after_days = 1
}

resource "elasticstack_elasticsearch_ml_datafeed" "datafeed_nginx" {
datafeed_id = var.datafeed_id
job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.nginx.job_id
query = jsonencode({
bool = {
must = [
{
match_all = {}
}
]
}
})
indices = [elasticstack_elasticsearch_index.test.name]
}

resource "elasticstack_elasticsearch_ml_job_state" "nginx" {
job_id = elasticstack_elasticsearch_ml_anomaly_detection_job.nginx.job_id
state = "closed"
}

resource "elasticstack_elasticsearch_ml_datafeed_state" "nginx" {
datafeed_id = elasticstack_elasticsearch_ml_datafeed.datafeed_nginx.datafeed_id
state = "stopped"
force = true

depends_on = [
elasticstack_elasticsearch_ml_datafeed.datafeed_nginx,
elasticstack_elasticsearch_ml_job_state.nginx
]
}
Loading