diff --git a/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/docker-compose.yml b/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/docker-compose.yml new file mode 100644 index 00000000000..0bfa33fa6a1 --- /dev/null +++ b/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/docker-compose.yml @@ -0,0 +1,34 @@ +version: "2.3" +services: + azure-blob-storage-emulator: + image: mcr.microsoft.com/azure-storage/azurite + command: azurite-blob --blobHost 0.0.0.0 --blobPort 10000 + ports: + - "10000/tcp" + uploader: + image: mcr.microsoft.com/azure-cli + depends_on: + - azure-blob-storage-emulator + volumes: + - ./sample_logs:/sample_logs + entrypoint: > + sh -c " + sleep 5 && + export AZURE_STORAGE_CONNECTION_STRING='DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azure-blob-storage-emulator:10000/devstoreaccount1;' && + az storage container create --name test-container && + az storage blob upload --container-name test-container --file /sample_logs/test-alerts-v2.csv.gz --name test-alerts-v2.csv.gz + " + gcs-mock-service: + image: golang:1.24.7-alpine + working_dir: /app + volumes: + - ./gcs-mock-service:/app + - ./files/manifest.yml:/files/manifest.yml:ro + - ./sample_logs/:/data + ports: + - "4443/tcp" + healthcheck: + test: "wget --no-verbose --tries=1 --spider http://localhost:4443/health || exit 1" + interval: 10s + timeout: 5s + command: go run main.go -manifest /files/manifest.yml diff --git a/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/files/manifest.yml b/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/files/manifest.yml new file mode 100644 index 00000000000..2492c4f47f2 --- /dev/null +++ b/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/files/manifest.yml @@ -0,0 +1,5 @@ +buckets: + testbucket: + files: + - path: /data/test-alerts-v2.csv.gz + content-type: application/x-gzip diff --git a/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/gcs-mock-service/go.mod b/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/gcs-mock-service/go.mod new file mode 100644 index 00000000000..df08767f7e8 --- /dev/null +++ b/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/gcs-mock-service/go.mod @@ -0,0 +1,5 @@ +module gcs-mock-service + +go 1.24.7 + +require gopkg.in/yaml.v3 v3.0.1 diff --git a/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/gcs-mock-service/go.sum b/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/gcs-mock-service/go.sum new file mode 100644 index 00000000000..a62c313c5b0 --- /dev/null +++ b/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/gcs-mock-service/go.sum @@ -0,0 +1,4 @@ +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/gcs-mock-service/main.go b/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/gcs-mock-service/main.go new file mode 100644 index 00000000000..391e75c7efe --- /dev/null +++ b/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/gcs-mock-service/main.go @@ -0,0 +1,297 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package main + +import ( + "encoding/json" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "strconv" + "strings" + + "gopkg.in/yaml.v3" +) + +func main() { + host := flag.String("host", "0.0.0.0", "host to listen on") + port := flag.String("port", "4443", "port to listen on") + manifest := flag.String("manifest", "", "path to YAML manifest file for preloading buckets and objects") + flag.Parse() + + addr := fmt.Sprintf("%s:%s", *host, *port) + + fmt.Printf("Starting mock GCS server on http://%s\n", addr) + if *manifest != "" { + m, err := readManifest(*manifest) + if err != nil { + log.Fatalf("error reading manifest: %v", err) + } + if err := processManifest(m); err != nil { + log.Fatalf("error processing manifest: %v", err) + } + } else { + fmt.Println("Store is empty. Create buckets and objects via API calls.") + } + + // setup HTTP handlers + mux := http.NewServeMux() + // health check + mux.HandleFunc("/health", healthHandler) + // standard gcs api calls + mux.HandleFunc("GET /storage/v1/b/{bucket}/o", handleListObjects) + mux.HandleFunc("GET /storage/v1/b/{bucket}/o/{object...}", handleGetObject) + mux.HandleFunc("POST /storage/v1/b", handleCreateBucket) + mux.HandleFunc("POST /upload/storage/v1/b/{bucket}/o", handleUploadObject) + mux.HandleFunc("POST /upload/storage/v1/b/{bucket}/o/{object...}", handleUploadObject) + // direct path-style gcs sdk calls + mux.HandleFunc("GET /{bucket}/o/{object...}", handleGetObject) + mux.HandleFunc("GET /{bucket}/{object...}", handleGetObject) + // debug: log all requests + loggedMux := loggingMiddleware(mux) + + if err := http.ListenAndServe(addr, loggedMux); err != nil { + log.Fatalf("failed to start server: %v", err) + } +} + +// loggingMiddleware logs incoming HTTP requests. +func loggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Printf("%s %s\n", r.Method, r.URL.Path) + next.ServeHTTP(w, r) + }) +} + +// readManifest reads and parses the YAML manifest file. +func readManifest(path string) (*Manifest, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read manifest: %w", err) + } + + var manifest Manifest + if err := yaml.Unmarshal(data, &manifest); err != nil { + return nil, fmt.Errorf("failed to parse manifest: %w", err) + } + + return &manifest, nil +} + +// processManifest creates buckets and uploads objects as specified in the manifest. +func processManifest(manifest *Manifest) error { + for bucketName, bucket := range manifest.Buckets { + for _, file := range bucket.Files { + fmt.Printf("preloading data for bucket: %s | path: %s | content-type: %s...\n", + bucketName, file.Path, file.ContentType) + + if err := createBucket(bucketName); err != nil { + return fmt.Errorf("failed to create bucket '%s': %w", bucketName, err) + } + data, err := os.ReadFile(file.Path) + if err != nil { + return fmt.Errorf("failed to read bucket data file '%s': %w", file.Path, err) + } + pathParts := strings.Split(file.Path, "/") + if _, err := uploadObject(bucketName, pathParts[len(pathParts)-1], data, file.ContentType); err != nil { + return fmt.Errorf("failed to create object '%s' in bucket '%s': %w", file.Path, bucketName, err) + } + } + } + return nil +} + +// healthHandler responds with a simple "OK" message for health checks. +func healthHandler(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, "OK") +} + +// handleListObjects lists all objects in the specified bucket. +func handleListObjects(w http.ResponseWriter, r *http.Request) { + bucketName := r.PathValue("bucket") + + if bucket, ok := inMemoryStore[bucketName]; ok { + response := GCSListResponse{ + Kind: "storage#objects", + Items: []GCSObject{}, + } + for name, object := range bucket { + item := GCSObject{ + Kind: "storage#object", + Name: name, + Bucket: bucketName, + Size: strconv.Itoa(len(object.Data)), + ContentType: object.ContentType, + } + response.Items = append(response.Items, item) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + return + } + http.Error(w, "not found", http.StatusNotFound) +} + +// handleGetObject retrieves a specific object from a bucket. +func handleGetObject(w http.ResponseWriter, r *http.Request) { + bucketName := r.PathValue("bucket") + objectName := r.PathValue("object") + + if bucketName == "" || objectName == "" { + http.Error(w, "not found: invalid URL format", http.StatusNotFound) + return + } + + if bucket, ok := inMemoryStore[bucketName]; ok { + if object, ok := bucket[objectName]; ok { + w.Header().Set("Content-Type", object.ContentType) + w.Write(object.Data) + return + } + } + http.Error(w, "not found", http.StatusNotFound) +} + +// handleCreateBucket creates a new bucket. +func handleCreateBucket(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.NotFound(w, r) + return + } + + var bucketInfo struct { + Name string `json:"name"` + } + if err := json.NewDecoder(r.Body).Decode(&bucketInfo); err != nil { + http.Error(w, "invalid JSON body", http.StatusBadRequest) + return + } + if bucketInfo.Name == "" { + http.Error(w, "bucket name is required", http.StatusBadRequest) + return + } + + if err := createBucket(bucketInfo.Name); err != nil { + http.Error(w, err.Error(), http.StatusConflict) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(bucketInfo) +} + +// handleUploadObject uploads an object to a specified bucket. +func handleUploadObject(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.NotFound(w, r) + return + } + + bucketName := r.PathValue("bucket") + objectName := r.URL.Query().Get("name") + if objectName == "" { + objectName = r.PathValue("object") + } + + if bucketName == "" || objectName == "" { + http.Error(w, "missing bucket or object name", http.StatusBadRequest) + return + } + + data, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "failed to read request body", http.StatusInternalServerError) + return + } + defer r.Body.Close() + + contentType := r.Header.Get("Content-Type") + if contentType == "" { + contentType = "application/octet-stream" + } + + response, err := uploadObject(bucketName, objectName, data, contentType) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +func createBucket(bucketName string) error { + if _, exists := inMemoryStore[bucketName]; exists { + return fmt.Errorf("bucket already exists") + } + inMemoryStore[bucketName] = make(map[string]ObjectData) + log.Printf("created bucket: %s", bucketName) + return nil +} + +func uploadObject(bucketName, objectName string, data []byte, contentType string) (*GCSObject, error) { + if _, ok := inMemoryStore[bucketName]; !ok { + return nil, fmt.Errorf("bucket not found") + } + + inMemoryStore[bucketName][objectName] = ObjectData{ + Data: data, + ContentType: contentType, + } + log.Printf("created object '%s' in bucket '%s' with Content-Type '%s'", + objectName, bucketName, contentType) + + return &GCSObject{ + Kind: "storage#object", + Name: objectName, + Bucket: bucketName, + Size: strconv.Itoa(len(data)), + ContentType: contentType, + }, nil +} + +// The in-memory store to hold ObjectData structs. +var inMemoryStore = make(map[string]map[string]ObjectData) + +// ObjectData stores the raw data and its content type. +type ObjectData struct { + Data []byte + ContentType string +} + +// GCSListResponse mimics the structure of a real GCS object list response. +type GCSListResponse struct { + Kind string `json:"kind"` + Items []GCSObject `json:"items"` +} + +// GCSObject mimics the structure of a GCS object resource with ContentType. +type GCSObject struct { + Kind string `json:"kind"` + Name string `json:"name"` + Bucket string `json:"bucket"` + Size string `json:"size"` + ContentType string `json:"contentType"` +} + +// Manifest represents the top-level structure of the YAML file +type Manifest struct { + Buckets map[string]Bucket `yaml:"buckets"` +} + +// Bucket represents each bucket and its files +type Bucket struct { + Files []File `yaml:"files"` +} + +// File represents each file entry inside a bucket +type File struct { + Path string `yaml:"path"` + ContentType string `yaml:"content-type"` +} diff --git a/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/sample_logs/test-alerts-v2.csv.gz b/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/sample_logs/test-alerts-v2.csv.gz new file mode 100644 index 00000000000..1131ae856f8 Binary files /dev/null and b/packages/netskope/data_stream/alerts_v2/_dev/deploy/docker/sample_logs/test-alerts-v2.csv.gz differ diff --git a/packages/netskope/data_stream/alerts_v2/_dev/deploy/tf/main.tf b/packages/netskope/data_stream/alerts_v2/_dev/deploy/tf/main.tf index 973c81fbcff..3c384a9b0ba 100644 --- a/packages/netskope/data_stream/alerts_v2/_dev/deploy/tf/main.tf +++ b/packages/netskope/data_stream/alerts_v2/_dev/deploy/tf/main.tf @@ -2,21 +2,21 @@ provider "aws" { region = "us-east-1" default_tags { tags = { - environment = var.ENVIRONMENT - repo = var.REPO - branch = var.BRANCH - build = var.BUILD_ID - created_date = var.CREATED_DATE + aws_environment = var.ENVIRONMENT + aws_repo = var.REPO + aws_branch = var.BRANCH + aws_build = var.BUILD_ID + aws_created_date = var.CREATED_DATE } } } -resource "aws_s3_bucket" "bucket" { - bucket = "elastic-package-netskope-alert-v2-bucket-${var.TEST_RUN_ID}" +resource "aws_s3_bucket" "aws_bucket" { + bucket = "elastic-package-netskope-bucket-${var.TEST_RUN_ID}" } -resource "aws_sqs_queue" "queue" { - name = "elastic-package-netskope-alert-v2-queue-${var.TEST_RUN_ID}" +resource "aws_sqs_queue" "aws_queue" { + name = "elastic-package-netskope-queue-${var.TEST_RUN_ID}" policy = < + sh -c " + sleep 5 && + export AZURE_STORAGE_CONNECTION_STRING='DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azure-blob-storage-emulator:10000/devstoreaccount1;' && + az storage container create --name test-container && + az storage blob upload --container-name test-container --file /sample_logs/events.csv.gz --name events.csv.gz + " + gcs-mock-service: + image: golang:1.24.7-alpine + working_dir: /app + volumes: + - ./gcs-mock-service:/app + - ./files/manifest.yml:/files/manifest.yml:ro + - ./sample_logs/:/data + ports: + - "4443/tcp" + healthcheck: + test: "wget --no-verbose --tries=1 --spider http://localhost:4443/health || exit 1" + interval: 10s + timeout: 5s + command: go run main.go -manifest /files/manifest.yml diff --git a/packages/netskope/data_stream/events_v2/_dev/deploy/docker/files/manifest.yml b/packages/netskope/data_stream/events_v2/_dev/deploy/docker/files/manifest.yml new file mode 100644 index 00000000000..370c9f9c3cb --- /dev/null +++ b/packages/netskope/data_stream/events_v2/_dev/deploy/docker/files/manifest.yml @@ -0,0 +1,5 @@ +buckets: + testbucket: + files: + - path: /data/events.csv.gz + content-type: application/x-gzip diff --git a/packages/netskope/data_stream/events_v2/_dev/deploy/docker/gcs-mock-service/go.mod b/packages/netskope/data_stream/events_v2/_dev/deploy/docker/gcs-mock-service/go.mod new file mode 100644 index 00000000000..df08767f7e8 --- /dev/null +++ b/packages/netskope/data_stream/events_v2/_dev/deploy/docker/gcs-mock-service/go.mod @@ -0,0 +1,5 @@ +module gcs-mock-service + +go 1.24.7 + +require gopkg.in/yaml.v3 v3.0.1 diff --git a/packages/netskope/data_stream/events_v2/_dev/deploy/docker/gcs-mock-service/go.sum b/packages/netskope/data_stream/events_v2/_dev/deploy/docker/gcs-mock-service/go.sum new file mode 100644 index 00000000000..a62c313c5b0 --- /dev/null +++ b/packages/netskope/data_stream/events_v2/_dev/deploy/docker/gcs-mock-service/go.sum @@ -0,0 +1,4 @@ +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/packages/netskope/data_stream/events_v2/_dev/deploy/docker/gcs-mock-service/main.go b/packages/netskope/data_stream/events_v2/_dev/deploy/docker/gcs-mock-service/main.go new file mode 100644 index 00000000000..391e75c7efe --- /dev/null +++ b/packages/netskope/data_stream/events_v2/_dev/deploy/docker/gcs-mock-service/main.go @@ -0,0 +1,297 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package main + +import ( + "encoding/json" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "strconv" + "strings" + + "gopkg.in/yaml.v3" +) + +func main() { + host := flag.String("host", "0.0.0.0", "host to listen on") + port := flag.String("port", "4443", "port to listen on") + manifest := flag.String("manifest", "", "path to YAML manifest file for preloading buckets and objects") + flag.Parse() + + addr := fmt.Sprintf("%s:%s", *host, *port) + + fmt.Printf("Starting mock GCS server on http://%s\n", addr) + if *manifest != "" { + m, err := readManifest(*manifest) + if err != nil { + log.Fatalf("error reading manifest: %v", err) + } + if err := processManifest(m); err != nil { + log.Fatalf("error processing manifest: %v", err) + } + } else { + fmt.Println("Store is empty. Create buckets and objects via API calls.") + } + + // setup HTTP handlers + mux := http.NewServeMux() + // health check + mux.HandleFunc("/health", healthHandler) + // standard gcs api calls + mux.HandleFunc("GET /storage/v1/b/{bucket}/o", handleListObjects) + mux.HandleFunc("GET /storage/v1/b/{bucket}/o/{object...}", handleGetObject) + mux.HandleFunc("POST /storage/v1/b", handleCreateBucket) + mux.HandleFunc("POST /upload/storage/v1/b/{bucket}/o", handleUploadObject) + mux.HandleFunc("POST /upload/storage/v1/b/{bucket}/o/{object...}", handleUploadObject) + // direct path-style gcs sdk calls + mux.HandleFunc("GET /{bucket}/o/{object...}", handleGetObject) + mux.HandleFunc("GET /{bucket}/{object...}", handleGetObject) + // debug: log all requests + loggedMux := loggingMiddleware(mux) + + if err := http.ListenAndServe(addr, loggedMux); err != nil { + log.Fatalf("failed to start server: %v", err) + } +} + +// loggingMiddleware logs incoming HTTP requests. +func loggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Printf("%s %s\n", r.Method, r.URL.Path) + next.ServeHTTP(w, r) + }) +} + +// readManifest reads and parses the YAML manifest file. +func readManifest(path string) (*Manifest, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read manifest: %w", err) + } + + var manifest Manifest + if err := yaml.Unmarshal(data, &manifest); err != nil { + return nil, fmt.Errorf("failed to parse manifest: %w", err) + } + + return &manifest, nil +} + +// processManifest creates buckets and uploads objects as specified in the manifest. +func processManifest(manifest *Manifest) error { + for bucketName, bucket := range manifest.Buckets { + for _, file := range bucket.Files { + fmt.Printf("preloading data for bucket: %s | path: %s | content-type: %s...\n", + bucketName, file.Path, file.ContentType) + + if err := createBucket(bucketName); err != nil { + return fmt.Errorf("failed to create bucket '%s': %w", bucketName, err) + } + data, err := os.ReadFile(file.Path) + if err != nil { + return fmt.Errorf("failed to read bucket data file '%s': %w", file.Path, err) + } + pathParts := strings.Split(file.Path, "/") + if _, err := uploadObject(bucketName, pathParts[len(pathParts)-1], data, file.ContentType); err != nil { + return fmt.Errorf("failed to create object '%s' in bucket '%s': %w", file.Path, bucketName, err) + } + } + } + return nil +} + +// healthHandler responds with a simple "OK" message for health checks. +func healthHandler(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, "OK") +} + +// handleListObjects lists all objects in the specified bucket. +func handleListObjects(w http.ResponseWriter, r *http.Request) { + bucketName := r.PathValue("bucket") + + if bucket, ok := inMemoryStore[bucketName]; ok { + response := GCSListResponse{ + Kind: "storage#objects", + Items: []GCSObject{}, + } + for name, object := range bucket { + item := GCSObject{ + Kind: "storage#object", + Name: name, + Bucket: bucketName, + Size: strconv.Itoa(len(object.Data)), + ContentType: object.ContentType, + } + response.Items = append(response.Items, item) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + return + } + http.Error(w, "not found", http.StatusNotFound) +} + +// handleGetObject retrieves a specific object from a bucket. +func handleGetObject(w http.ResponseWriter, r *http.Request) { + bucketName := r.PathValue("bucket") + objectName := r.PathValue("object") + + if bucketName == "" || objectName == "" { + http.Error(w, "not found: invalid URL format", http.StatusNotFound) + return + } + + if bucket, ok := inMemoryStore[bucketName]; ok { + if object, ok := bucket[objectName]; ok { + w.Header().Set("Content-Type", object.ContentType) + w.Write(object.Data) + return + } + } + http.Error(w, "not found", http.StatusNotFound) +} + +// handleCreateBucket creates a new bucket. +func handleCreateBucket(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.NotFound(w, r) + return + } + + var bucketInfo struct { + Name string `json:"name"` + } + if err := json.NewDecoder(r.Body).Decode(&bucketInfo); err != nil { + http.Error(w, "invalid JSON body", http.StatusBadRequest) + return + } + if bucketInfo.Name == "" { + http.Error(w, "bucket name is required", http.StatusBadRequest) + return + } + + if err := createBucket(bucketInfo.Name); err != nil { + http.Error(w, err.Error(), http.StatusConflict) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(bucketInfo) +} + +// handleUploadObject uploads an object to a specified bucket. +func handleUploadObject(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.NotFound(w, r) + return + } + + bucketName := r.PathValue("bucket") + objectName := r.URL.Query().Get("name") + if objectName == "" { + objectName = r.PathValue("object") + } + + if bucketName == "" || objectName == "" { + http.Error(w, "missing bucket or object name", http.StatusBadRequest) + return + } + + data, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "failed to read request body", http.StatusInternalServerError) + return + } + defer r.Body.Close() + + contentType := r.Header.Get("Content-Type") + if contentType == "" { + contentType = "application/octet-stream" + } + + response, err := uploadObject(bucketName, objectName, data, contentType) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +func createBucket(bucketName string) error { + if _, exists := inMemoryStore[bucketName]; exists { + return fmt.Errorf("bucket already exists") + } + inMemoryStore[bucketName] = make(map[string]ObjectData) + log.Printf("created bucket: %s", bucketName) + return nil +} + +func uploadObject(bucketName, objectName string, data []byte, contentType string) (*GCSObject, error) { + if _, ok := inMemoryStore[bucketName]; !ok { + return nil, fmt.Errorf("bucket not found") + } + + inMemoryStore[bucketName][objectName] = ObjectData{ + Data: data, + ContentType: contentType, + } + log.Printf("created object '%s' in bucket '%s' with Content-Type '%s'", + objectName, bucketName, contentType) + + return &GCSObject{ + Kind: "storage#object", + Name: objectName, + Bucket: bucketName, + Size: strconv.Itoa(len(data)), + ContentType: contentType, + }, nil +} + +// The in-memory store to hold ObjectData structs. +var inMemoryStore = make(map[string]map[string]ObjectData) + +// ObjectData stores the raw data and its content type. +type ObjectData struct { + Data []byte + ContentType string +} + +// GCSListResponse mimics the structure of a real GCS object list response. +type GCSListResponse struct { + Kind string `json:"kind"` + Items []GCSObject `json:"items"` +} + +// GCSObject mimics the structure of a GCS object resource with ContentType. +type GCSObject struct { + Kind string `json:"kind"` + Name string `json:"name"` + Bucket string `json:"bucket"` + Size string `json:"size"` + ContentType string `json:"contentType"` +} + +// Manifest represents the top-level structure of the YAML file +type Manifest struct { + Buckets map[string]Bucket `yaml:"buckets"` +} + +// Bucket represents each bucket and its files +type Bucket struct { + Files []File `yaml:"files"` +} + +// File represents each file entry inside a bucket +type File struct { + Path string `yaml:"path"` + ContentType string `yaml:"content-type"` +} diff --git a/packages/netskope/data_stream/events_v2/_dev/deploy/docker/sample_logs/events.csv.gz b/packages/netskope/data_stream/events_v2/_dev/deploy/docker/sample_logs/events.csv.gz new file mode 100644 index 00000000000..9e796c8db34 Binary files /dev/null and b/packages/netskope/data_stream/events_v2/_dev/deploy/docker/sample_logs/events.csv.gz differ diff --git a/packages/netskope/data_stream/events_v2/_dev/deploy/tf/env.yml b/packages/netskope/data_stream/events_v2/_dev/deploy/tf/env.yml index aee5f1c5900..dcef36e04dc 100644 --- a/packages/netskope/data_stream/events_v2/_dev/deploy/tf/env.yml +++ b/packages/netskope/data_stream/events_v2/_dev/deploy/tf/env.yml @@ -2,6 +2,10 @@ version: '2.3' services: terraform: environment: + - GOOGLE_CLOUD_KEYFILE_JSON=${GOOGLE_CLOUD_KEYFILE_JSON} + - GCLOUD_PROJECT=${GCLOUD_PROJECT} + - GOOGLE_REGION=${GOOGLE_REGION:-US} + - TF_VAR_BUCKET_REGION=${GOOGLE_REGION:-US} - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} - AWS_SESSION_TOKEN=${AWS_SESSION_TOKEN} diff --git a/packages/netskope/data_stream/events_v2/_dev/deploy/tf/main.tf b/packages/netskope/data_stream/events_v2/_dev/deploy/tf/main.tf index eb80e941a15..939d1b476ba 100644 --- a/packages/netskope/data_stream/events_v2/_dev/deploy/tf/main.tf +++ b/packages/netskope/data_stream/events_v2/_dev/deploy/tf/main.tf @@ -2,20 +2,20 @@ provider "aws" { region = "us-east-1" default_tags { tags = { - environment = var.ENVIRONMENT - repo = var.REPO - branch = var.BRANCH - build = var.BUILD_ID - created_date = var.CREATED_DATE + aws_environment = var.ENVIRONMENT + aws_repo = var.REPO + aws_branch = var.BRANCH + aws_build = var.BUILD_ID + aws_created_date = var.CREATED_DATE } } } -resource "aws_s3_bucket" "bucket" { +resource "aws_s3_bucket" "aws_bucket" { bucket = "elastic-package-netskope-bucket-${var.TEST_RUN_ID}" } -resource "aws_sqs_queue" "queue" { +resource "aws_sqs_queue" "aws_queue" { name = "elastic-package-netskope-queue-${var.TEST_RUN_ID}" policy = <