From 1ca19b5b451b0836a2b3b43245ec501a797be413 Mon Sep 17 00:00:00 2001 From: Yrvine Thelusma Date: Mon, 14 Feb 2022 15:58:45 -0600 Subject: [PATCH 01/15] copied project to tools/cloud_run/event_driven_bq_ingest --- .../event_driven_bq_ingest/README.md | 95 ++++++++++ .../cloud_run/event_driven_bq_ingest/pom.xml | 168 ++++++++++++++++++ .../event_driven_bq_ingest/pubsub.iml | 156 ++++++++++++++++ .../java/com/example/cloudrun/BQAccessor.java | 83 +++++++++ .../com/example/cloudrun/GCSAccessor.java | 76 ++++++++ .../com/example/cloudrun/JobAccessor.java | 86 +++++++++ .../example/cloudrun/PipelineApplication.java | 27 +++ .../example/cloudrun/PipelineController.java | 72 ++++++++ .../example/cloudrun/PipelineRequestBody.java | 17 ++ .../example/cloudrun/PubSubMessageData.java | 64 +++++++ .../example/cloudrun/PubSubMessageParser.java | 55 ++++++ .../cloudrun/PubSubMessageProperties.java | 31 ++++ .../java/com/example/cloudrun/testdata.json | 11 ++ .../src/main/resources/application.properties | 14 ++ .../cloudrun/PubSubControllerTests.java | 81 +++++++++ 15 files changed, 1036 insertions(+) create mode 100644 tools/cloud_run/event_driven_bq_ingest/README.md create mode 100644 tools/cloud_run/event_driven_bq_ingest/pom.xml create mode 100644 tools/cloud_run/event_driven_bq_ingest/pubsub.iml create mode 100644 tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java create mode 100644 tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSAccessor.java create mode 100644 tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java create mode 100644 tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineApplication.java create mode 100644 tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java create mode 100644 tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineRequestBody.java create 
mode 100644 tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageData.java create mode 100644 tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java create mode 100644 tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageProperties.java create mode 100644 tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json create mode 100644 tools/cloud_run/event_driven_bq_ingest/src/main/resources/application.properties create mode 100644 tools/cloud_run/event_driven_bq_ingest/src/test/java/com/example/cloudrun/PubSubControllerTests.java diff --git a/tools/cloud_run/event_driven_bq_ingest/README.md b/tools/cloud_run/event_driven_bq_ingest/README.md new file mode 100644 index 000000000..2ccd52fb5 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/README.md @@ -0,0 +1,95 @@ +# Cloud Run Event Driven Pipeline Tutorial + +This sample shows how to create an event driven pipeline on Cloud Run via Pub/Sub + +[![Open in Cloud Shell](https://gstatic.com/cloudssh/images/open-btn.png)](https://ssh.cloud.google.com/cloudshell/open?cloudshell_git_repo=https://github.com/yrvine-g/event-driven-pipeline&cloudshell_tutorial=pipeline/pubsub/README.md) + + + +## Dependencies + +* **Spring Boot**: Web server framework. +* **Jib**: Container build tool. 
+ +## Setup +Create a GCS bucket and a folder path with the following format: +gs://bucket/project/dataset/table_name/*.avro + +Enable the following APIs: +* container registry +* cloud run handler + +```sh +gcloud config set project MY_PROJECT +``` + +Configure environment variables: + +```sh +export MY_RUN_SERVICE=run-service +export MY_RUN_CONTAINER=run-container +export PROJECT=$(gcloud config get-value project) +export MY_GCS_BUCKET="$(gcloud config get-value project)-gcs-bucket" +export REGION=us-central1 +export SERVICE_ACCOUNT=cloud-run-pubsub-invoker +``` + +## Quickstart + +Use the [Jib Maven Plugin](https://github.com/GoogleContainerTools/jib/tree/master/jib-maven-plugin) to build and push the container image: + +Switch to the directory containing `pom.xml` + +```sh +mvn compile jib:build -Dimage=gcr.io/$PROJECT/$MY_RUN_CONTAINER +``` + +Deploy Cloud Run service: +```sh +gcloud config set run/region $REGION +gcloud run deploy $MY_RUN_SERVICE \ +--image gcr.io/$PROJECT/$MY_RUN_CONTAINER \ +--no-allow-unauthenticated +``` + + +Create PubSub Topic and GCS notification +```sh +gcloud pubsub topics create pipelineNotification + +gsutil notification create -f json -t pipelineNotification -e OBJECT_FINALIZE gs://"$MY_GCS_BUCKET" +``` + + +Create Service Account for Cloud Run and Pub/Sub permissions +```sh +gcloud iam service-accounts create $SERVICE_ACCOUNT \ +--display-name "Cloud Run Pub/Sub Invoker" +gcloud run services add-iam-policy-binding $MY_RUN_SERVICE \ +--member=serviceAccount:$SERVICE_ACCOUNT@$PROJECT.iam.gserviceaccount.com \ +--role=roles/run.invoker +``` + +Create Pub/Sub Subscription +```sh +export RUN_SERVICE_URL=$(gcloud run services describe $MY_RUN_SERVICE --format='value(status.url)') +gcloud pubsub subscriptions create pipelineTrigger --topic pipelineNotification \ + --push-endpoint=$RUN_SERVICE_URL \ + --push-auth-service-account=$SERVICE_ACCOUNT@$PROJECT.iam.gserviceaccount.com \ + --ack-deadline=600 +``` + + +Create log sink +```shell +export 
PROJECT_NO=$(gcloud projects list --filter="$PROJECT" --format="value(PROJECT_NUMBER)") +gcloud logging sinks create bq-job-completed \ +pubsub.googleapis.com/projects/$PROJECT/topics/pipelineNotification \ + --log-filter='resource.type="bigquery_project" +severity=INFO +protoPayload.metadata.jobChange.job.jobStatus.jobState="DONE" +protoPayload.authenticationInfo.principalEmail="'$PROJECT_NO'-compute@developer.gserviceaccount.com"' +export LOG_SERVICE_ACCOUNT=$(gcloud logging sinks describe bq-job-completed --format='value(writerIdentity)') +gcloud pubsub topics add-iam-policy-binding pipelineNotification \ + --member=$LOG_SERVICE_ACCOUNT --role=roles/pubsub.publisher +``` diff --git a/tools/cloud_run/event_driven_bq_ingest/pom.xml b/tools/cloud_run/event_driven_bq_ingest/pom.xml new file mode 100644 index 000000000..fb64d3817 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/pom.xml @@ -0,0 +1,168 @@ + + + + 4.0.0 + com.example.cloudrun + pubsub + 0.0.1-SNAPSHOT + + + com.google.cloud.samples + shared-configuration + 1.0.23 + + + + 11 + 11 + 1.18.12 + 2.13.0 + + + + + + + org.springframework.boot + spring-boot-dependencies + 2.5.5 + pom + import + + + org.springframework.cloud + spring-cloud-dependencies + 2020.0.4 + pom + import + + + com.google.cloud + libraries-bom + 24.0.0 + pom + import + + + + + + org.springframework.boot + spring-boot-starter-web + + + org.apache.commons + commons-lang3 + 3.12.0 + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.boot + spring-boot-configuration-processor + 2.0.1.RELEASE + + + com.google.cloud + google-cloud-bigquery + + + com.google.cloud + google-cloud-storage + 2.1.9 + + + + com.google.oauth-client + google-oauth-client-java6 + 1.32.1 + + + com.google.oauth-client + google-oauth-client-jetty + 1.32.1 + + + com.google.cloud + google-cloud-bigtable + 2.2.0 + test + + + com.google.cloud + google-cloud-bigqueryconnection + 2.1.6 + test + + + junit + junit + 
4.13.2 + test + + + com.google.truth + truth + 1.1.3 + test + + + org.projectlombok + lombok + ${lombok-version} + true + + + com.fasterxml.jackson.core + jackson-databind + 2.11.0 + + + + com.google.apis + google-api-services-pubsub + v1-rev452-1.25.0 + + + + + + org.springframework.boot + spring-boot-maven-plugin + 2.5.4 + + + com.google.cloud.tools + jib-maven-plugin + 3.1.4 + + + gcr.io/PROJECT_ID/pubsub + + + + + + \ No newline at end of file diff --git a/tools/cloud_run/event_driven_bq_ingest/pubsub.iml b/tools/cloud_run/event_driven_bq_ingest/pubsub.iml new file mode 100644 index 000000000..e1f4fa1d2 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/pubsub.iml @@ -0,0 +1,156 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java new file mode 100644 index 000000000..979d34a09 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java @@ -0,0 +1,83 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.cloudrun; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.ExternalTableDefinition; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.QueryJobConfiguration; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class BQAccessor { + + private static final String EXTERNAL_TABLE_NAME = "externalTable"; + + public static void insertIntoBQ( + PubSubMessageProperties pubSubMessageProperties, String fileFormat) { + + // create sourceUri with format --> gs://bucket/project/dataset/table/table* + String sourceUri = + String.format( + "gs://%s/%s/%s/%s/%s*", + pubSubMessageProperties.getBucketId(), + pubSubMessageProperties.getProject(), + pubSubMessageProperties.getDataset(), + pubSubMessageProperties.getTable(), + pubSubMessageProperties.getTable()); + log.info("source URI is: {}", sourceUri); + + try { + + // Initialize client that will be used to send requests. This client only needs to be created + // once, and can be reused for multiple requests. 
+ BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + + FormatOptions format = FormatOptions.of(fileFormat); + + ExternalTableDefinition externalTable = + ExternalTableDefinition.newBuilder(sourceUri, format).build(); + + log.info("external table config: {}", externalTable); + + String query = + String.format( + "CREATE OR REPLACE TABLE `%s.%s.%s` AS SELECT * FROM %s", + pubSubMessageProperties.getProject(), + pubSubMessageProperties.getDataset(), + pubSubMessageProperties.getTable(), + EXTERNAL_TABLE_NAME); + QueryJobConfiguration queryConfig = + QueryJobConfiguration.newBuilder(query) + .addTableDefinition(EXTERNAL_TABLE_NAME, externalTable) + .build(); + log.info("query we fired: {}", query); + JobInfo jobInfo = JobInfo.of(queryConfig); + Job job = bigquery.create(jobInfo); + JobId jobId = job.getJobId(); + log.info("job id: {}", jobId); + + } catch (Exception e) { + throw new RuntimeException("Exception occured during insertion to BQ", e); + } + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSAccessor.java new file mode 100644 index 000000000..aed0d9a16 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSAccessor.java @@ -0,0 +1,76 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.cloudrun; + +import com.google.api.gax.paging.Page; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.CopyWriter; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; + +import java.util.Arrays; + +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class GCSAccessor { + + private static final Storage storage = StorageOptions.getDefaultInstance().getService(); + + public static void archiveFiles(String sourceUri) { + + String sourceBucketName = getBucketName(sourceUri); + String targetBucketName = sourceBucketName + "_archival"; + String sourceObjectName = getObjectName(sourceUri); + log.info("source uri: {}", sourceUri); + log.info("sourceBucketName: {}", sourceBucketName); + log.info("targetBucketName: {}", targetBucketName); + log.info("sourceObjectName: {}", sourceObjectName); + + try { + + Page blobs = + storage.list(sourceBucketName, Storage.BlobListOption.prefix(sourceObjectName)); + log.info("Iterating the blobs"); + for (Blob blob : blobs.iterateAll()) { + log.info("Blob is: " + blob.getName()); + if (!blob.getName().equalsIgnoreCase(sourceObjectName)) { + CopyWriter copyWriter = blob.copyTo(targetBucketName, blob.getName()); + Blob copiedBlob = copyWriter.getResult(); + } + blob.delete(); + } + } catch (RuntimeException e) { + log.error("failed to process the message", e); + throw new RuntimeException("failed to archive files", e); + } + } + + public static String getBucketName(String filePath) { + String[] path = filePath.replace("gs://", "").split("/"); + + String bucket = path[0]; + return bucket; + } + + public static String getObjectName(String filePath) { + String[] path = filePath.replace("gs://", "").split("/"); + String objectName = String.join("/", Arrays.copyOfRange(path, 1, path.length)); + + return objectName; + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java 
b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java new file mode 100644 index 000000000..0ca5f3f3e --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java @@ -0,0 +1,86 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.cloudrun; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.ExternalTableDefinition; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.JobConfiguration; +import lombok.extern.log4j.Log4j2; + +import java.util.Map; +import java.util.List; +import java.util.ArrayList; + +@Log4j2 +public class JobAccessor { + + public static List checkJobCompeletion(PubSubMessageData pubSubMessageData) { + log.info("Resource Name:{}", pubSubMessageData.getProtoPayload().getResourceName()); + + try { + String resourceName = pubSubMessageData.getProtoPayload().getResourceName(); + String[] parsedName = resourceName.split("/"); + String jobName = parsedName[parsedName.length - 1]; + + BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); + JobId jobId 
= JobId.of(jobName); + Job job = bigquery.getJob(jobId); + + List allSourceUris = new ArrayList(); + // check for error + if (job.isDone()) { + log.info("Job successfully loaded BQ table"); + JobConfiguration jobConfig = job.getConfiguration(); + if (jobConfig instanceof QueryJobConfiguration) { + Map tableConfigs = + ((QueryJobConfiguration) jobConfig).getTableDefinitions(); + tableConfigs.forEach( + (tableKey, tableValue) -> + tableValue + .getSourceUris() + .forEach((item) -> allSourceUris.add(removeWildcard(item)))); + log.info("end of tables"); + } else { + log.info("job configuration is not an instance"); + } + } else { + log.info( + "BigQuery was unable to load into the table due to an error:" + + job.getStatus().getError()); + throw new RuntimeException( + "BigQuery was unable to load into the table due to an error: " + + job.getStatus().getError().getMessage()); + } + return allSourceUris; + } catch (NullPointerException | BigQueryException e) { + log.info("Job not retrieved. \n" + e.toString()); + return null; + } + } + + public static String removeWildcard(String str) { + String[] arrOfStr = str.split("\\*", 0); + return arrOfStr[0]; + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineApplication.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineApplication.java new file mode 100644 index 000000000..b5e5ffd2d --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineApplication.java @@ -0,0 +1,27 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.cloudrun; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class PipelineApplication { + public static void main(String[] args) { + SpringApplication.run(PipelineApplication.class, args); + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java new file mode 100644 index 000000000..0ff22fa16 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java @@ -0,0 +1,72 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.cloudrun; + +import lombok.extern.log4j.Log4j2; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.List; +import com.google.api.services.pubsub.model.PubsubMessage; + +@RestController +@Log4j2 +public class PipelineController { + + private static final String TRIGGER_FILE_NAME = "trigger.txt"; + private static final String FILE_FORMAT = "AVRO"; + + @RequestMapping(value = "/", method = RequestMethod.POST) + public ResponseEntity receiveMessage(@RequestBody PipelineRequestBody request) { + // Get PubSub pubSubMessage from request body. + PubsubMessage pubSubMessage = request.getMessage(); + if (pubSubMessage == null) { + log.info("Bad Request: invalid Pub/Sub pubSubMessage format"); + return new ResponseEntity("invalid Pub/Sub pubSubMessage", HttpStatus.BAD_REQUEST); + } + try { + PubSubMessageProperties pubSubMessageProperties = + PubSubMessageParser.parsePubSubProperties(pubSubMessage); + if (pubSubMessageProperties == null) { + // parse pubsub message as bq job notification + PubSubMessageData pubSubMessageData = + PubSubMessageParser.parsePubSubData(pubSubMessage.getData()); + List sourceUris = JobAccessor.checkJobCompeletion(pubSubMessageData); + sourceUris.forEach((sourceUri -> GCSAccessor.archiveFiles(sourceUri))); + return new ResponseEntity("job completed", HttpStatus.OK); + } else { + // pubsub message was a gcs notification + if (TRIGGER_FILE_NAME.equals(pubSubMessageProperties.getTriggerFile())) { + log.info("Found Trigger file, started BQ insert"); + BQAccessor.insertIntoBQ(pubSubMessageProperties, FILE_FORMAT); + // 
GCSAccessor.archiveFiles(pubSubMessageProperties); + return new ResponseEntity("triggered successfully", HttpStatus.OK); + } else { + log.info("Not trigger file"); + return new ResponseEntity("Not trigger file", HttpStatus.OK); + } + } + } catch (RuntimeException | JsonProcessingException e) { + log.error("failed to process the message", e); + return new ResponseEntity(e.getMessage(), HttpStatus.OK); + } + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineRequestBody.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineRequestBody.java new file mode 100644 index 000000000..d33343bd3 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineRequestBody.java @@ -0,0 +1,17 @@ +package com.example.cloudrun; + +import com.google.api.services.pubsub.model.PubsubMessage; + +public class PipelineRequestBody { + private PubsubMessage message; + + public PipelineRequestBody() {} + + public PubsubMessage getMessage() { + return message; + } + + public void setMessage(PubsubMessage message) { + this.message = message; + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageData.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageData.java new file mode 100644 index 000000000..3b711c3da --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageData.java @@ -0,0 +1,64 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.cloudrun; + +import com.google.cloud.bigquery.BigQueryError; +import lombok.extern.log4j.Log4j2; + +@Log4j2 +public class PubSubMessageData { + + private String insertId; + private ProtoPayload protoPayload; + + public PubSubMessageData() {} + + public PubSubMessageData(String insertId, ProtoPayload protoPayload) { + this.insertId = insertId; + this.protoPayload = protoPayload; + } + + public String getInsertId() { + return insertId; + } + + public void setInsertId(String insertId) { + this.insertId = insertId; + } + + public ProtoPayload getProtoPayload() { + return protoPayload; + } + + public void setProtoPayload(ProtoPayload protoPayload) { + this.protoPayload = protoPayload; + } + + public class ProtoPayload { + private String resourceName; + + public ProtoPayload() {} + + public String getResourceName() { + return resourceName; + } + + public void setResourceName(String resourceName) { + this.resourceName = resourceName; + } + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java new file mode 100644 index 000000000..a9d8a3275 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java @@ -0,0 +1,55 @@ +package com.example.cloudrun; + +import java.util.Map; +import lombok.extern.log4j.Log4j2; +import java.util.Base64; +import org.apache.commons.lang3.StringUtils; +import 
com.fasterxml.jackson.databind.*; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.api.services.pubsub.model.PubsubMessage; + +@Log4j2 +public class PubSubMessageParser { + + public static PubSubMessageProperties parsePubSubProperties(PubsubMessage message) { + + Map attributes = message.getAttributes(); + if (attributes == null) { + throw new RuntimeException("No attributes in the pubsub message"); + } + + String bucketId = attributes.get("bucketId"); + String objectId = attributes.get("objectId"); + + // return null if there is no objectId + if (objectId == null) { + return null; + } + + String[] parsedObjectId = objectId.split("/"); + + if (parsedObjectId.length < 4) { + throw new RuntimeException("The object id is formatted incorrectly"); + } + String project = parsedObjectId[0]; + String dataset = parsedObjectId[1]; + String table = parsedObjectId[2]; + String triggerFileName = parsedObjectId[3]; + + return PubSubMessageProperties.builder() + .bucketId(bucketId) + .project(project) + .dataset(dataset) + .table(table) + .triggerFile(triggerFileName) + .build(); + } + + public static PubSubMessageData parsePubSubData(String data) throws JsonProcessingException { + String dataStr = !StringUtils.isEmpty(data) ? 
new String(Base64.getDecoder().decode(data)) : ""; + ObjectMapper mapper = new ObjectMapper(); + mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + PubSubMessageData dataObj = mapper.readValue(dataStr, PubSubMessageData.class); + return dataObj; + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageProperties.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageProperties.java new file mode 100644 index 000000000..ce7169880 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageProperties.java @@ -0,0 +1,31 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.cloudrun; + +import lombok.Builder; +import lombok.Value; + +@Value +@Builder +public class PubSubMessageProperties { + + String bucketId; + String project; + String dataset; + String table; + String triggerFile; +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json new file mode 100644 index 000000000..130112183 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json @@ -0,0 +1,11 @@ +{ + "message": { + "data": "dGVzdA==", + "attributes": { + "bucketId": "event-driven-pipeline-bucket", + "objectId": "yrvine-rotation-demo/san_francisco_bikeshare/bikeshare_regions/trigger.txt" + }, + "publishTime": "2017-09-25T23:16:42.302Z", + "messageId": "91010751788941" + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/resources/application.properties b/tools/cloud_run/event_driven_bq_ingest/src/main/resources/application.properties new file mode 100644 index 000000000..81d437a30 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/resources/application.properties @@ -0,0 +1,14 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+server.port=${PORT:8080} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/test/java/com/example/cloudrun/PubSubControllerTests.java b/tools/cloud_run/event_driven_bq_ingest/src/test/java/com/example/cloudrun/PubSubControllerTests.java new file mode 100644 index 000000000..fa388b9de --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/test/java/com/example/cloudrun/PubSubControllerTests.java @@ -0,0 +1,81 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.example.cloudrun; + +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.servlet.MockMvc; + +@RunWith(SpringRunner.class) +@SpringBootTest +@AutoConfigureMockMvc +public class PubSubControllerTests { + + @Autowired private MockMvc mockMvc; + + @Test + public void addEmptyBody() throws Exception { + mockMvc.perform(post("/")).andExpect(status().isBadRequest()); + } + + @Test + public void addNoMessage() throws Exception { + String mockBody = "{}"; + + mockMvc + .perform(post("/").contentType(MediaType.APPLICATION_JSON).content(mockBody)) + .andExpect(status().isBadRequest()); + } + + @Test + public void addInvalidMimetype() throws Exception { + String mockBody = "{\"message\":{\"data\":\"dGVzdA==\"," + + "\"attributes\":{},\"messageId\":\"91010751788941\"" + + ",\"publishTime\":\"2017-09-25T23:16:42.302Z\"}}"; + + mockMvc + .perform(post("/").contentType(MediaType.TEXT_HTML).content(mockBody)) + .andExpect(status().isUnsupportedMediaType()); + } + + @Test + public void addMinimalBody() throws Exception { + String mockBody = "{\"message\":{}}"; + + mockMvc + .perform(post("/").contentType(MediaType.APPLICATION_JSON).content(mockBody)) + .andExpect(status().isOk()); + } + + @Test + public void addFullBody() throws Exception { + String mockBody = "{\"message\":{\"data\":\"dGVzdA==\"," + + "\"attributes\":{},\"messageId\":\"91010751788941\"" + + ",\"publishTime\":\"2017-09-25T23:16:42.302Z\"}}"; + mockMvc + 
.perform(post("/").contentType(MediaType.APPLICATION_JSON).content(mockBody)) + .andExpect(status().isOk()); + } +} From b85e7685022d31ab880725a52c741f5da54946c5 Mon Sep 17 00:00:00 2001 From: yrvine-g <94653518+yrvine-g@users.noreply.github.com> Date: Wed, 16 Feb 2022 18:44:09 -0500 Subject: [PATCH 02/15] Update tools/cloud_run/event_driven_bq_ingest/README.md Co-authored-by: Daniel De Leo --- tools/cloud_run/event_driven_bq_ingest/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/cloud_run/event_driven_bq_ingest/README.md b/tools/cloud_run/event_driven_bq_ingest/README.md index 2ccd52fb5..c6fa6ff43 100644 --- a/tools/cloud_run/event_driven_bq_ingest/README.md +++ b/tools/cloud_run/event_driven_bq_ingest/README.md @@ -2,7 +2,7 @@ This sample shows how to create an event driven pipeline on Cloud Run via Pub/Sub -[![Open in Cloud Shell](https://gstatic.com/cloudssh/images/open-btn.png)](https://ssh.cloud.google.com/cloudshell/open?cloudshell_git_repo=https://github.com/yrvine-g/event-driven-pipeline&cloudshell_tutorial=pipeline/pubsub/README.md) +[![Open in Cloud Shell](https://gstatic.com/cloudssh/images/open-btn.png)](https://ssh.cloud.google.com/cloudshell/open?cloudshell_git_repo=https://github.com/yrvine-g/bigquery-utils.git&cloudshell_tutorial=tools/cloud_run/event_driven_bq_ingest/README.md) From 3d8d7eeb2c244f3dbff2354ea8255ca92aabec52 Mon Sep 17 00:00:00 2001 From: yrvine-g <94653518+yrvine-g@users.noreply.github.com> Date: Wed, 16 Feb 2022 18:46:02 -0500 Subject: [PATCH 03/15] Update tools/cloud_run/event_driven_bq_ingest/pom.xml Co-authored-by: Daniel De Leo --- tools/cloud_run/event_driven_bq_ingest/pom.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tools/cloud_run/event_driven_bq_ingest/pom.xml b/tools/cloud_run/event_driven_bq_ingest/pom.xml index fb64d3817..498b1c6f9 100644 --- a/tools/cloud_run/event_driven_bq_ingest/pom.xml +++ b/tools/cloud_run/event_driven_bq_ingest/pom.xml @@ -1,7 +1,8 @@ 
gs://bucket/project/dataset/table/table* String sourceUri = String.format( "gs://%s/%s/%s/%s/%s*", - pubSubMessageProperties.getBucketId(), - pubSubMessageProperties.getProject(), - pubSubMessageProperties.getDataset(), - pubSubMessageProperties.getTable(), - pubSubMessageProperties.getTable()); + GCSObjectProperties.getBucketId(), + GCSObjectProperties.getProject(), + GCSObjectProperties.getDataset(), + GCSObjectProperties.getTable(), + GCSObjectProperties.getTable()); log.info("source URI is: {}", sourceUri); try { @@ -62,9 +61,9 @@ public static void insertIntoBQ( String query = String.format( "INSERT INTO `%s.%s.%s` SELECT * FROM %s", - pubSubMessageProperties.getProject(), - pubSubMessageProperties.getDataset(), - pubSubMessageProperties.getTable(), + GCSObjectProperties.getProject(), + GCSObjectProperties.getDataset(), + GCSObjectProperties.getTable(), EXTERNAL_TABLE_NAME); QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query) diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageData.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java similarity index 88% rename from tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageData.java rename to tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java index 3b711c3da..2d92c9d7d 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageData.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java @@ -16,18 +16,17 @@ package com.example.cloudrun; -import com.google.cloud.bigquery.BigQueryError; import lombok.extern.log4j.Log4j2; @Log4j2 -public class PubSubMessageData { +public class BigQueryLogMetadata extends GenericMessage { private String insertId; private ProtoPayload protoPayload; - public PubSubMessageData() {} + public 
BigQueryLogMetadata() {} - public PubSubMessageData(String insertId, ProtoPayload protoPayload) { + public BigQueryLogMetadata(String insertId, ProtoPayload protoPayload) { this.insertId = insertId; this.protoPayload = protoPayload; } diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java new file mode 100644 index 000000000..abf7f6783 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java @@ -0,0 +1,79 @@ +package com.example.cloudrun; + +import lombok.Builder; +import lombok.Value; + +public class GCSNotificationMetadata extends GenericMessage{ + private String id; + private String name; + private String bucket; + + public GCSNotificationMetadata() {} + + public GCSNotificationMetadata(String id, String name, String bucket) { + this.id = id; + this.name = name; + this.bucket = bucket; + } + + public String getInsertId() { + return id; + } + + public void setInsertId(String insertId) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getBucket() { + return bucket; + } + + public void setBucket(String bucket) { + this.bucket = bucket; + } + + @Value + @Builder + public class GCSObjectProperties { + String bucketId; + String project; + String dataset; + String table; + String triggerFile; + } + public GCSObjectProperties getGCSObjectProperties() { + String bucketId = this.bucket; + String objectId = this.name; + + // return null if there is no objectId + if (objectId == null || bucketId == null) { + return null; + } + + String[] parsedObjectId = objectId.split("/"); + + if (parsedObjectId.length < 4) { + throw new RuntimeException("The object id is formatted incorrectly"); + } + String project = parsedObjectId[0]; + String dataset = 
parsedObjectId[1]; + String table = parsedObjectId[2]; + String triggerFileName = parsedObjectId[3]; + + return GCSObjectProperties.builder() + .bucketId(bucketId) + .project(project) + .dataset(dataset) + .table(table) + .triggerFile(triggerFileName) + .build(); + } +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java new file mode 100644 index 000000000..08595bbd4 --- /dev/null +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java @@ -0,0 +1,4 @@ +package com.example.cloudrun; + +public abstract class GenericMessage { +} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java index 76f38e98c..eeb69c58c 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java @@ -20,9 +20,7 @@ import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.ExternalTableDefinition; -import com.google.cloud.bigquery.FormatOptions; import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.JobId; import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.JobConfiguration; @@ -35,11 +33,11 @@ @Log4j2 public class JobAccessor { - public static List checkJobCompeletion(PubSubMessageData pubSubMessageData) { - log.info("Resource Name:{}", pubSubMessageData.getProtoPayload().getResourceName()); + public static List checkJobCompeletion(BigQueryLogMetadata bigQueryLogMetadata) { + log.info("Resource Name:{}", 
bigQueryLogMetadata.getProtoPayload().getResourceName()); try { - String resourceName = pubSubMessageData.getProtoPayload().getResourceName(); + String resourceName = bigQueryLogMetadata.getProtoPayload().getResourceName(); String[] parsedName = resourceName.split("/"); String jobName = parsedName[parsedName.length - 1]; diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java index 97dafd118..d9352db4c 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java @@ -42,30 +42,32 @@ public ResponseEntity receiveMessage(@RequestBody PipelineRequestBody request) { log.info("Bad Request: invalid Pub/Sub pubSubMessage format"); return new ResponseEntity("invalid Pub/Sub pubSubMessage", HttpStatus.BAD_REQUEST); } - try { - PubSubMessageProperties pubSubMessageProperties = - PubSubMessageParser.parsePubSubProperties(pubSubMessage); - if (pubSubMessageProperties == null) { - // parse pubsub message as bq job notification - PubSubMessageData pubSubMessageData = - PubSubMessageParser.parsePubSubData(pubSubMessage.getData()); - List sourceUris = JobAccessor.checkJobCompeletion(pubSubMessageData); - sourceUris.forEach((sourceUri -> GCSAccessor.archiveFiles(sourceUri))); - return new ResponseEntity("job completed", HttpStatus.OK); + GenericMessage metadata = PubSubMessageParser.parseMessage(pubSubMessage); + if (metadata == null) { + log.info("Bad Request: message does not follow format for data extraction"); + return new ResponseEntity("invalid message data", HttpStatus.BAD_REQUEST); + } else if (metadata instanceof GCSNotificationMetadata) { + // handle gcs notification + GCSNotificationMetadata gcsNotificationMetadata = (GCSNotificationMetadata) metadata; + 
GCSNotificationMetadata.GCSObjectProperties gcsObjectProperties = gcsNotificationMetadata.getGCSObjectProperties(); + + if (TRIGGER_FILE_NAME.equals(gcsObjectProperties.getTriggerFile())) { + log.info("Found Trigger file, started BQ insert"); + BQAccessor.insertIntoBQ(gcsObjectProperties, FILE_FORMAT); + return new ResponseEntity("triggered successfully", HttpStatus.OK); } else { - // pubsub message was a gcs notification - if (TRIGGER_FILE_NAME.equals(pubSubMessageProperties.getTriggerFile())) { - log.info("Found Trigger file, started BQ insert"); - BQAccessor.insertIntoBQ(pubSubMessageProperties, FILE_FORMAT); - return new ResponseEntity("triggered successfully", HttpStatus.OK); - } else { - log.info("Not trigger file"); - return new ResponseEntity("Not trigger file", HttpStatus.OK); - } + log.info("Not trigger file"); + return new ResponseEntity("Not trigger file", HttpStatus.OK); } - } catch (RuntimeException | JsonProcessingException e) { - log.error("failed to process the message", e); - return new ResponseEntity(e.getMessage(), HttpStatus.OK); + } else if(metadata instanceof BigQueryLogMetadata) { + //handle bq job complete notification + BigQueryLogMetadata bigQueryLogMetadata = (BigQueryLogMetadata) metadata; + List sourceUris = JobAccessor.checkJobCompeletion(bigQueryLogMetadata); + sourceUris.forEach((sourceUri -> GCSAccessor.archiveFiles(sourceUri))); + return new ResponseEntity("job completed", HttpStatus.OK); + } else { + log.error("failed to process the message"); + return new ResponseEntity("failed to process the message", HttpStatus.OK); } } } diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java index a9d8a3275..08154998b 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java +++ 
b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java @@ -10,46 +10,27 @@ @Log4j2 public class PubSubMessageParser { - - public static PubSubMessageProperties parsePubSubProperties(PubsubMessage message) { - - Map attributes = message.getAttributes(); - if (attributes == null) { - throw new RuntimeException("No attributes in the pubsub message"); - } - - String bucketId = attributes.get("bucketId"); - String objectId = attributes.get("objectId"); - - // return null if there is no objectId - if (objectId == null) { + public static GenericMessage parseMessage(PubsubMessage message) { + String data = message.getData(); + if (data == null) { return null; + } else { + String dataStr = !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : ""; + ObjectMapper mapper = new ObjectMapper(); + mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + try { + GenericMessage dataObj = mapper.readValue(dataStr, GCSNotificationMetadata.class); + return dataObj; + } catch (JsonProcessingException error1) { + try { + GenericMessage dataObj = mapper.readValue(dataStr, BigQueryLogMetadata.class); + return dataObj; + } catch (JsonProcessingException error2) { + log.error("failed to parse GCS Notification metadata", error1); + log.error("failed to parse BQ Log metadata", error2); + return null; + } + } } - - String[] parsedObjectId = objectId.split("/"); - - if (parsedObjectId.length < 4) { - throw new RuntimeException("The object id is formatted incorrectly"); - } - String project = parsedObjectId[0]; - String dataset = parsedObjectId[1]; - String table = parsedObjectId[2]; - String triggerFileName = parsedObjectId[3]; - - return PubSubMessageProperties.builder() - .bucketId(bucketId) - .project(project) - .dataset(dataset) - .table(table) - .triggerFile(triggerFileName) - .build(); - } - - public static PubSubMessageData parsePubSubData(String data) throws JsonProcessingException { - String dataStr = 
!StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : ""; - ObjectMapper mapper = new ObjectMapper(); - mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - PubSubMessageData dataObj = mapper.readValue(dataStr, PubSubMessageData.class); - return dataObj; } } diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageProperties.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageProperties.java deleted file mode 100644 index ce7169880..000000000 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageProperties.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.example.cloudrun; - -import lombok.Builder; -import lombok.Value; - -@Value -@Builder -public class PubSubMessageProperties { - - String bucketId; - String project; - String dataset; - String table; - String triggerFile; -} From 6a4e5154f4feb6d60786d35159ce9ef4604e049a Mon Sep 17 00:00:00 2001 From: Yrvine Thelusma Date: Fri, 18 Feb 2022 10:44:57 -0600 Subject: [PATCH 12/15] Added require variable for metadata pojos --- .../example/cloudrun/BigQueryLogMetadata.java | 11 ++++++----- .../cloudrun/GCSNotificationMetadata.java | 19 +++++++------------ .../example/cloudrun/PipelineController.java | 1 - .../example/cloudrun/PubSubMessageParser.java | 1 + 4 files changed, 14 insertions(+), 18 deletions(-) diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java index 2d92c9d7d..eeb0c8c80 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java @@ -16,6 +16,7 @@ package com.example.cloudrun; +import com.fasterxml.jackson.annotation.JsonProperty; import lombok.extern.log4j.Log4j2; @Log4j2 @@ -24,9 +25,7 @@ public class BigQueryLogMetadata extends GenericMessage { private String insertId; private ProtoPayload protoPayload; - public BigQueryLogMetadata() {} - - public BigQueryLogMetadata(String insertId, ProtoPayload protoPayload) { + public BigQueryLogMetadata(@JsonProperty("insertId") String insertId, @JsonProperty(value = "protoPayload", required = true) ProtoPayload protoPayload) { this.insertId = insertId; this.protoPayload = protoPayload; } @@ -47,10 +46,12 @@ public void setProtoPayload(ProtoPayload protoPayload) { this.protoPayload = protoPayload; } - public class ProtoPayload { + public static class ProtoPayload { private String 
resourceName; - public ProtoPayload() {} + public ProtoPayload(@JsonProperty(value = "resourceName", required = true) String resourceName) { + this.resourceName = resourceName; + } public String getResourceName() { return resourceName; diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java index abf7f6783..680438a4a 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java @@ -1,29 +1,22 @@ package com.example.cloudrun; +import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Builder; import lombok.Value; +import lombok.extern.log4j.Log4j2; +@Log4j2 public class GCSNotificationMetadata extends GenericMessage{ - private String id; private String name; private String bucket; public GCSNotificationMetadata() {} - public GCSNotificationMetadata(String id, String name, String bucket) { - this.id = id; + public GCSNotificationMetadata(@JsonProperty(value = "name", required = true) String name, @JsonProperty(value = "bucket", required = true) String bucket) { this.name = name; this.bucket = bucket; } - public String getInsertId() { - return id; - } - - public void setInsertId(String insertId) { - this.id = id; - } - public String getName() { return name; } @@ -42,7 +35,7 @@ public void setBucket(String bucket) { @Value @Builder - public class GCSObjectProperties { + public static class GCSObjectProperties { String bucketId; String project; String dataset; @@ -50,6 +43,8 @@ public class GCSObjectProperties { String triggerFile; } public GCSObjectProperties getGCSObjectProperties() { + log.info("bucket", bucket); + log.info("name", name); String bucketId = this.bucket; String objectId = this.name; diff --git 
a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java index d9352db4c..d4e0a704a 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java @@ -50,7 +50,6 @@ public ResponseEntity receiveMessage(@RequestBody PipelineRequestBody request) { // handle gcs notification GCSNotificationMetadata gcsNotificationMetadata = (GCSNotificationMetadata) metadata; GCSNotificationMetadata.GCSObjectProperties gcsObjectProperties = gcsNotificationMetadata.getGCSObjectProperties(); - if (TRIGGER_FILE_NAME.equals(gcsObjectProperties.getTriggerFile())) { log.info("Found Trigger file, started BQ insert"); BQAccessor.insertIntoBQ(gcsObjectProperties, FILE_FORMAT); diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java index 08154998b..0ebab66ac 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java @@ -20,6 +20,7 @@ public static GenericMessage parseMessage(PubsubMessage message) { mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); try { GenericMessage dataObj = mapper.readValue(dataStr, GCSNotificationMetadata.class); + return dataObj; } catch (JsonProcessingException error1) { try { From a2def1b2f7ff7a5cdaff9b2133b994c268d6b69f Mon Sep 17 00:00:00 2001 From: Yrvine Thelusma Date: Fri, 18 Feb 2022 10:54:43 -0600 Subject: [PATCH 13/15] Removed unnecessary print statements --- .../src/main/java/com/example/cloudrun/BQAccessor.java | 2 +- 
.../main/java/com/example/cloudrun/GCSNotificationMetadata.java | 2 -- .../src/main/java/com/example/cloudrun/JobAccessor.java | 1 - 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java index 090d549c5..ed484c8d0 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java @@ -69,7 +69,7 @@ public static void insertIntoBQ( QueryJobConfiguration.newBuilder(query) .addTableDefinition(EXTERNAL_TABLE_NAME, externalTable) .build(); - log.info("query we fired: {}", query); + log.info("query fired: {}", query); JobInfo jobInfo = JobInfo.of(queryConfig); Job job = bigquery.create(jobInfo); JobId jobId = job.getJobId(); diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java index 680438a4a..440ddaeb9 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java @@ -43,8 +43,6 @@ public static class GCSObjectProperties { String triggerFile; } public GCSObjectProperties getGCSObjectProperties() { - log.info("bucket", bucket); - log.info("name", name); String bucketId = this.bucket; String objectId = this.name; diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java index eeb69c58c..3585adc57 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java 
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java @@ -58,7 +58,6 @@ public static List checkJobCompeletion(BigQueryLogMetadata bigQueryLogMe tableValue .getSourceUris() .forEach((item) -> allSourceUris.add(removeWildcard(item)))); - log.info("end of tables"); } else { log.info("job configuration is not an instance"); } From 99ad2f81d970803e7fa682eba8e33a1d69e02ed4 Mon Sep 17 00:00:00 2001 From: Yrvine Thelusma Date: Fri, 18 Feb 2022 11:40:43 -0600 Subject: [PATCH 14/15] Remove unneccessary dependencies from pom file --- .../cloud_run/event_driven_bq_ingest/pom.xml | 34 ------------------- 1 file changed, 34 deletions(-) diff --git a/tools/cloud_run/event_driven_bq_ingest/pom.xml b/tools/cloud_run/event_driven_bq_ingest/pom.xml index f9fb87509..570b42bdf 100644 --- a/tools/cloud_run/event_driven_bq_ingest/pom.xml +++ b/tools/cloud_run/event_driven_bq_ingest/pom.xml @@ -80,11 +80,6 @@ spring-boot-starter-test test - - org.springframework.boot - spring-boot-configuration-processor - 2.0.1.RELEASE - com.google.cloud google-cloud-bigquery @@ -94,41 +89,12 @@ google-cloud-storage 2.1.9 - - - com.google.oauth-client - google-oauth-client-java6 - 1.32.1 - - - com.google.oauth-client - google-oauth-client-jetty - 1.32.1 - - - com.google.cloud - google-cloud-bigtable - 2.2.0 - test - - - com.google.cloud - google-cloud-bigqueryconnection - 2.1.6 - test - junit junit 4.13.2 test - - com.google.truth - truth - 1.1.3 - test - org.projectlombok lombok From c7b8776636acbce9b8f2a41d02550bc7c66e6395 Mon Sep 17 00:00:00 2001 From: Yrvine Thelusma Date: Fri, 18 Feb 2022 18:09:41 +0000 Subject: [PATCH 15/15] Reformat code --- .../cloud_run/event_driven_bq_ingest/pom.xml | 2 +- .../java/com/example/cloudrun/BQAccessor.java | 2 +- .../example/cloudrun/BigQueryLogMetadata.java | 7 +++++-- .../cloudrun/GCSNotificationMetadata.java | 19 ++++++++++--------- .../com/example/cloudrun/GenericMessage.java | 3 +-- 
.../com/example/cloudrun/JobAccessor.java | 2 +- .../example/cloudrun/PipelineController.java | 7 ++++--- .../example/cloudrun/PubSubMessageParser.java | 3 ++- .../java/com/example/cloudrun/testdata.json | 18 +++++++++--------- 9 files changed, 34 insertions(+), 29 deletions(-) diff --git a/tools/cloud_run/event_driven_bq_ingest/pom.xml b/tools/cloud_run/event_driven_bq_ingest/pom.xml index 570b42bdf..a65cf1419 100644 --- a/tools/cloud_run/event_driven_bq_ingest/pom.xml +++ b/tools/cloud_run/event_driven_bq_ingest/pom.xml @@ -126,7 +126,7 @@ 3.1.4 - gcr.io/PROJECT_ID/pubsub + gcr.io/PROJECT_ID/event_driven_pipeline diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java index ed484c8d0..aaa21972f 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java @@ -32,7 +32,7 @@ public class BQAccessor { private static final String EXTERNAL_TABLE_NAME = "externalTable"; public static void insertIntoBQ( - GCSNotificationMetadata.GCSObjectProperties GCSObjectProperties, String fileFormat) { + GCSNotificationMetadata.GCSObjectProperties GCSObjectProperties, String fileFormat) { // create sourceUri with format --> gs://bucket/project/dataset/table/table* String sourceUri = diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java index eeb0c8c80..61a19afa6 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java @@ -25,7 +25,9 @@ public class BigQueryLogMetadata extends GenericMessage { 
private String insertId; private ProtoPayload protoPayload; - public BigQueryLogMetadata(@JsonProperty("insertId") String insertId, @JsonProperty(value = "protoPayload", required = true) ProtoPayload protoPayload) { + public BigQueryLogMetadata( + @JsonProperty("insertId") String insertId, + @JsonProperty(value = "protoPayload", required = true) ProtoPayload protoPayload) { this.insertId = insertId; this.protoPayload = protoPayload; } @@ -49,7 +51,8 @@ public void setProtoPayload(ProtoPayload protoPayload) { public static class ProtoPayload { private String resourceName; - public ProtoPayload(@JsonProperty(value = "resourceName", required = true) String resourceName) { + public ProtoPayload( + @JsonProperty(value = "resourceName", required = true) String resourceName) { this.resourceName = resourceName; } diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java index 440ddaeb9..d670c2c51 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java @@ -33,15 +33,6 @@ public void setBucket(String bucket) { this.bucket = bucket; } - @Value - @Builder - public static class GCSObjectProperties { - String bucketId; - String project; - String dataset; - String table; - String triggerFile; - } public GCSObjectProperties getGCSObjectProperties() { String bucketId = this.bucket; String objectId = this.name; @@ -69,4 +60,14 @@ public GCSObjectProperties getGCSObjectProperties() { .triggerFile(triggerFileName) .build(); } + + @Value + @Builder + public static class GCSObjectProperties { + String bucketId; + String project; + String dataset; + String table; + String triggerFile; + } } diff --git 
a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java index 08595bbd4..ac57a7d32 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java @@ -1,4 +1,3 @@ package com.example.cloudrun; -public abstract class GenericMessage { -} +public abstract class GenericMessage {} diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java index 3585adc57..ff5ca3019 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java @@ -71,7 +71,7 @@ public static List checkJobCompeletion(BigQueryLogMetadata bigQueryLogMe } return allSourceUris; } catch (NullPointerException | BigQueryException e) { - log.info("Job not retrieved. \n" + e.toString()); + log.info("Job not retrieved. 
\n" + e); return null; } } diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java index d4e0a704a..7cbdb9a45 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java @@ -49,7 +49,8 @@ public ResponseEntity receiveMessage(@RequestBody PipelineRequestBody request) { } else if (metadata instanceof GCSNotificationMetadata) { // handle gcs notification GCSNotificationMetadata gcsNotificationMetadata = (GCSNotificationMetadata) metadata; - GCSNotificationMetadata.GCSObjectProperties gcsObjectProperties = gcsNotificationMetadata.getGCSObjectProperties(); + GCSNotificationMetadata.GCSObjectProperties gcsObjectProperties = + gcsNotificationMetadata.getGCSObjectProperties(); if (TRIGGER_FILE_NAME.equals(gcsObjectProperties.getTriggerFile())) { log.info("Found Trigger file, started BQ insert"); BQAccessor.insertIntoBQ(gcsObjectProperties, FILE_FORMAT); @@ -58,8 +59,8 @@ public ResponseEntity receiveMessage(@RequestBody PipelineRequestBody request) { log.info("Not trigger file"); return new ResponseEntity("Not trigger file", HttpStatus.OK); } - } else if(metadata instanceof BigQueryLogMetadata) { - //handle bq job complete notification + } else if (metadata instanceof BigQueryLogMetadata) { + // handle bq job complete notification BigQueryLogMetadata bigQueryLogMetadata = (BigQueryLogMetadata) metadata; List sourceUris = JobAccessor.checkJobCompeletion(bigQueryLogMetadata); sourceUris.forEach((sourceUri -> GCSAccessor.archiveFiles(sourceUri))); diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java index 
0ebab66ac..bf5fbd0f1 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java @@ -15,7 +15,8 @@ public static GenericMessage parseMessage(PubsubMessage message) { if (data == null) { return null; } else { - String dataStr = !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : ""; + String dataStr = + !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : ""; ObjectMapper mapper = new ObjectMapper(); mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); try { diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json index 130112183..12ca354e9 100644 --- a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json +++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json @@ -1,11 +1,11 @@ { - "message": { - "data": "dGVzdA==", - "attributes": { - "bucketId": "event-driven-pipeline-bucket", - "objectId": "yrvine-rotation-demo/san_francisco_bikeshare/bikeshare_regions/trigger.txt" - }, - "publishTime": "2017-09-25T23:16:42.302Z", - "messageId": "91010751788941" - } + "message": { + "data": "dGVzdA==", + "attributes": { + "bucketId": "event-driven-pipeline-bucket", + "objectId": "yrvine-rotation-demo/san_francisco_bikeshare/bikeshare_regions/trigger.txt" + }, + "publishTime": "2017-09-25T23:16:42.302Z", + "messageId": "91010751788941" + } }