Skip to content

Commit 9dafc6d

Browse files
Update AWS SDK to v2 and refactor Kinesis integration
Signed-off-by: rideshnath-scout <[email protected]>
1 parent 9a70082 commit 9dafc6d

File tree

5 files changed

+254
-162
lines changed

5 files changed

+254
-162
lines changed

bindings/aws/kinesis/kinesis.go

Lines changed: 33 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ import (
2222
"sync/atomic"
2323
"time"
2424

25-
"github.com/aws/aws-sdk-go/aws"
26-
"github.com/aws/aws-sdk-go/aws/request"
27-
"github.com/aws/aws-sdk-go/service/kinesis"
25+
"github.com/aws/aws-sdk-go-v2/service/kinesis"
26+
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
2827
"github.com/cenkalti/backoff/v4"
2928
"github.com/google/uuid"
3029
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces"
@@ -44,11 +43,11 @@ type AWSKinesis struct {
4443

4544
worker *worker.Worker
4645

47-
streamName string
48-
consumerName string
49-
consumerARN *string
50-
logger logger.Logger
51-
consumerMode string
46+
streamName string
47+
consumerName string
48+
consumerARN *string
49+
logger logger.Logger
50+
consumerMode string
5251
applicationName string
5352
closed atomic.Bool
5453
closeCh chan struct{}
@@ -146,7 +145,7 @@ func (a *AWSKinesis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*
146145
if partitionKey == "" {
147146
partitionKey = uuid.New().String()
148147
}
149-
_, err := a.authProvider.Kinesis().Kinesis.PutRecordWithContext(ctx, &kinesis.PutRecordInput{
148+
_, err := a.authProvider.Kinesis().Kinesis.PutRecord(ctx, &kinesis.PutRecordInput{
150149
StreamName: &a.metadata.StreamName,
151150
Data: req.Data,
152151
PartitionKey: &partitionKey,
@@ -176,7 +175,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
176175
}
177176
case ExtendedFanout:
178177
var stream *kinesis.DescribeStreamOutput
179-
stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName})
178+
stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(ctx, &kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName})
180179
if err != nil {
181180
return err
182181
}
@@ -210,7 +209,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
210209
}
211210

212211
// Subscribe to all shards.
213-
func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDescription, handler bindings.Handler) error {
212+
func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc types.StreamDescription, handler bindings.Handler) error {
214213
consumerARN, err := a.ensureConsumer(ctx, streamDesc.StreamARN)
215214
if err != nil {
216215
a.logger.Error(err)
@@ -221,7 +220,7 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes
221220

222221
a.wg.Add(len(streamDesc.Shards))
223222
for i, shard := range streamDesc.Shards {
224-
go func(idx int, s *kinesis.Shard) {
223+
go func(idx int, s types.Shard) {
225224
defer a.wg.Done()
226225

227226
// Reconnection backoff
@@ -237,14 +236,14 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes
237236
return
238237
default:
239238
}
240-
sub, err := a.authProvider.Kinesis().Kinesis.SubscribeToShardWithContext(ctx, &kinesis.SubscribeToShardInput{
239+
sub, err := a.authProvider.Kinesis().Kinesis.SubscribeToShard(ctx, &kinesis.SubscribeToShardInput{
241240
ConsumerARN: consumerARN,
242241
ShardId: s.ShardId,
243-
StartingPosition: &kinesis.StartingPosition{Type: aws.String(kinesis.ShardIteratorTypeLatest)},
242+
StartingPosition: &types.StartingPosition{Type: types.ShardIteratorTypeLatest},
244243
})
245244
if err != nil {
246245
wait := bo.NextBackOff()
247-
a.logger.Errorf("Error while reading from shard %v: %v. Attempting to reconnect in %s...", s.ShardId, err, wait)
246+
a.logger.Errorf("Error while reading from shard %v: %v. Attempting to reconnect in %s...", *s.ShardId, err, wait)
248247
select {
249248
case <-ctx.Done():
250249
return
@@ -257,10 +256,10 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes
257256
bo.Reset()
258257

259258
// Process events
260-
for event := range sub.EventStream.Events() {
259+
for event := range sub.GetStream().Events() {
261260
switch e := event.(type) {
262-
case *kinesis.SubscribeToShardEvent:
263-
for _, rec := range e.Records {
261+
case *types.SubscribeToShardEventStreamMemberSubscribeToShardEvent:
262+
for _, rec := range e.Value.Records {
264263
handler(ctx, &bindings.ReadResponse{
265264
Data: rec.Data,
266265
})
@@ -289,7 +288,7 @@ func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*st
289288
// Only set timeout on consumer call.
290289
conCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
291290
defer cancel()
292-
consumer, err := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumerWithContext(conCtx, &kinesis.DescribeStreamConsumerInput{
291+
consumer, err := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumer(conCtx, &kinesis.DescribeStreamConsumerInput{
293292
ConsumerName: &a.metadata.ConsumerName,
294293
StreamARN: streamARN,
295294
})
@@ -301,7 +300,7 @@ func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*st
301300
}
302301

303302
func (a *AWSKinesis) registerConsumer(ctx context.Context, streamARN *string) (*string, error) {
304-
consumer, err := a.authProvider.Kinesis().Kinesis.RegisterStreamConsumerWithContext(ctx, &kinesis.RegisterStreamConsumerInput{
303+
consumer, err := a.authProvider.Kinesis().Kinesis.RegisterStreamConsumer(ctx, &kinesis.RegisterStreamConsumerInput{
305304
ConsumerName: &a.metadata.ConsumerName,
306305
StreamARN: streamARN,
307306
})
@@ -324,7 +323,7 @@ func (a *AWSKinesis) deregisterConsumer(ctx context.Context, streamARN *string,
324323
if a.consumerARN != nil {
325324
// Use a background context because the running context may have been canceled already
326325
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
327-
_, err := a.authProvider.Kinesis().Kinesis.DeregisterStreamConsumerWithContext(ctx, &kinesis.DeregisterStreamConsumerInput{
326+
_, err := a.authProvider.Kinesis().Kinesis.DeregisterStreamConsumer(ctx, &kinesis.DeregisterStreamConsumerInput{
328327
ConsumerARN: consumerARN,
329328
StreamARN: streamARN,
330329
ConsumerName: &a.metadata.ConsumerName,
@@ -337,34 +336,19 @@ func (a *AWSKinesis) deregisterConsumer(ctx context.Context, streamARN *string,
337336
return nil
338337
}
339338

340-
func (a *AWSKinesis) waitUntilConsumerExists(ctx aws.Context, input *kinesis.DescribeStreamConsumerInput, opts ...request.WaiterOption) error {
341-
w := request.Waiter{
342-
Name: "WaitUntilConsumerExists",
343-
MaxAttempts: 18,
344-
Delay: request.ConstantWaiterDelay(10 * time.Second),
345-
Acceptors: []request.WaiterAcceptor{
346-
{
347-
State: request.SuccessWaiterState,
348-
Matcher: request.PathWaiterMatch, Argument: "ConsumerDescription.ConsumerStatus",
349-
Expected: "ACTIVE",
350-
},
351-
},
352-
NewRequest: func(opts []request.Option) (*request.Request, error) {
353-
var inCpy *kinesis.DescribeStreamConsumerInput
354-
if input != nil {
355-
tmp := *input
356-
inCpy = &tmp
357-
}
358-
req, _ := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumerRequest(inCpy)
359-
req.SetContext(ctx)
360-
req.ApplyOptions(opts...)
361-
362-
return req, nil
363-
},
339+
func (a *AWSKinesis) waitUntilConsumerExists(ctx context.Context, input *kinesis.DescribeStreamConsumerInput) error {
340+
// Poll until consumer is active
341+
for i := 0; i < 18; i++ {
342+
consumer, err := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumer(ctx, input)
343+
if err != nil {
344+
return err
345+
}
346+
if consumer.ConsumerDescription.ConsumerStatus == types.ConsumerStatusActive {
347+
return nil
348+
}
349+
time.Sleep(10 * time.Second)
364350
}
365-
w.ApplyOptions(opts...)
366-
367-
return w.WaitWithContext(ctx)
351+
return fmt.Errorf("consumer did not become active within timeout")
368352
}
369353

370354
func (a *AWSKinesis) parseMetadata(meta bindings.Metadata) (*kinesisMetadata, error) {
@@ -393,7 +377,7 @@ func (r *recordProcessorFactory) CreateProcessor() interfaces.IRecordProcessor {
393377
}
394378

395379
func (p *recordProcessor) Initialize(input *interfaces.InitializationInput) {
396-
p.logger.Infof("Processing ShardId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber))
380+
p.logger.Infof("Processing ShardId: %v at checkpoint: %v", input.ShardId, *input.ExtendedSequenceNumber.SequenceNumber)
397381
}
398382

399383
func (p *recordProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput) {

common/authentication/aws/client.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,11 @@ import (
2121
"github.com/aws/aws-sdk-go-v2/aws"
2222
awsv2config "github.com/aws/aws-sdk-go-v2/config"
2323
v2creds "github.com/aws/aws-sdk-go-v2/credentials"
24+
kinesisv2 "github.com/aws/aws-sdk-go-v2/service/kinesis"
2425
"github.com/aws/aws-sdk-go/aws/credentials"
2526
"github.com/aws/aws-sdk-go/aws/session"
2627
"github.com/aws/aws-sdk-go/service/dynamodb"
2728
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
28-
"github.com/aws/aws-sdk-go/service/kinesis"
29-
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
3029
"github.com/aws/aws-sdk-go/service/s3"
3130
"github.com/aws/aws-sdk-go/service/s3/s3manager"
3231
"github.com/aws/aws-sdk-go/service/secretsmanager"
@@ -118,7 +117,7 @@ type ParameterStoreClients struct {
118117
}
119118

120119
type KinesisClients struct {
121-
Kinesis kinesisiface.KinesisAPI
120+
Kinesis *kinesisv2.Client
122121
Region string
123122
Credentials *credentials.Credentials
124123
V2Credentials aws.CredentialsProvider
@@ -174,19 +173,24 @@ func (c *ParameterStoreClients) New(session *session.Session) {
174173
}
175174

176175
func (c *KinesisClients) New(session *session.Session) {
177-
c.Kinesis = kinesis.New(session, session.Config)
178176
c.Region = *session.Config.Region
179177
c.Credentials = session.Config.Credentials
180-
// Convert v1 credentials to v2 for KCL usage
178+
// Convert v1 credentials to v2 for both Kinesis client and KCL usage
181179
if v1Creds, err := session.Config.Credentials.Get(); err == nil {
182180
c.V2Credentials = v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken)
181+
// Create v2 config and Kinesis client
182+
v2Config := aws.Config{
183+
Region: c.Region,
184+
Credentials: c.V2Credentials,
185+
}
186+
c.Kinesis = kinesisv2.NewFromConfig(v2Config)
183187
}
184188
}
185189

186190
func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string, error) {
187191
if c.Kinesis != nil {
188-
stream, err := c.Kinesis.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{
189-
StreamName: aws.String(streamName),
192+
stream, err := c.Kinesis.DescribeStream(ctx, &kinesisv2.DescribeStreamInput{
193+
StreamName: &streamName,
190194
})
191195
if err != nil {
192196
return nil, err

0 commit comments

Comments
 (0)