Skip to content

Commit 5fbe3ad

Browse files
authored
Speed up ES tests (#7606)
## Which problem is this PR solving? - Part of #7167 ## Description of the changes - Avoid health check timeouts in tests - Move bulk processor creation after pings are done ## How was this change tested? - test --------- Signed-off-by: Yuri Shkuro <[email protected]>
1 parent dbeabc7 commit 5fbe3ad

File tree

3 files changed

+24
-20
lines changed

3 files changed

+24
-20
lines changed

internal/storage/elasticsearch/config/config.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -250,20 +250,6 @@ func NewClient(ctx context.Context, c *Configuration, logger *zap.Logger, metric
250250
logger: logger,
251251
}
252252

253-
bulkProc, err := rawClient.BulkProcessor().
254-
Before(func(id int64, _ /* requests */ []elastic.BulkableRequest) {
255-
bcb.startTimes.Store(id, time.Now())
256-
}).
257-
After(bcb.invoke).
258-
BulkSize(c.BulkProcessing.MaxBytes).
259-
Workers(c.BulkProcessing.Workers).
260-
BulkActions(c.BulkProcessing.MaxActions).
261-
FlushInterval(c.BulkProcessing.FlushInterval).
262-
Do(ctx)
263-
if err != nil {
264-
return nil, err
265-
}
266-
267253
if c.Version == 0 {
268254
// Determine ElasticSearch Version
269255
pingResult, _, err := rawClient.Ping(c.Servers[0]).Do(ctx)
@@ -302,6 +288,20 @@ func NewClient(ctx context.Context, c *Configuration, logger *zap.Logger, metric
302288
}
303289
}
304290

291+
bulkProc, err := rawClient.BulkProcessor().
292+
Before(func(id int64, _ /* requests */ []elastic.BulkableRequest) {
293+
bcb.startTimes.Store(id, time.Now())
294+
}).
295+
After(bcb.invoke).
296+
BulkSize(c.BulkProcessing.MaxBytes).
297+
Workers(c.BulkProcessing.Workers).
298+
BulkActions(c.BulkProcessing.MaxActions).
299+
FlushInterval(c.BulkProcessing.FlushInterval).
300+
Do(ctx)
301+
if err != nil {
302+
return nil, err
303+
}
304+
305305
return eswrapper.WrapESClient(rawClient, bulkProc, c.Version, rawClientV8), nil
306306
}
307307

internal/storage/v1/elasticsearch/factory_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,8 +318,9 @@ func TestESStorageFactoryWithConfig(t *testing.T) {
318318
func TestESStorageFactoryWithConfigError(t *testing.T) {
319319
t.Parallel()
320320
cfg := escfg.Configuration{
321-
Servers: []string{"http://127.0.0.1:65535"},
322-
LogLevel: "error",
321+
Servers: []string{"http://invalid-host-name:65535"},
322+
DisableHealthCheck: true,
323+
LogLevel: "error",
323324
}
324325
_, err := NewFactoryBase(context.Background(), cfg, metrics.NullFactory, zap.NewNop())
325326
require.ErrorContains(t, err, "failed to create Elasticsearch client")

internal/storage/v1/elasticsearch/factoryv1_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,15 +114,18 @@ func TestFactoryInitializeErr(t *testing.T) {
114114
expectedErr: "Servers: non zero value required",
115115
},
116116
{
117-
name: "server error",
118-
factory: NewFactory(),
119-
expectedErr: "failed to create Elasticsearch client: health check timeout: Head \"http://127.0.0.1:9200\": dial tcp 127.0.0.1:9200: connect: connection refused: no Elasticsearch node available",
117+
name: "server error",
118+
factory: &Factory{Options: &Options{Config: namespaceConfig{Configuration: escfg.Configuration{
119+
Servers: []string{"http://invalid-host-name:9200"},
120+
DisableHealthCheck: true,
121+
}}}},
122+
expectedErr: "failed to create Elasticsearch client",
120123
},
121124
}
122125
for _, test := range tests {
123126
t.Run(test.name, func(t *testing.T) {
124127
err := test.factory.Initialize(metrics.NullFactory, zaptest.NewLogger(t))
125-
require.EqualError(t, err, test.expectedErr)
128+
require.ErrorContains(t, err, test.expectedErr)
126129
})
127130
}
128131
}

0 commit comments

Comments
 (0)