diff --git a/tools/cloud_run/event_driven_bq_ingest/README.md b/tools/cloud_run/event_driven_bq_ingest/README.md new file mode 100644 index 000000000..c6fa6ff43 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/README.md @@ -0,0 +1,95 @@ +# Cloud Run Event Driven Pipeline Tutorial + +This sample shows how to create an event driven pipeline on Cloud Run via Pub/Sub + +[![Open in Cloud Shell](https://gstatic.com/cloudssh/images/open-btn.png)](https://ssh.cloud.google.com/cloudshell/open?cloudshell_git_repo=https://github.com/yrvine-g/bigquery-utils.git&cloudshell_tutorial=tools/cloud_run/event_driven_bq_ingest/README.md) + + + +## Dependencies + +* **Spring Boot**: Web server framework. +* **Jib**: Container build tool. + +## Setup +Create GCS bucket and a folder path with the following format: +gs://bucket/project/dataset/table_name/*.avro + +Enable following APIs: +* container registry +* cloud run handler + +```sh +gcloud config set project MY_PROJECT +``` + +Configure environment variables: + +```sh +export MY_RUN_SERVICE=run-service +export MY_RUN_CONTAINER=run-container +export PROJECT=$(gcloud config get-value project) +export MY_GCS_BUCKET="$(gcloud config get-value project)-gcs-bucket" +export REGION=us-central1 +export SERVICE_ACCOUNT=cloud-run-pubsub-invoker +``` + +## Quickstart + +Use the [Jib Maven Plugin](https://github.com/GoogleContainerTools/jib/tree/master/jib-maven-plugin) to build and push container image: + +Switch to directory with .pom + +```sh +mvn compile jib:build -Dimage=gcr.io/$PROJECT/$MY_RUN_CONTAINER +``` + +Deploy Cloud Run service: +```sh +gcloud config set run/region $REGION +gcloud run deploy $MY_RUN_SERVICE \ +--image gcr.io/$PROJECT/$MY_RUN_CONTAINER \ +--no-allow-unauthenticated +``` + + +Create PubSub Topic and GCS notification +```sh +gcloud pubsub topics create pipelineNotification + +gsutil notification create -f json -t pipelineNotification -e OBJECT_FINALIZE gs://"$MY_GCS_BUCKET" +``` + + +Create Service Account for Cloud Run and Pub/Sub permissions +```sh +gcloud iam service-accounts create $SERVICE_ACCOUNT \ +--display-name "Cloud Run Pub/Sub Invoker" +gcloud run services add-iam-policy-binding $MY_RUN_SERVICE \ +--member=serviceAccount:$SERVICE_ACCOUNT@$PROJECT.iam.gserviceaccount.com \ +--role=roles/run.invoker +``` + +Create Pub/Sub Subscription +```sh +export RUN_SERVICE_URL=$(gcloud run services describe $MY_RUN_SERVICE --format='value(status.url)') +gcloud pubsub subscriptions create pipelineTrigger --topic pipelineNotification \ + --push-endpoint=$RUN_SERVICE_URL \ + --push-auth-service-account=$SERVICE_ACCOUNT@$PROJECT.iam.gserviceaccount.com \ + --ack-deadline=600 +``` + + +Create log sink +```shell +export PROJECT_NO=$(gcloud projects list --filter="$PROJECT" --format="value(PROJECT_NUMBER)") +gcloud logging sinks create bq-job-completed \ +pubsub.googleapis.com/projects/$PROJECT/topics/pipelineNotification \ + --log-filter='resource.type="bigquery_project" +severity=INFO +protoPayload.metadata.jobChange.job.jobStatus.jobState="DONE" +protoPayload.authenticationInfo.principalEmail="'$PROJECT_NO'-compute@developer.gserviceaccount.com"' +export LOG_SERVICE_ACCOUNT=$(gcloud logging sinks describe bq-job-completed --format='value(writerIdentity)') +gcloud pubsub topics add-iam-policy-binding pipelineNotification \ + --member=serviceAccount:LOG_SERVICE_ACCOUNT --role=roles/pubsub.publisher +``` diff --git a/tools/cloud_run/event_driven_bq_ingest/pom.xml b/tools/cloud_run/event_driven_bq_ingest/pom.xml new file mode 100644 index 000000000..a65cf1419 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/pom.xml @@ -0,0 +1,135 @@ + + + + 4.0.0 + com.example.cloudrun + pubsub + 0.0.1-SNAPSHOT + + + com.google.cloud.samples + shared-configuration + 1.0.23 + + + + 11 + 11 + 1.18.12 + 2.13.0 + + + + + + + org.springframework.boot + spring-boot-dependencies + 2.5.5 + pom + import + + + org.springframework.cloud + spring-cloud-dependencies + 2020.0.4 + pom + import + + + com.google.cloud + libraries-bom + 24.0.0 + pom + import + + + + + + org.springframework.boot + spring-boot-starter-web + + + org.apache.commons + commons-lang3 + 3.12.0 + + + org.springframework.boot + spring-boot-starter-test + test + + + com.google.cloud + google-cloud-bigquery + + + com.google.cloud + google-cloud-storage + 2.1.9 + + + junit + junit + 4.13.2 + test + + + org.projectlombok + lombok + ${lombok-version} + true + + + com.fasterxml.jackson.core + jackson-databind + 2.11.0 + + + + com.google.apis + google-api-services-pubsub + v1-rev452-1.25.0 + + + + + + org.springframework.boot + spring-boot-maven-plugin + 2.5.4 + + + com.google.cloud.tools + jib-maven-plugin + 3.1.4 + + + gcr.io/PROJECT_ID/event_driven_pipeline + + + + + + diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java new file mode 100644 index 000000000..aaa21972f --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java @@ -0,0 +1,82 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.cloudrun; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.ExternalTableDefinition; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.QueryJobConfiguration; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class BQAccessor { + + private static final String EXTERNAL_TABLE_NAME = "externalTable"; + + public static void insertIntoBQ( + GCSNotificationMetadata.GCSObjectProperties GCSObjectProperties, String fileFormat) { + + // create sourceUri with format --> gs://bucket/project/dataset/table/table* + String sourceUri = + String.format( + "gs://%s/%s/%s/%s/%s*", + GCSObjectProperties.getBucketId(), + GCSObjectProperties.getProject(), + GCSObjectProperties.getDataset(), + GCSObjectProperties.getTable(), + GCSObjectProperties.getTable()); + log.info("source URI is: {}", sourceUri); + + try { + + // Initialize client that will be used to send requests. This client only needs to be created + // once, and can be reused for multiple requests. + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + + FormatOptions format = FormatOptions.of(fileFormat); + + ExternalTableDefinition externalTable = + ExternalTableDefinition.newBuilder(sourceUri, format).build(); + + log.info("external table config: {}", externalTable); + + String query = + String.format( + "INSERT INTO `%s.%s.%s` SELECT * FROM %s", + GCSObjectProperties.getProject(), + GCSObjectProperties.getDataset(), + GCSObjectProperties.getTable(), + EXTERNAL_TABLE_NAME); + QueryJobConfiguration queryConfig = + QueryJobConfiguration.newBuilder(query) + .addTableDefinition(EXTERNAL_TABLE_NAME, externalTable) + .build(); + log.info("query fired: {}", query); + JobInfo jobInfo = JobInfo.of(queryConfig); + Job job = bigquery.create(jobInfo); + JobId jobId = job.getJobId(); + log.info("job id: {}", jobId); + + } catch (Exception e) { + throw new RuntimeException("Exception occured during insertion to BQ", e); + } + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java new file mode 100644 index 000000000..61a19afa6 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java @@ -0,0 +1,67 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.cloudrun; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class BigQueryLogMetadata extends GenericMessage { + + private String insertId; + private ProtoPayload protoPayload; + + public BigQueryLogMetadata( + @JsonProperty("insertId") String insertId, + @JsonProperty(value = "protoPayload", required = true) ProtoPayload protoPayload) { + this.insertId = insertId; + this.protoPayload = protoPayload; + } + + public String getInsertId() { + return insertId; + } + + public void setInsertId(String insertId) { + this.insertId = insertId; + } + + public ProtoPayload getProtoPayload() { + return protoPayload; + } + + public void setProtoPayload(ProtoPayload protoPayload) { + this.protoPayload = protoPayload; + } + + public static class ProtoPayload { + private String resourceName; + + public ProtoPayload( + @JsonProperty(value = "resourceName", required = true) String resourceName) { + this.resourceName = resourceName; + } + + public String getResourceName() { + return resourceName; + } + + public void setResourceName(String resourceName) { + this.resourceName = resourceName; + } + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSAccessor.java new file mode 100644 index 000000000..b556d01d3 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSAccessor.java @@ -0,0 +1,76 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.cloudrun; + +import com.google.api.gax.paging.Page; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.CopyWriter; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; + +import java.util.Arrays; + +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class GCSAccessor { + + private static final Storage storage = StorageOptions.getDefaultInstance().getService(); + + public static void archiveFiles(String sourceUri) { + + String sourceBucketName = getBucketName(sourceUri); + String targetBucketName = sourceBucketName + "_archival"; + String sourceObjectName = getObjectName(sourceUri); + log.info("source uri: {}", sourceUri); + log.info("sourceBucketName: {}", sourceBucketName); + log.info("targetBucketName: {}", targetBucketName); + log.info("sourceObjectName: {}", sourceObjectName); + + try { + + Page blobs = + storage.list(sourceBucketName, Storage.BlobListOption.prefix(sourceObjectName)); + log.info("Iterating the blobs"); + for (Blob blob : blobs.iterateAll()) { + log.info("Blob is: " + blob.getName()); + if (!blob.getName().equalsIgnoreCase(sourceObjectName)) { + CopyWriter copyWriter = blob.copyTo(targetBucketName, blob.getName()); + Blob copiedBlob = copyWriter.getResult(); + } + blob.delete(); + } + } catch (RuntimeException e) { + log.error("failed to process the message", e); + throw new RuntimeException("failed to archive files", e); + } + } + + public static String getBucketName(String filePath) { + String[] path = filePath.replace("gs://", "").split("/"); + + String bucket = path[0]; + return bucket; + } + + public static String getObjectName(String filePath) { + String[] path = filePath.replace("gs://", "").split("/"); + String objectName = String.join("/", Arrays.copyOfRange(path, 1, path.length)); + + return objectName; + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java new file mode 100644 index 000000000..d670c2c51 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java @@ -0,0 +1,73 @@ +package com.example.cloudrun; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Builder; +import lombok.Value; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class GCSNotificationMetadata extends GenericMessage{ + private String name; + private String bucket; + + public GCSNotificationMetadata() {} + + public GCSNotificationMetadata(@JsonProperty(value = "name", required = true) String name, @JsonProperty(value = "bucket", required = true) String bucket) { + this.name = name; + this.bucket = bucket; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getBucket() { + return bucket; + } + + public void setBucket(String bucket) { + this.bucket = bucket; + } + + public GCSObjectProperties getGCSObjectProperties() { + String bucketId = this.bucket; + String objectId = this.name; + + // return null if there is no objectId + if (objectId == null || bucketId == null) { + return null; + } + + String[] parsedObjectId = objectId.split("/"); + + if (parsedObjectId.length < 4) { + throw new RuntimeException("The object id is formatted incorrectly"); + } + String project = parsedObjectId[0]; + String dataset = parsedObjectId[1]; + String table = parsedObjectId[2]; + String triggerFileName = parsedObjectId[3]; + + return GCSObjectProperties.builder() + .bucketId(bucketId) + .project(project) + .dataset(dataset) + .table(table) + .triggerFile(triggerFileName) + .build(); + } + + @Value + @Builder + public static class GCSObjectProperties { + String bucketId; + String project; + String dataset; + String table; + String triggerFile; + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java new file mode 100644 index 000000000..ac57a7d32 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java @@ -0,0 +1,3 @@ +package com.example.cloudrun; + +public abstract class GenericMessage {} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java new file mode 100644 index 000000000..ff5ca3019 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java @@ -0,0 +1,83 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.cloudrun; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.ExternalTableDefinition; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.JobConfiguration; +import lombok.extern.log4j.Log4j2; + +import java.util.Map; +import java.util.List; +import java.util.ArrayList; + +@Log4j2 +public class JobAccessor { + + public static List checkJobCompeletion(BigQueryLogMetadata bigQueryLogMetadata) { + log.info("Resource Name:{}", bigQueryLogMetadata.getProtoPayload().getResourceName()); + + try { + String resourceName = bigQueryLogMetadata.getProtoPayload().getResourceName(); + String[] parsedName = resourceName.split("/"); + String jobName = parsedName[parsedName.length - 1]; + + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + JobId jobId = JobId.of(jobName); + Job job = bigquery.getJob(jobId); + + List allSourceUris = new ArrayList(); + // check for error + if (job.isDone() && job.getStatus().getError() == null) { + log.info("Job successfully loaded BQ table"); + JobConfiguration jobConfig = job.getConfiguration(); + if (jobConfig instanceof QueryJobConfiguration) { + Map tableConfigs = + ((QueryJobConfiguration) jobConfig).getTableDefinitions(); + tableConfigs.forEach( + (tableKey, tableValue) -> + tableValue + .getSourceUris() + .forEach((item) -> allSourceUris.add(removeWildcard(item)))); + } else { + log.info("job configuration is not an instance"); + } + } else { + log.info( + "BigQuery was unable to load into the table due to an error:" + + job.getStatus().getError()); + throw new RuntimeException( + "BigQuery was unable to load into the table due to an error: " + + job.getStatus().getError().getMessage()); + } + return allSourceUris; + } catch (NullPointerException | BigQueryException e) { + log.info("Job not retrieved. \n" + e); + return null; + } + } + + public static String removeWildcard(String str) { + String[] arrOfStr = str.split("\\*", 0); + return arrOfStr[0]; + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineApplication.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineApplication.java new file mode 100644 index 000000000..fa064b9d2 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineApplication.java @@ -0,0 +1,27 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.cloudrun; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class PipelineApplication { + public static void main(String[] args) { + SpringApplication.run(PipelineApplication.class, args); + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java new file mode 100644 index 000000000..7cbdb9a45 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java @@ -0,0 +1,73 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.cloudrun; + +import lombok.extern.log4j.Log4j2; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.List; +import com.google.api.services.pubsub.model.PubsubMessage; + +@RestController +@Log4j2 +public class PipelineController { + + private static final String TRIGGER_FILE_NAME = "trigger.txt"; + private static final String FILE_FORMAT = "AVRO"; + + @RequestMapping(value = "/", method = RequestMethod.POST) + public ResponseEntity receiveMessage(@RequestBody PipelineRequestBody request) { + // Get PubSub pubSubMessage from request body. + PubsubMessage pubSubMessage = request.getMessage(); + if (pubSubMessage == null) { + log.info("Bad Request: invalid Pub/Sub pubSubMessage format"); + return new ResponseEntity("invalid Pub/Sub pubSubMessage", HttpStatus.BAD_REQUEST); + } + GenericMessage metadata = PubSubMessageParser.parseMessage(pubSubMessage); + if (metadata == null) { + log.info("Bad Request: message does not follow format for data extraction"); + return new ResponseEntity("invalid message data", HttpStatus.BAD_REQUEST); + } else if (metadata instanceof GCSNotificationMetadata) { + // handle gcs notification + GCSNotificationMetadata gcsNotificationMetadata = (GCSNotificationMetadata) metadata; + GCSNotificationMetadata.GCSObjectProperties gcsObjectProperties = + gcsNotificationMetadata.getGCSObjectProperties(); + if (TRIGGER_FILE_NAME.equals(gcsObjectProperties.getTriggerFile())) { + log.info("Found Trigger file, started BQ insert"); + BQAccessor.insertIntoBQ(gcsObjectProperties, FILE_FORMAT); + return new ResponseEntity("triggered successfully", HttpStatus.OK); + } else { + log.info("Not trigger file"); + return new ResponseEntity("Not trigger file", HttpStatus.OK); + } + } else if (metadata instanceof BigQueryLogMetadata) { + // handle bq job complete notification + BigQueryLogMetadata bigQueryLogMetadata = (BigQueryLogMetadata) metadata; + List sourceUris = JobAccessor.checkJobCompeletion(bigQueryLogMetadata); + sourceUris.forEach((sourceUri -> GCSAccessor.archiveFiles(sourceUri))); + return new ResponseEntity("job completed", HttpStatus.OK); + } else { + log.error("failed to process the message"); + return new ResponseEntity("failed to process the message", HttpStatus.OK); + } + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineRequestBody.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineRequestBody.java new file mode 100644 index 000000000..d33343bd3 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineRequestBody.java @@ -0,0 +1,17 @@ +package com.example.cloudrun; + +import com.google.api.services.pubsub.model.PubsubMessage; + +public class PipelineRequestBody { + private PubsubMessage message; + + public PipelineRequestBody() {} + + public PubsubMessage getMessage() { + return message; + } + + public void setMessage(PubsubMessage message) { + this.message = message; + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java new file mode 100644 index 000000000..bf5fbd0f1 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java @@ -0,0 +1,38 @@ +package com.example.cloudrun; + +import java.util.Map; +import lombok.extern.log4j.Log4j2; +import java.util.Base64; +import org.apache.commons.lang3.StringUtils; +import com.fasterxml.jackson.databind.*; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.api.services.pubsub.model.PubsubMessage; + +@Log4j2 +public class PubSubMessageParser { + public static GenericMessage parseMessage(PubsubMessage message) { + String data = message.getData(); + if (data == null) { + return null; + } else { + String dataStr = + !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : ""; + ObjectMapper mapper = new ObjectMapper(); + mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + try { + GenericMessage dataObj = mapper.readValue(dataStr, GCSNotificationMetadata.class); + + return dataObj; + } catch (JsonProcessingException error1) { + try { + GenericMessage dataObj = mapper.readValue(dataStr, BigQueryLogMetadata.class); + return dataObj; + } catch (JsonProcessingException error2) { + log.error("failed to parse GCS Notification metadata", error1); + log.error("failed to parse BQ Log metadata", error2); + return null; + } + } + } + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json new file mode 100644 index 000000000..12ca354e9 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json @@ -0,0 +1,11 @@ +{ + "message": { + "data": "dGVzdA==", + "attributes": { + "bucketId": "event-driven-pipeline-bucket", + "objectId": "yrvine-rotation-demo/san_francisco_bikeshare/bikeshare_regions/trigger.txt" + }, + "publishTime": "2017-09-25T23:16:42.302Z", + "messageId": "91010751788941" + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/resources/application.properties b/tools/cloud_run/event_driven_bq_ingest/src/main/resources/application.properties new file mode 100644 index 000000000..81d437a30 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/resources/application.properties @@ -0,0 +1,14 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +server.port=${PORT:8080} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/test/java/com/example/cloudrun/PubSubControllerTests.java b/tools/cloud_run/event_driven_bq_ingest/src/test/java/com/example/cloudrun/PubSubControllerTests.java new file mode 100644 index 000000000..fa388b9de --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/test/java/com/example/cloudrun/PubSubControllerTests.java @@ -0,0 +1,81 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.cloudrun; + +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.servlet.MockMvc; + +@RunWith(SpringRunner.class) +@SpringBootTest +@AutoConfigureMockMvc +public class PubSubControllerTests { + + @Autowired private MockMvc mockMvc; + + @Test + public void addEmptyBody() throws Exception { + mockMvc.perform(post("/")).andExpect(status().isBadRequest()); + } + + @Test + public void addNoMessage() throws Exception { + String mockBody = "{}"; + + mockMvc + .perform(post("/").contentType(MediaType.APPLICATION_JSON).content(mockBody)) + .andExpect(status().isBadRequest()); + } + + @Test + public void addInvalidMimetype() throws Exception { + String mockBody = "{\"message\":{\"data\":\"dGVzdA==\"," + + "\"attributes\":{},\"messageId\":\"91010751788941\"" + + ",\"publishTime\":\"2017-09-25T23:16:42.302Z\"}}"; + + mockMvc + .perform(post("/").contentType(MediaType.TEXT_HTML).content(mockBody)) + .andExpect(status().isUnsupportedMediaType()); + } + + @Test + public void addMinimalBody() throws Exception { + String mockBody = "{\"message\":{}}"; + + mockMvc + .perform(post("/").contentType(MediaType.APPLICATION_JSON).content(mockBody)) + .andExpect(status().isOk()); + } + + @Test + public void addFullBody() throws Exception { + String mockBody = "{\"message\":{\"data\":\"dGVzdA==\"," + + "\"attributes\":{},\"messageId\":\"91010751788941\"" + + ",\"publishTime\":\"2017-09-25T23:16:42.302Z\"}}"; + mockMvc + .perform(post("/").contentType(MediaType.APPLICATION_JSON).content(mockBody)) + .andExpect(status().isOk()); + } +}