From 7c3d0dd1a99b76c06579c20ade1947bed0975a26 Mon Sep 17 00:00:00 2001
From: Weimin Yu
Date: Tue, 16 Mar 2021 13:19:30 -0400
Subject: [PATCH] Use shared jar to stage BEAM pipeline if possible (#1008)

* Use shared jar to stage BEAM pipeline if possible

Allow multiple BEAM pipelines with the same classes and dependencies to
share one Uber jar.

Added metadata for BulkDeleteDatastorePipeline.

Updated shell and Cloud Build scripts to stage all pipelines in one step.
---
 core/build.gradle                             | 12 ++--
 ....java => BulkDeleteDatastorePipeline.java} |  6 +-
 ...lk_delete_datastore_pipeline_metadata.json | 20 ++++++
 ...a => BulkDeleteDatastorePipelineTest.java} | 22 +++---
 release/cloudbuild-nomulus.yaml               |  8 ++-
 release/stage_beam_pipeline.sh                | 69 +++++++++++--------
 6 files changed, 88 insertions(+), 49 deletions(-)
 rename core/src/main/java/google/registry/beam/datastore/{BulkDeletePipeline.java => BulkDeleteDatastorePipeline.java} (98%)
 create mode 100644 core/src/main/resources/google/registry/beam/bulk_delete_datastore_pipeline_metadata.json
 rename core/src/test/java/google/registry/beam/datastore/{BulkDeletePipelineTest.java => BulkDeleteDatastorePipelineTest.java} (86%)

diff --git a/core/build.gradle b/core/build.gradle
index 122116217..2cf4f4246 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -751,7 +751,8 @@ project.tasks.create('initSqlPipeline', JavaExec) {
 // nom_build :core:bulkDeleteDatastore --args="--project=domain-registry-crash \
 //   --region=us-central1 --runner=DataflowRunner --kindsToDelete=*"
 createToolTask(
-    'bulkDeleteDatastore', 'google.registry.beam.datastore.BulkDeletePipeline')
+    'bulkDeleteDatastore',
+    'google.registry.beam.datastore.BulkDeleteDatastorePipeline')
 
 project.tasks.create('generateSqlSchema', JavaExec) {
   classpath = sourceSets.nonprod.runtimeClasspath
@@ -782,10 +783,13 @@ generateGoldenImages.finalizedBy(findGoldenImages)
 
 createUberJar('nomulus', 'nomulus', 'google.registry.tools.RegistryTool')
 
+// Build the Uber jar shared by all flex-template based BEAM pipelines.
+// This packages more code and dependencies than necessary. However, without
+// restructuring the source tree it is difficult to generate leaner jars.
 createUberJar(
-    'init_sql_pipeline',
-    'init_sql_pipeline',
-    'google.registry.beam.initsql.InitSqlPipeline')
+    'beam_pipeline_common',
+    'beam_pipeline_common',
+    '')
 
 // A jar with classes and resources from main sourceSet, excluding internal
 // data. See comments on configurations.nomulus_test above for details.
diff --git a/core/src/main/java/google/registry/beam/datastore/BulkDeletePipeline.java b/core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java
similarity index 98%
rename from core/src/main/java/google/registry/beam/datastore/BulkDeletePipeline.java
rename to core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java
index 096daf2f9..88e7b8ad8 100644
--- a/core/src/main/java/google/registry/beam/datastore/BulkDeletePipeline.java
+++ b/core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java
@@ -78,7 +78,7 @@ import org.apache.beam.sdk.values.TupleTagList;
  * types in the Datastore using the {@code --numOfKindsHint} argument. If the default value for this
  * parameter is too low, performance will suffer.
  */
-public class BulkDeletePipeline {
+public class BulkDeleteDatastorePipeline {
 
   private static final FluentLogger logger = FluentLogger.forEnclosingClass();
 
   // This tool is not for use in our critical projects.
@@ -89,7 +89,7 @@ public class BulkDeletePipeline {
 
   private final Pipeline pipeline;
 
-  BulkDeletePipeline(BulkDeletePipelineOptions options) {
+  BulkDeleteDatastorePipeline(BulkDeletePipelineOptions options) {
     this.options = options;
     pipeline = Pipeline.create(options);
   }
@@ -303,7 +303,7 @@ public class BulkDeletePipeline {
   public static void main(String[] args) {
     BulkDeletePipelineOptions options =
         PipelineOptionsFactory.fromArgs(args).withValidation().as(BulkDeletePipelineOptions.class);
-    BulkDeletePipeline pipeline = new BulkDeletePipeline(options);
+    BulkDeleteDatastorePipeline pipeline = new BulkDeleteDatastorePipeline(options);
     pipeline.run();
     System.exit(0);
   }
diff --git a/core/src/main/resources/google/registry/beam/bulk_delete_datastore_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/bulk_delete_datastore_pipeline_metadata.json
new file mode 100644
index 000000000..5ea1d534f
--- /dev/null
+++ b/core/src/main/resources/google/registry/beam/bulk_delete_datastore_pipeline_metadata.json
@@ -0,0 +1,20 @@
+{
+  "name": "Bulk Delete Cloud Datastore",
+  "description": "An Apache Beam batch pipeline that deletes Cloud Datastore entities in bulk. This is easier to use than the GCP-provided template.",
+  "parameters": [
+    {
+      "name": "kindsToDelete",
+      "label": "The data KINDs to delete.",
+      "helpText": "The Datastore KINDs to be deleted, either as a comma-separated list of kind names or '*', which causes all kinds to be deleted."
+    },
+    {
+      "name": "numOfKindsHint",
+      "label": "An estimate of the number of KINDs to be deleted.",
+      "helpText": "An estimate of the number of KINDs to be deleted. This is recommended if --kindsToDelete is '*' and the default value is too low.",
+      "is_optional": true,
+      "regexes": [
+        "^[1-9][0-9]*$"
+      ]
+    }
+  ]
+}
diff --git a/core/src/test/java/google/registry/beam/datastore/BulkDeletePipelineTest.java b/core/src/test/java/google/registry/beam/datastore/BulkDeleteDatastorePipelineTest.java
similarity index 86%
rename from core/src/test/java/google/registry/beam/datastore/BulkDeletePipelineTest.java
rename to core/src/test/java/google/registry/beam/datastore/BulkDeleteDatastorePipelineTest.java
index a34d83821..da7e7214c 100644
--- a/core/src/test/java/google/registry/beam/datastore/BulkDeletePipelineTest.java
+++ b/core/src/test/java/google/registry/beam/datastore/BulkDeleteDatastorePipelineTest.java
@@ -14,9 +14,9 @@
 
 package google.registry.beam.datastore;
 
-import static google.registry.beam.datastore.BulkDeletePipeline.discoverEntityKinds;
-import static google.registry.beam.datastore.BulkDeletePipeline.getDeletionTags;
-import static google.registry.beam.datastore.BulkDeletePipeline.getOneDeletionTag;
+import static google.registry.beam.datastore.BulkDeleteDatastorePipeline.discoverEntityKinds;
+import static google.registry.beam.datastore.BulkDeleteDatastorePipeline.getDeletionTags;
+import static google.registry.beam.datastore.BulkDeleteDatastorePipeline.getOneDeletionTag;
 
 import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableMap;
@@ -25,8 +25,8 @@ import com.google.datastore.v1.Entity;
 import com.google.datastore.v1.Key;
 import com.google.datastore.v1.Key.PathElement;
 import google.registry.beam.TestPipelineExtension;
-import google.registry.beam.datastore.BulkDeletePipeline.GenerateQueries;
-import google.registry.beam.datastore.BulkDeletePipeline.SplitEntities;
+import google.registry.beam.datastore.BulkDeleteDatastorePipeline.GenerateQueries;
+import google.registry.beam.datastore.BulkDeleteDatastorePipeline.SplitEntities;
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.beam.sdk.testing.PAssert;
@@ -44,8 +44,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-/** Unit tests for {@link BulkDeletePipeline}. */
-class BulkDeletePipelineTest implements Serializable {
+/** Unit tests for {@link BulkDeleteDatastorePipeline}. */
+class BulkDeleteDatastorePipelineTest implements Serializable {
 
   @RegisterExtension
   final transient TestPipelineExtension testPipeline =
@@ -67,7 +67,7 @@ class BulkDeletePipelineTest implements Serializable {
     TupleTagList tags = getDeletionTags(2);
     PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B"));
     PCollection<KV<String, TupleTag<Entity>>> kindToTagMapping =
-        BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags);
+        BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags);
     PAssert.thatMap(kindToTagMapping)
         .isEqualTo(
             ImmutableMap.of(
@@ -81,7 +81,7 @@ class BulkDeletePipelineTest implements Serializable {
     TupleTagList tags = getDeletionTags(3);
     PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B"));
     PCollection<KV<String, TupleTag<Entity>>> kindToTagMapping =
-        BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags);
+        BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags);
     PAssert.thatMap(kindToTagMapping)
         .isEqualTo(
             ImmutableMap.of(
@@ -95,7 +95,7 @@ class BulkDeletePipelineTest implements Serializable {
     TupleTagList tags = getDeletionTags(2);
     PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B", "C"));
     PCollection<KV<String, TupleTag<Entity>>> kindToTagMapping =
-        BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags);
+        BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags);
     PAssert.thatMap(kindToTagMapping)
         .isEqualTo(
             ImmutableMap.of(
@@ -110,7 +110,7 @@ class BulkDeletePipelineTest implements Serializable {
     TupleTagList tags = getDeletionTags(2);
     PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B"));
     PCollectionView<Map<String, TupleTag<Entity>>> kindToTagMapping =
-        BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags).apply(View.asMap());
+        BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags).apply(View.asMap());
     Entity entityA = createTestEntity("A", 1);
     Entity entityB = createTestEntity("B", 2);
     PCollection<Entity> entities =
diff --git a/release/cloudbuild-nomulus.yaml b/release/cloudbuild-nomulus.yaml
index 74f983521..3546cf61a 100644
--- a/release/cloudbuild-nomulus.yaml
+++ b/release/cloudbuild-nomulus.yaml
@@ -83,11 +83,13 @@ steps:
   - -c
   - |
     ./release/stage_beam_pipeline.sh \
-      init_sql_pipeline \
+      beam_pipeline_common \
+      ${TAG_NAME} \
+      ${PROJECT_ID} \
       google.registry.beam.initsql.InitSqlPipeline \
      google/registry/beam/init_sql_pipeline_metadata.json \
-      ${TAG_NAME} \
-      ${PROJECT_ID}
+      google.registry.beam.datastore.BulkDeleteDatastorePipeline \
+      google/registry/beam/bulk_delete_datastore_pipeline_metadata.json
 # Tentatively build and publish Cloud SQL schema jar here, before schema release
 # process is finalized. Also publish nomulus:core jars that are needed for
 # server/schema compatibility tests.
diff --git a/release/stage_beam_pipeline.sh b/release/stage_beam_pipeline.sh
index 766b99dca..0fcdf9b3f 100755
--- a/release/stage_beam_pipeline.sh
+++ b/release/stage_beam_pipeline.sh
@@ -13,53 +13,66 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-# This script builds and stages a flex-template based BEAM pipeline. The
-# following parameters are required:
-# - pipeline_name: this is also the name of a createUberJar task in :core and
-#   the name of the jar file created by that task.
-# - main_class: the pipeline's main class name.
-# - metadata_pathname: the pipeline's metadata file, which is in the resources
-#   folder of :core. This parameter should be the relative path from resources.
+# This script builds and stages one or more flex-template based BEAM pipelines
+# that can share one Uber jar. The following positional parameters are required:
+# - uberjar_name: this is the name of a createUberJar task in :core and also the
+#   name of the Uber jar created by that task. It is expected to be in the
+#   ./core/build/libs folder.
 # - release_tag
 # - dev_project
+# - main_class: the fully qualified main class name of a pipeline.
+# - metadata_pathname: the metadata file of the pipeline named by the previous
+#   parameter. It is expected to be in the resources folder of :core, and its
+#   value should be the relative path from resources.
 #
-# If successful, this script will generate and upload two artifacts:
+# If successful, this script will generate and upload two artifacts for each
+# pipeline:
 # - A template file to
 #   gs://${dev_project}-deploy/${release_tag}/beam/$(basename metadata_pathname)
-# - A docker image to gcs.io/${dev_project}/beam/${pipeline_name}:{release_tag}
+# - A docker image to gcr.io/${dev_project}/beam/${pipeline_name}:${release_tag},
+#   where ${pipeline_name} is the ${main_class}'s simple name converted to
+#   lower_underscore form.
 #
-# Please refer to gcloud documentation for how to start the pipeline.
+# The staged pipelines may be invoked by gcloud or the flex-template launcher's
+# REST API.
 
 set -e
 
-if [ $# -ne 5 ];
+if (( "$#" < 5 || $(("$#" % 2)) == 0 ));
 then
-  echo "Usage: $0 pipeline_name main_class metadata_pathname release_tag" \
-    "dev_project"
+  echo "Usage: $0 uberjar_name release_tag dev_project " \
+    "main_class metadata_pathname [ main_class metadata_pathname ] ..."
   exit 1
 fi
 
-pipeline_name="$1"
-main_class="$2"
-metadata_pathname="$3"
-release_tag="$4"
-dev_project="$5"
+uberjar_name="$1"
+release_tag="$2"
+dev_project="$3"
+shift 3
 
-image_name="gcr.io/${dev_project}/beam/${pipeline_name}"
-metadata_basename=$(basename ${metadata_pathname})
+maven_gcs_prefix="gcs://domain-registry-maven-repository"
+nom_build_dir="$(dirname $0)/.."
+${nom_build_dir}/nom_build clean :core:"${uberjar_name}" \
+  --mavenUrl="${maven_gcs_prefix}"/maven \
+  --pluginsUrl="${maven_gcs_prefix}"/plugins
 
-gcs_prefix="gcs://domain-registry-maven-repository"
+while (( "$#" > 0 )); do
+  main_class="$1"; shift
+  metadata_pathname="$1"; shift
+  # Get main_class' simple name in lower_underscore form
+  pipeline_name=$(
+    echo "${main_class}" | rev | cut -d. -f1 | rev | \
+    sed -r 's/([A-Z])/_\L\1/g' | sed 's/^_//')
+  image_name="gcr.io/${dev_project}/beam/${pipeline_name}"
+  metadata_basename=$(basename "${metadata_pathname}")
 
-./gradlew clean :core:"${pipeline_name}" \
-    -PmavenUrl="${gcs_prefix}"/maven \
-    -PpluginsUrl="${gcs_prefix}"/plugins
-
-gcloud dataflow flex-template build \
+  gcloud dataflow flex-template build \
     "gs://${dev_project}-deploy/${release_tag}/beam/${metadata_basename}" \
     --image-gcr-path "${image_name}:${release_tag}" \
     --sdk-language "JAVA" \
     --flex-template-base-image JAVA11 \
     --metadata-file "./core/src/main/resources/${metadata_pathname}" \
-    --jar "./core/build/libs/${pipeline_name}.jar" \
+    --jar "./core/build/libs/${uberjar_name}.jar" \
     --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="${main_class}" \
-    --project ${dev_project}
+    --project "${dev_project}"
+done
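
For reference, a staging run under this change, followed by a launch of the
staged bulk-delete template, might look like the sketch below. The project
"my-dev-project", the release tag "nomulus-20210316-RC00", the job name, and the
region are hypothetical placeholders, not values from this patch; the
--parameters names come from bulk_delete_datastore_pipeline_metadata.json above.

# Stage both pipelines against the shared beam_pipeline_common Uber jar in one
# run. "my-dev-project" and "nomulus-20210316-RC00" are made-up placeholders.
./release/stage_beam_pipeline.sh \
  beam_pipeline_common \
  nomulus-20210316-RC00 \
  my-dev-project \
  google.registry.beam.initsql.InitSqlPipeline \
  google/registry/beam/init_sql_pipeline_metadata.json \
  google.registry.beam.datastore.BulkDeleteDatastorePipeline \
  google/registry/beam/bulk_delete_datastore_pipeline_metadata.json

# Launch the staged bulk-delete template with gcloud. The template file
# location follows the gs://${dev_project}-deploy/${release_tag}/beam/...
# convention documented in stage_beam_pipeline.sh, and the parameter names
# match the pipeline's metadata file.
gcloud dataflow flex-template run "bulk-delete-datastore-example" \
  --template-file-gcs-location "gs://my-dev-project-deploy/nomulus-20210316-RC00/beam/bulk_delete_datastore_pipeline_metadata.json" \
  --parameters kindsToDelete='*' \
  --parameters numOfKindsHint=30 \
  --region us-central1 \
  --project my-dev-project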