Use shared jar to stage BEAM pipeline if possible (#1008)

* Use shared jar to stage BEAM pipeline if possible

Allow multiple BEAM pipelines with the same classes and dependencies to
share one Uber jar.

Added metadata for BulkDeleteDatastorePipeline.

Updated shell and Cloud Build scripts to stage all pipelines in one
step.
This commit is contained in:
Weimin Yu 2021-03-16 13:19:30 -04:00 committed by GitHub
parent 24db87a4cf
commit 7c3d0dd1a9
6 changed files with 88 additions and 49 deletions

View file

@ -751,7 +751,8 @@ project.tasks.create('initSqlPipeline', JavaExec) {
// nom_build :core:bulkDeleteDatastore --args="--project=domain-registry-crash \ // nom_build :core:bulkDeleteDatastore --args="--project=domain-registry-crash \
// --region=us-central1 --runner=DataflowRunner --kindsToDelete=*" // --region=us-central1 --runner=DataflowRunner --kindsToDelete=*"
createToolTask( createToolTask(
'bulkDeleteDatastore', 'google.registry.beam.datastore.BulkDeletePipeline') 'bulkDeleteDatastore',
'google.registry.beam.datastore.BulkDeleteDatastorePipeline')
project.tasks.create('generateSqlSchema', JavaExec) { project.tasks.create('generateSqlSchema', JavaExec) {
classpath = sourceSets.nonprod.runtimeClasspath classpath = sourceSets.nonprod.runtimeClasspath
@ -782,10 +783,13 @@ generateGoldenImages.finalizedBy(findGoldenImages)
createUberJar('nomulus', 'nomulus', 'google.registry.tools.RegistryTool') createUberJar('nomulus', 'nomulus', 'google.registry.tools.RegistryTool')
// Build the Uber jar shared by all flex-template based BEAM pipelines.
// This packages more code and dependency than necessary. However, without
// restructuring the source tree it is difficult to generate leaner jars.
createUberJar( createUberJar(
'init_sql_pipeline', 'beam_pipeline_common',
'init_sql_pipeline', 'beam_pipeline_common',
'google.registry.beam.initsql.InitSqlPipeline') '')
// A jar with classes and resources from main sourceSet, excluding internal // A jar with classes and resources from main sourceSet, excluding internal
// data. See comments on configurations.nomulus_test above for details. // data. See comments on configurations.nomulus_test above for details.

View file

@ -78,7 +78,7 @@ import org.apache.beam.sdk.values.TupleTagList;
* types in the Datastore using the {@code --numOfKindsHint} argument. If the default value for this * types in the Datastore using the {@code --numOfKindsHint} argument. If the default value for this
* parameter is too low, performance will suffer. * parameter is too low, performance will suffer.
*/ */
public class BulkDeletePipeline { public class BulkDeleteDatastorePipeline {
private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final FluentLogger logger = FluentLogger.forEnclosingClass();
// This tool is not for use in our critical projects. // This tool is not for use in our critical projects.
@ -89,7 +89,7 @@ public class BulkDeletePipeline {
private final Pipeline pipeline; private final Pipeline pipeline;
BulkDeletePipeline(BulkDeletePipelineOptions options) { BulkDeleteDatastorePipeline(BulkDeletePipelineOptions options) {
this.options = options; this.options = options;
pipeline = Pipeline.create(options); pipeline = Pipeline.create(options);
} }
@ -303,7 +303,7 @@ public class BulkDeletePipeline {
public static void main(String[] args) { public static void main(String[] args) {
BulkDeletePipelineOptions options = BulkDeletePipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(BulkDeletePipelineOptions.class); PipelineOptionsFactory.fromArgs(args).withValidation().as(BulkDeletePipelineOptions.class);
BulkDeletePipeline pipeline = new BulkDeletePipeline(options); BulkDeleteDatastorePipeline pipeline = new BulkDeleteDatastorePipeline(options);
pipeline.run(); pipeline.run();
System.exit(0); System.exit(0);
} }

View file

@ -0,0 +1,20 @@
{
"name": "Bulk Delete Cloud Datastore",
"description": "An Apache Beam batch pipeline that deletes Cloud Datastore in bulk. This is easier to use than the GCP-provided template.",
"parameters": [
{
"name": "kindsToDelete",
"label": "The data KINDs to delete.",
"helpText": "The Datastore KINDs to be deleted. The format may be: the list of kinds to be deleted as a comma-separated string; or '*', which causes all kinds to be deleted."
},
{
"name": "getNumOfKindsHint",
"label": "An estimate of the number of KINDs to be deleted.",
"helpText": "An estimate of the number of KINDs to be deleted. This is recommended if --kindsToDelete is '*' and the default value is too low.",
"is_optional": true,
"regexes": [
"^[1-9][0-9]*$"
]
}
]
}

View file

@ -14,9 +14,9 @@
package google.registry.beam.datastore; package google.registry.beam.datastore;
import static google.registry.beam.datastore.BulkDeletePipeline.discoverEntityKinds; import static google.registry.beam.datastore.BulkDeleteDatastorePipeline.discoverEntityKinds;
import static google.registry.beam.datastore.BulkDeletePipeline.getDeletionTags; import static google.registry.beam.datastore.BulkDeleteDatastorePipeline.getDeletionTags;
import static google.registry.beam.datastore.BulkDeletePipeline.getOneDeletionTag; import static google.registry.beam.datastore.BulkDeleteDatastorePipeline.getOneDeletionTag;
import com.google.common.base.Verify; import com.google.common.base.Verify;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
@ -25,8 +25,8 @@ import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key; import com.google.datastore.v1.Key;
import com.google.datastore.v1.Key.PathElement; import com.google.datastore.v1.Key.PathElement;
import google.registry.beam.TestPipelineExtension; import google.registry.beam.TestPipelineExtension;
import google.registry.beam.datastore.BulkDeletePipeline.GenerateQueries; import google.registry.beam.datastore.BulkDeleteDatastorePipeline.GenerateQueries;
import google.registry.beam.datastore.BulkDeletePipeline.SplitEntities; import google.registry.beam.datastore.BulkDeleteDatastorePipeline.SplitEntities;
import java.io.Serializable; import java.io.Serializable;
import java.util.Map; import java.util.Map;
import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.PAssert;
@ -44,8 +44,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link BulkDeletePipeline}. */ /** Unit tests for {@link BulkDeleteDatastorePipeline}. */
class BulkDeletePipelineTest implements Serializable { class BulkDeleteDatastorePipelineTest implements Serializable {
@RegisterExtension @RegisterExtension
final transient TestPipelineExtension testPipeline = final transient TestPipelineExtension testPipeline =
@ -67,7 +67,7 @@ class BulkDeletePipelineTest implements Serializable {
TupleTagList tags = getDeletionTags(2); TupleTagList tags = getDeletionTags(2);
PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B")); PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B"));
PCollection<KV<String, TupleTag<Entity>>> kindToTagMapping = PCollection<KV<String, TupleTag<Entity>>> kindToTagMapping =
BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags); BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags);
PAssert.thatMap(kindToTagMapping) PAssert.thatMap(kindToTagMapping)
.isEqualTo( .isEqualTo(
ImmutableMap.of( ImmutableMap.of(
@ -81,7 +81,7 @@ class BulkDeletePipelineTest implements Serializable {
TupleTagList tags = getDeletionTags(3); TupleTagList tags = getDeletionTags(3);
PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B")); PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B"));
PCollection<KV<String, TupleTag<Entity>>> kindToTagMapping = PCollection<KV<String, TupleTag<Entity>>> kindToTagMapping =
BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags); BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags);
PAssert.thatMap(kindToTagMapping) PAssert.thatMap(kindToTagMapping)
.isEqualTo( .isEqualTo(
ImmutableMap.of( ImmutableMap.of(
@ -95,7 +95,7 @@ class BulkDeletePipelineTest implements Serializable {
TupleTagList tags = getDeletionTags(2); TupleTagList tags = getDeletionTags(2);
PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B", "C")); PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B", "C"));
PCollection<KV<String, TupleTag<Entity>>> kindToTagMapping = PCollection<KV<String, TupleTag<Entity>>> kindToTagMapping =
BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags); BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags);
PAssert.thatMap(kindToTagMapping) PAssert.thatMap(kindToTagMapping)
.isEqualTo( .isEqualTo(
ImmutableMap.of( ImmutableMap.of(
@ -110,7 +110,7 @@ class BulkDeletePipelineTest implements Serializable {
TupleTagList tags = getDeletionTags(2); TupleTagList tags = getDeletionTags(2);
PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B")); PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B"));
PCollectionView<Map<String, TupleTag<Entity>>> kindToTagMapping = PCollectionView<Map<String, TupleTag<Entity>>> kindToTagMapping =
BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags).apply(View.asMap()); BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags).apply(View.asMap());
Entity entityA = createTestEntity("A", 1); Entity entityA = createTestEntity("A", 1);
Entity entityB = createTestEntity("B", 2); Entity entityB = createTestEntity("B", 2);
PCollection<Entity> entities = PCollection<Entity> entities =

View file

@ -83,11 +83,13 @@ steps:
- -c - -c
- | - |
./release/stage_beam_pipeline.sh \ ./release/stage_beam_pipeline.sh \
init_sql_pipeline \ beam_pipeline_common \
${TAG_NAME} \
${PROJECT_ID} \
google.registry.beam.initsql.InitSqlPipeline \ google.registry.beam.initsql.InitSqlPipeline \
google/registry/beam/init_sql_pipeline_metadata.json \ google/registry/beam/init_sql_pipeline_metadata.json \
${TAG_NAME} \ google.registry.beam.datastore.BulkDeleteDatastorePipeline \
${PROJECT_ID} google/registry/beam/bulk_delete_datastore_pipeline_metadata.json
# Tentatively build and publish Cloud SQL schema jar here, before schema release # Tentatively build and publish Cloud SQL schema jar here, before schema release
# process is finalized. Also publish nomulus:core jars that are needed for # process is finalized. Also publish nomulus:core jars that are needed for
# server/schema compatibility tests. # server/schema compatibility tests.

View file

@ -13,46 +13,58 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
# This script builds and stages a flex-template based BEAM pipeline. The # This script builds and stages one or more flex-template based BEAM pipelines
# following parameters are required: # that can share one Uber jar. The following positional parameters are required:
# - pipeline_name: this is also the name of a createUberJar task in :core and # - uberjar_name: this is the name of a createUberJar task in :core and also the
# the name of the jar file created by that task. # name of the Uber jar created by that task. It is expected to be in the
# - main_class: the pipeline's main class name. # ./core/build/libs folder.
# - metadata_pathname: the pipeline's metadata file, which is in the resources
# folder of :core. This parameter should be the relative path from resources.
# - release_tag # - release_tag
# - dev_project # - dev_project
# - main_class: the fully qualified main class name of a pipeline.
# - metadata_pathname: the metadata file of the pipeline named by the previous
# parameter. It is expected to be in the resources folder of :core, and its
# value should be the relative path from resources.
# #
# If successful, this script will generate and upload two artifacts: # If successful, this script will generate and upload two artifacts for each
# pipeline:
# - A template file to # - A template file to
# gs://${dev_project}-deploy/${release_tag}/beam/$(basename metadata_pathname) # gs://${dev_project}-deploy/${release_tag}/beam/$(basename metadata_pathname)
# - A docker image to gcs.io/${dev_project}/beam/${pipeline_name}:{release_tag} # - A docker image to gcs.io/${dev_project}/beam/${pipeline_name}:{release_tag},
# where ${pipeline_name} is the ${main_class}'s simple name converted to
# lower_underscore form.
# #
# Please refer to gcloud documentation for how to start the pipeline. # The staged pipelines may be invoked by gcloud or the flex-template launcher's
# REST API.
set -e set -e
if [ $# -ne 5 ]; if (( "$#" < 5 || $(("$#" % 2)) == 0 ));
then then
echo "Usage: $0 pipeline_name main_class metadata_pathname release_tag" \ echo "Usage: $0 uberjar_name release_tag dev_project " \
"dev_project" "main_class metadata_pathname [ main_class metadata_pathname ] ..."
exit 1 exit 1
fi fi
pipeline_name="$1" uberjar_name="$1"
main_class="$2" release_tag="$2"
metadata_pathname="$3" dev_project="$3"
release_tag="$4" shift 3
dev_project="$5"
maven_gcs_prefix="gcs://domain-registry-maven-repository"
nom_build_dir="$(dirname $0)/.."
${nom_build_dir}/nom_build clean :core:"${uberjar_name}" \
--mavenUrl="${maven_gcs_prefix}"/maven \
--pluginsUrl="${maven_gcs_prefix}"/plugins
while (( "$#" > 0 )); do
main_class="$1"; shift
metadata_pathname="$1"; shift
# Get main_class' simple name in lower_underscore form
pipeline_name=$(
echo "${main_class}" | rev | cut -d. -f1 | rev | \
sed -r 's/([A-Z])/_\L\1/g' | sed 's/^_//')
image_name="gcr.io/${dev_project}/beam/${pipeline_name}" image_name="gcr.io/${dev_project}/beam/${pipeline_name}"
metadata_basename=$(basename ${metadata_pathname}) metadata_basename=$(basename "${metadata_pathname}")
gcs_prefix="gcs://domain-registry-maven-repository"
./gradlew clean :core:"${pipeline_name}" \
-PmavenUrl="${gcs_prefix}"/maven \
-PpluginsUrl="${gcs_prefix}"/plugins
gcloud dataflow flex-template build \ gcloud dataflow flex-template build \
"gs://${dev_project}-deploy/${release_tag}/beam/${metadata_basename}" \ "gs://${dev_project}-deploy/${release_tag}/beam/${metadata_basename}" \
@ -60,6 +72,7 @@ gcloud dataflow flex-template build \
--sdk-language "JAVA" \ --sdk-language "JAVA" \
--flex-template-base-image JAVA11 \ --flex-template-base-image JAVA11 \
--metadata-file "./core/src/main/resources/${metadata_pathname}" \ --metadata-file "./core/src/main/resources/${metadata_pathname}" \
--jar "./core/build/libs/${pipeline_name}.jar" \ --jar "./core/build/libs/${uberjar_name}.jar" \
--env FLEX_TEMPLATE_JAVA_MAIN_CLASS="${main_class}" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="${main_class}" \
--project ${dev_project} --project "${dev_project}"
done