Use shared jar to stage BEAM pipeline if possible (#1008)

* Use shared jar to stage BEAM pipeline if possible

Allow multiple BEAM pipelines with the same classes and dependencies to
share one uber jar.

Add metadata for BulkDeleteDatastorePipeline.

Update the shell and Cloud Build scripts to stage all pipelines in one
step.
This commit is contained in:
Weimin Yu 2021-03-16 13:19:30 -04:00 committed by GitHub
parent bae5dacbae
commit eb2e1c60ca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 88 additions and 49 deletions

View file

@ -14,9 +14,9 @@
package google.registry.beam.datastore;
import static google.registry.beam.datastore.BulkDeletePipeline.discoverEntityKinds;
import static google.registry.beam.datastore.BulkDeletePipeline.getDeletionTags;
import static google.registry.beam.datastore.BulkDeletePipeline.getOneDeletionTag;
import static google.registry.beam.datastore.BulkDeleteDatastorePipeline.discoverEntityKinds;
import static google.registry.beam.datastore.BulkDeleteDatastorePipeline.getDeletionTags;
import static google.registry.beam.datastore.BulkDeleteDatastorePipeline.getOneDeletionTag;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableMap;
@ -25,8 +25,8 @@ import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.Key.PathElement;
import google.registry.beam.TestPipelineExtension;
import google.registry.beam.datastore.BulkDeletePipeline.GenerateQueries;
import google.registry.beam.datastore.BulkDeletePipeline.SplitEntities;
import google.registry.beam.datastore.BulkDeleteDatastorePipeline.GenerateQueries;
import google.registry.beam.datastore.BulkDeleteDatastorePipeline.SplitEntities;
import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.testing.PAssert;
@ -44,8 +44,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link BulkDeletePipeline}. */
class BulkDeletePipelineTest implements Serializable {
/** Unit tests for {@link BulkDeleteDatastorePipeline}. */
class BulkDeleteDatastorePipelineTest implements Serializable {
@RegisterExtension
final transient TestPipelineExtension testPipeline =
@ -67,7 +67,7 @@ class BulkDeletePipelineTest implements Serializable {
TupleTagList tags = getDeletionTags(2);
PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B"));
PCollection<KV<String, TupleTag<Entity>>> kindToTagMapping =
BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags);
BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags);
PAssert.thatMap(kindToTagMapping)
.isEqualTo(
ImmutableMap.of(
@ -81,7 +81,7 @@ class BulkDeletePipelineTest implements Serializable {
TupleTagList tags = getDeletionTags(3);
PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B"));
PCollection<KV<String, TupleTag<Entity>>> kindToTagMapping =
BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags);
BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags);
PAssert.thatMap(kindToTagMapping)
.isEqualTo(
ImmutableMap.of(
@ -95,7 +95,7 @@ class BulkDeletePipelineTest implements Serializable {
TupleTagList tags = getDeletionTags(2);
PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B", "C"));
PCollection<KV<String, TupleTag<Entity>>> kindToTagMapping =
BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags);
BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags);
PAssert.thatMap(kindToTagMapping)
.isEqualTo(
ImmutableMap.of(
@ -110,7 +110,7 @@ class BulkDeletePipelineTest implements Serializable {
TupleTagList tags = getDeletionTags(2);
PCollection<String> kinds = testPipeline.apply("InjectKinds", Create.of("A", "B"));
PCollectionView<Map<String, TupleTag<Entity>>> kindToTagMapping =
BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags).apply(View.asMap());
BulkDeleteDatastorePipeline.mapKindsToDeletionTags(kinds, tags).apply(View.asMap());
Entity entityA = createTestEntity("A", 1);
Entity entityB = createTestEntity("B", 2);
PCollection<Entity> entities =