Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion router/pkg/config/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@
},
"object_path": {
"type": "string",
"description": "The path to the execution config in the storage provider. The path is used to download the execution config from the storage provider."
"description": "The path to the execution config in the storage provider. The path is used to download the execution config from the storage provider. If the path ends with `.zst` (zstd compressed) or `.gz` (gzip compressed), the file will be decompressed before reading."
}
}
}
Expand Down
52 changes: 45 additions & 7 deletions router/pkg/routerconfig/s3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@ package s3
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"path/filepath"
"strings"
"time"

"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zstd"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/wundergraph/cosmo/router/pkg/controlplane/configpoller"
Expand Down Expand Up @@ -66,7 +72,7 @@ func NewClient(endpoint string, options *ClientOptions) (routerconfig.Client, er
return client, nil
}

func (c Client) getConfigFile(ctx context.Context, version string, modifiedSince time.Time) ([]byte, error) {
func (c Client) getConfigFile(ctx context.Context, modifiedSince time.Time) ([]byte, error) {
options := minio.GetObjectOptions{}

if !modifiedSince.IsZero() {
Expand All @@ -82,19 +88,51 @@ func (c Client) getConfigFile(ctx context.Context, version string, modifiedSince
}
}

reader, err := c.client.GetObject(ctx, c.options.BucketName, c.options.ObjectPath, options)
minioReader, err := c.client.GetObject(ctx, c.options.BucketName, c.options.ObjectPath, options)
if err != nil {
return nil, err
return nil, fmt.Errorf("error getting config from s3: %w", err)
}

defer minioReader.Close()

var configReader io.Reader

switch strings.ToLower(filepath.Ext(c.options.ObjectPath)) {
case ".gz":
gzipReader, err := gzip.NewReader(minioReader)
if err != nil {
return nil, fmt.Errorf("error creating gzip reader: %w", err)
}

defer gzipReader.Close()

configReader = gzipReader

case ".zst":
zstdReader, err := zstd.NewReader(minioReader)
if err != nil {
return nil, fmt.Errorf("error creating zstd reader: %w", err)
}

defer zstdReader.Close()

configReader = zstdReader
default:
configReader = minioReader
}

result, err := io.ReadAll(configReader)
if err != nil {
return nil, fmt.Errorf("error reading file: %w", err)
}
defer reader.Close()

return io.ReadAll(reader)
return result, nil
}

func (c Client) RouterConfig(ctx context.Context, version string, modifiedSince time.Time) (*routerconfig.Response, error) {
func (c Client) RouterConfig(ctx context.Context, _ string, modifiedSince time.Time) (*routerconfig.Response, error) {
res := &routerconfig.Response{}

body, err := c.getConfigFile(ctx, version, modifiedSince)
body, err := c.getConfigFile(ctx, modifiedSince)
if err != nil {
var minioErr minio.ErrorResponse
if errors.As(err, &minioErr) {
Expand Down
Loading