diff --git a/core/build.gradle b/core/build.gradle index 122116217..2cf4f4246 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -751,7 +751,8 @@ project.tasks.create('initSqlPipeline', JavaExec) { // nom_build :core:bulkDeleteDatastore --args="--project=domain-registry-crash \ // --region=us-central1 --runner=DataflowRunner --kindsToDelete=*" createToolTask( - 'bulkDeleteDatastore', 'google.registry.beam.datastore.BulkDeletePipeline') + 'bulkDeleteDatastore', + 'google.registry.beam.datastore.BulkDeleteDatastorePipeline') project.tasks.create('generateSqlSchema', JavaExec) { classpath = sourceSets.nonprod.runtimeClasspath @@ -782,10 +783,13 @@ generateGoldenImages.finalizedBy(findGoldenImages) createUberJar('nomulus', 'nomulus', 'google.registry.tools.RegistryTool') +// Build the Uber jar shared by all flex-template based BEAM pipelines. +// This packages more code and dependency than necessary. However, without +// restructuring the source tree it is difficult to generate leaner jars. createUberJar( - 'init_sql_pipeline', - 'init_sql_pipeline', - 'google.registry.beam.initsql.InitSqlPipeline') + 'beam_pipeline_common', + 'beam_pipeline_common', + '') // A jar with classes and resources from main sourceSet, excluding internal // data. See comments on configurations.nomulus_test above for details. diff --git a/core/src/main/java/google/registry/beam/datastore/BulkDeletePipeline.java b/core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java similarity index 98% rename from core/src/main/java/google/registry/beam/datastore/BulkDeletePipeline.java rename to core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java index 096daf2f9..88e7b8ad8 100644 --- a/core/src/main/java/google/registry/beam/datastore/BulkDeletePipeline.java +++ b/core/src/main/java/google/registry/beam/datastore/BulkDeleteDatastorePipeline.java @@ -78,7 +78,7 @@ import org.apache.beam.sdk.values.TupleTagList; * types in the Datastore using the {@code --numOfKindsHint} argument. If the default value for this * parameter is too low, performance will suffer. */ -public class BulkDeletePipeline { +public class BulkDeleteDatastorePipeline { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); // This tool is not for use in our critical projects. @@ -89,7 +89,7 @@ public class BulkDeletePipeline { private final Pipeline pipeline; - BulkDeletePipeline(BulkDeletePipelineOptions options) { + BulkDeleteDatastorePipeline(BulkDeletePipelineOptions options) { this.options = options; pipeline = Pipeline.create(options); } @@ -303,7 +303,7 @@ public class BulkDeletePipeline { public static void main(String[] args) { BulkDeletePipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BulkDeletePipelineOptions.class); - BulkDeletePipeline pipeline = new BulkDeletePipeline(options); + BulkDeleteDatastorePipeline pipeline = new BulkDeleteDatastorePipeline(options); pipeline.run(); System.exit(0); } diff --git a/core/src/main/resources/google/registry/beam/bulk_delete_datastore_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/bulk_delete_datastore_pipeline_metadata.json new file mode 100644 index 000000000..5ea1d534f --- /dev/null +++ b/core/src/main/resources/google/registry/beam/bulk_delete_datastore_pipeline_metadata.json @@ -0,0 +1,20 @@ +{ + "name": "Bulk Delete Cloud Datastore", + "description": "An Apache Beam batch pipeline that deletes Cloud Datastore in bulk. This is easier to use than the GCP-provided template.", + "parameters": [ + { + "name": "kindsToDelete", + "label": "The data KINDs to delete.", + "helpText": "The Datastore KINDs to be deleted. The format may be: the list of kinds to be deleted as a comma-separated string; or '*', which causes all kinds to be deleted." + }, + { + "name": "getNumOfKindsHint", + "label": "An estimate of the number of KINDs to be deleted.", + "helpText": "An estimate of the number of KINDs to be deleted. This is recommended if --kindsToDelete is '*' and the default value is too low.", + "is_optional": true, + "regexes": [ + "^[1-9][0-9]*$" + ] + } + ] +} diff --git a/core/src/test/java/google/registry/beam/datastore/BulkDeletePipelineTest.java b/core/src/test/java/google/registry/beam/datastore/BulkDeleteDatastorePipelineTest.java similarity index 86% rename from core/src/test/java/google/registry/beam/datastore/BulkDeletePipelineTest.java rename to core/src/test/java/google/registry/beam/datastore/BulkDeleteDatastorePipelineTest.java index a34d83821..da7e7214c 100644 --- a/core/src/test/java/google/registry/beam/datastore/BulkDeletePipelineTest.java +++ b/core/src/test/java/google/registry/beam/datastore/BulkDeleteDatastorePipelineTest.java @@ -14,9 +14,9 @@ package google.registry.beam.datastore; -import static google.registry.beam.datastore.BulkDeletePipeline.discoverEntityKinds; -import static google.registry.beam.datastore.BulkDeletePipeline.getDeletionTags; -import static google.registry.beam.datastore.BulkDeletePipeline.getOneDeletionTag; +import static google.registry.beam.datastore.BulkDeleteDatastorePipeline.discoverEntityKinds; +import static google.registry.beam.datastore.BulkDeleteDatastorePipeline.getDeletionTags; +import static google.registry.beam.datastore.BulkDeleteDatastorePipeline.getOneDeletionTag; import com.google.common.base.Verify; import com.google.common.collect.ImmutableMap; @@ -25,8 +25,8 @@ import com.google.datastore.v1.Entity; import com.google.datastore.v1.Key; import com.google.datastore.v1.Key.PathElement; import google.registry.beam.TestPipelineExtension; -import google.registry.beam.datastore.BulkDeletePipeline.GenerateQueries; -import google.registry.beam.datastore.BulkDeletePipeline.SplitEntities; +import google.registry.beam.datastore.BulkDeleteDatastorePipeline.GenerateQueries; +import google.registry.beam.datastore.BulkDeleteDatastorePipeline.SplitEntities; import java.io.Serializable; import java.util.Map; import org.apache.beam.sdk.testing.PAssert; @@ -44,8 +44,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.junit.jupiter.api.extension.RegisterExtension; -/** Unit tests for {@link BulkDeletePipeline}. */ -class BulkDeletePipelineTest implements Serializable { +/** Unit tests for {@link BulkDeleteDatastorePipeline}. */ +class BulkDeleteDatastorePipelineTest implements Serializable { @RegisterExtension final transient TestPipelineExtension testPipeline = @@ -67,7 +67,7 @@ class BulkDeletePipelineTest implements Serializable { TupleTagList tags = getDeletionTags(2); PCollection kinds = testPipeline.apply("InjectKinds", Create.of("A", "B")); PCollection>> kindToTagMapping = - BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags); + BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags); PAssert.thatMap(kindToTagMapping) .isEqualTo( ImmutableMap.of( @@ -81,7 +81,7 @@ class BulkDeletePipelineTest implements Serializable { TupleTagList tags = getDeletionTags(3); PCollection kinds = testPipeline.apply("InjectKinds", Create.of("A", "B")); PCollection>> kindToTagMapping = - BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags); + BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags); PAssert.thatMap(kindToTagMapping) .isEqualTo( ImmutableMap.of( @@ -95,7 +95,7 @@ class BulkDeletePipelineTest implements Serializable { TupleTagList tags = getDeletionTags(2); PCollection kinds = testPipeline.apply("InjectKinds", Create.of("A", "B", "C")); PCollection>> kindToTagMapping = - BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags); + BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags); PAssert.thatMap(kindToTagMapping) .isEqualTo( ImmutableMap.of( @@ -110,7 +110,7 @@ class BulkDeletePipelineTest implements Serializable { TupleTagList tags = getDeletionTags(2); PCollection kinds = testPipeline.apply("InjectKinds", Create.of("A", "B")); PCollectionView>> kindToTagMapping = - BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags).apply(View.asMap()); + BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags).apply(View.asMap()); Entity entityA = createTestEntity("A", 1); Entity entityB = createTestEntity("B", 2); PCollection entities = diff --git a/release/cloudbuild-nomulus.yaml b/release/cloudbuild-nomulus.yaml index 74f983521..3546cf61a 100644 --- a/release/cloudbuild-nomulus.yaml +++ b/release/cloudbuild-nomulus.yaml @@ -83,11 +83,13 @@ steps: - -c - | ./release/stage_beam_pipeline.sh \ - init_sql_pipeline \ + beam_pipeline_common \ + ${TAG_NAME} \ + ${PROJECT_ID} \ google.registry.beam.initsql.InitSqlPipeline \ google/registry/beam/init_sql_pipeline_metadata.json \ - ${TAG_NAME} \ - ${PROJECT_ID} + google.registry.beam.datastore.BulkDeleteDatastorePipeline \ + google/registry/beam/bulk_delete_datastore_pipeline_metadata.json # Tentatively build and publish Cloud SQL schema jar here, before schema release # process is finalized. Also publish nomulus:core jars that are needed for # server/schema compatibility tests. diff --git a/release/stage_beam_pipeline.sh b/release/stage_beam_pipeline.sh index 766b99dca..0fcdf9b3f 100755 --- a/release/stage_beam_pipeline.sh +++ b/release/stage_beam_pipeline.sh @@ -13,53 +13,66 @@ # See the License for the specific language governing permissions and # limitations under the License. # -# This script builds and stages a flex-template based BEAM pipeline. The -# following parameters are required: -# - pipeline_name: this is also the name of a createUberJar task in :core and -# the name of the jar file created by that task. -# - main_class: the pipeline's main class name. -# - metadata_pathname: the pipeline's metadata file, which is in the resources -# folder of :core. This parameter should be the relative path from resources. +# This script builds and stages one or more flex-template based BEAM pipelines +# that can share one Uber jar. The following positional parameters are required: +# - uberjar_name: this is the name of a createUberJar task in :core and also the +# name of the Uber jar created by that task. It is expected to be in the +# ./core/build/libs folder. # - release_tag # - dev_project +# - main_class: the fully qualified main class name of a pipeline. +# - metadata_pathname: the metadata file of the pipeline named by the previous +# parameter. It is expected to be in the resources folder of :core, and its +# value should be the relative path from resources. # -# If successful, this script will generate and upload two artifacts: +# If successful, this script will generate and upload two artifacts for each +# pipeline: # - A template file to # gs://${dev_project}-deploy/${release_tag}/beam/$(basename metadata_pathname) -# - A docker image to gcs.io/${dev_project}/beam/${pipeline_name}:{release_tag} +# - A docker image to gcs.io/${dev_project}/beam/${pipeline_name}:{release_tag}, +# where ${pipeline_name} is the ${main_class}'s simple name converted to +# lower_underscore form. # -# Please refer to gcloud documentation for how to start the pipeline. +# The staged pipelines may be invoked by gcloud or the flex-template launcher's +# REST API. set -e -if [ $# -ne 5 ]; +if (( "$#" < 5 || $(("$#" % 2)) == 0 )); then - echo "Usage: $0 pipeline_name main_class metadata_pathname release_tag" \ - "dev_project" + echo "Usage: $0 uberjar_name release_tag dev_project " \ + "main_class metadata_pathname [ main_class metadata_pathname ] ..." exit 1 fi -pipeline_name="$1" -main_class="$2" -metadata_pathname="$3" -release_tag="$4" -dev_project="$5" +uberjar_name="$1" +release_tag="$2" +dev_project="$3" +shift 3 -image_name="gcr.io/${dev_project}/beam/${pipeline_name}" -metadata_basename=$(basename ${metadata_pathname}) +maven_gcs_prefix="gcs://domain-registry-maven-repository" +nom_build_dir="$(dirname $0)/.." +${nom_build_dir}/nom_build clean :core:"${uberjar_name}" \ + --mavenUrl="${maven_gcs_prefix}"/maven \ + --pluginsUrl="${maven_gcs_prefix}"/plugins -gcs_prefix="gcs://domain-registry-maven-repository" +while (( "$#" > 0 )); do + main_class="$1"; shift + metadata_pathname="$1"; shift + # Get main_class' simple name in lower_underscore form + pipeline_name=$( + echo "${main_class}" | rev | cut -d. -f1 | rev | \ + sed -r 's/([A-Z])/_\L\1/g' | sed 's/^_//') + image_name="gcr.io/${dev_project}/beam/${pipeline_name}" + metadata_basename=$(basename "${metadata_pathname}") -./gradlew clean :core:"${pipeline_name}" \ - -PmavenUrl="${gcs_prefix}"/maven \ - -PpluginsUrl="${gcs_prefix}"/plugins - -gcloud dataflow flex-template build \ + gcloud dataflow flex-template build \ "gs://${dev_project}-deploy/${release_tag}/beam/${metadata_basename}" \ --image-gcr-path "${image_name}:${release_tag}" \ --sdk-language "JAVA" \ --flex-template-base-image JAVA11 \ --metadata-file "./core/src/main/resources/${metadata_pathname}" \ - --jar "./core/build/libs/${pipeline_name}.jar" \ + --jar "./core/build/libs/${uberjar_name}.jar" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="${main_class}" \ - --project ${dev_project} + --project "${dev_project}" +done