diff --git a/core/build.gradle b/core/build.gradle index a699c0318..d53dc0b2d 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -172,6 +172,8 @@ dependencies { compile deps['com.beust:jcommander'] compile deps['com.google.api:gax'] + compile deps['com.google.api.grpc:proto-google-cloud-datastore-v1'] + compile deps['com.google.api.grpc:proto-google-common-protos'] compile deps['com.google.api.grpc:proto-google-cloud-secretmanager-v1'] compile deps['com.google.api-client:google-api-client'] compile deps['com.google.api-client:google-api-client-appengine'] @@ -196,6 +198,8 @@ dependencies { compile deps['com.google.appengine:appengine-remote-api'] compile deps['com.google.auth:google-auth-library-credentials'] compile deps['com.google.auth:google-auth-library-oauth2-http'] + compile deps['com.google.cloud.bigdataoss:util'] + compile deps['com.google.cloud.datastore:datastore-v1-proto-client'] compile deps['com.google.cloud.sql:jdbc-socket-factory-core'] runtimeOnly deps['com.google.cloud.sql:postgres-socket-factory'] compile deps['com.google.cloud:google-cloud-secretmanager'] @@ -736,6 +740,13 @@ project.tasks.create('initSqlPipeline', JavaExec) { } } +// Caller must provide projectId, GCP region, runner, and the kinds to delete +// (comma-separated kind names or '*' for all). E.g.: +// nom_build :core:bulkDeleteDatastore --args="--project=domain-registry-crash \ +// --region=us-central1 --runner=DataflowRunner --kindsToDelete=*" +createToolTask( + 'bulkDeleteDatastore', 'google.registry.beam.datastore.BulkDeletePipeline') + project.tasks.create('generateSqlSchema', JavaExec) { classpath = sourceSets.nonprod.runtimeClasspath main = 'google.registry.tools.DevTool' diff --git a/core/src/main/java/google/registry/beam/datastore/BulkDeletePipeline.java b/core/src/main/java/google/registry/beam/datastore/BulkDeletePipeline.java new file mode 100644 index 000000000..096daf2f9 --- /dev/null +++ b/core/src/main/java/google/registry/beam/datastore/BulkDeletePipeline.java @@ -0,0 +1,330 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
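For orientation, a minimal sketch (not authoritative) of driving the new tool from code instead of nom_build. The flag values mirror the task comment above, the project and kind names are placeholders, and the call simply delegates to the BulkDeletePipeline.main() entry point added in this change:

```java
// Hypothetical launcher; flag values are illustrative placeholders.
package google.registry.beam.datastore;

public class BulkDeleteLauncherExample {
  public static void main(String[] ignored) {
    BulkDeletePipeline.main(
        new String[] {
          "--project=domain-registry-crash",              // must not be a FORBIDDEN_PROJECTS entry
          "--region=us-central1",
          "--runner=DataflowRunner",
          "--kindsToDelete=ContactResource,HostResource", // or "*" to delete every kind
          "--numOfKindsHint=30"                           // only consulted when kindsToDelete is "*"
        });
  }
}
```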
+ +package google.registry.beam.datastore; + +import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.sdk.values.TypeDescriptors.kvs; +import static org.apache.beam.sdk.values.TypeDescriptors.strings; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.flogger.FluentLogger; +import com.google.datastore.v1.Entity; +import java.util.Iterator; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +/** + * A BEAM pipeline that deletes Datastore entities in bulk. + * + *

This pipeline provides an alternative to the GCP builtin template that performs + * the same task. It solves the following performance and usability problems in the builtin + * template: + * + *

+ * + *

A user of this pipeline must specify the types of entities to delete using the {@code + * --kindsToDelete} command line argument. To delete specific entity types, give a comma-separated + * string of their kind names; to delete all data, give {@code "*"}. + * + *
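As an aside, a small sketch of how a --kindsToDelete value is interpreted. It mirrors the Splitter-based parseKindsToDelete helper defined further down (entries are trimmed and empty ones dropped); the kind names are only examples:

```java
// Assumed illustration of --kindsToDelete parsing, modeled on parseKindsToDelete().
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;

class KindsArgumentExample {
  static ImmutableList<String> parse(String kindsToDelete) {
    return ImmutableList.copyOf(
        Splitter.on(',').omitEmptyStrings().trimResults().split(kindsToDelete.trim()));
  }

  public static void main(String[] args) {
    System.out.println(parse(" ContactResource, HostResource ,")); // [ContactResource, HostResource]
    System.out.println(parse("*")); // [*], i.e. delete every kind discovered from __kind__
  }
}
```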

When deleting all data, it is recommended for the user to specify the number of user entity + * types in the Datastore using the {@code --numOfKindsHint} argument. If the default value for this + * parameter is too low, performance will suffer. + */ +public class BulkDeletePipeline { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + // This tool is not for use in our critical projects. + private static final ImmutableSet FORBIDDEN_PROJECTS = + ImmutableSet.of("domain-registry", "domain-registry-sandbox"); + + private final BulkDeletePipelineOptions options; + + private final Pipeline pipeline; + + BulkDeletePipeline(BulkDeletePipelineOptions options) { + this.options = options; + pipeline = Pipeline.create(options); + } + + public void run() { + setupPipeline(); + pipeline.run(); + } + + @SuppressWarnings("deprecation") // org.apache.beam.sdk.transforms.Reshuffle + private void setupPipeline() { + checkState( + !FORBIDDEN_PROJECTS.contains(options.getProject()), + "Bulk delete is forbidden in %s", + options.getProject()); + + // Pre-allocated tags to label entities by kind. In the case of delete-all, we must use a guess. + TupleTagList deletionTags; + PCollection kindsToDelete; + + if (options.getKindsToDelete().equals("*")) { + deletionTags = getDeletionTags(options.getNumOfKindsHint()); + kindsToDelete = + pipeline.apply("DiscoverEntityKinds", discoverEntityKinds(options.getProject())); + } else { + ImmutableList kindsToDeleteParam = parseKindsToDelete(options); + checkState( + !kindsToDeleteParam.contains("*"), + "The --kindsToDelete argument should not contain both '*' and other kinds."); + deletionTags = getDeletionTags(kindsToDeleteParam.size()); + kindsToDelete = pipeline.apply("UseProvidedKinds", Create.of(kindsToDeleteParam)); + } + + // Map each kind to a tag. The "SplitByKind" stage below will group entities by kind using + // this mapping. In practice, this has been effective at avoiding entity group contentions. + PCollectionView>> kindToTagMapping = + mapKindsToDeletionTags(kindsToDelete, deletionTags).apply("GetKindsToTagMap", View.asMap()); + + PCollectionTuple entities = + kindsToDelete + .apply("GenerateQueries", ParDo.of(new GenerateQueries())) + .apply("ReadEntities", DatastoreV1.read().withProjectId(options.getProject())) + .apply( + "SplitByKind", + ParDo.of(new SplitEntities(kindToTagMapping)) + .withSideInputs(kindToTagMapping) + .withOutputTags(getOneDeletionTag("placeholder"), deletionTags)); + + for (TupleTag tag : deletionTags.getAll()) { + entities + .get((TupleTag) tag) + // Reshuffle calls GroupByKey which is one way to trigger load rebalance in the pipeline. + // Using the deprecated "Reshuffle" for convenience given the short life of this tool. + .apply("RebalanceLoad", Reshuffle.viaRandomKey()) + .apply( + "DeleteEntities_" + tag.getId(), + DatastoreIO.v1().deleteEntity().withProjectId(options.getProject())); + } + } + + private static String toKeyOnlyQueryForKind(String kind) { + return "select __key__ from `" + kind + "`"; + } + + /** + * Returns a {@link TupleTag} that retains the generic type parameter and may be used in a + * multi-output {@link ParDo} (e.g. {@link SplitEntities}). + * + *

This method is NOT needed in tests when creating tags for assertions. Simply create them + * with {@code new TupleTag(String)}. + */ + @VisibleForTesting + static TupleTag getOneDeletionTag(String id) { + // The trailing {} is needed to retain generic param type. + return new TupleTag(id) {}; + } + + @VisibleForTesting + static ImmutableList parseKindsToDelete(BulkDeletePipelineOptions options) { + return ImmutableList.copyOf( + Splitter.on(",").omitEmptyStrings().trimResults().split(options.getKindsToDelete().trim())); + } + + /** + * Returns a list of {@code n} {@link TupleTag TupleTags} numbered from {@code 0} to {@code n-1}. + */ + @VisibleForTesting + static TupleTagList getDeletionTags(int n) { + ImmutableList.Builder> builder = new ImmutableList.Builder<>(); + for (int i = 0; i < n; i++) { + builder.add(getOneDeletionTag(String.valueOf(i))); + } + return TupleTagList.of(builder.build()); + } + + /** Returns a {@link PTransform} that finds all entity kinds in Datastore. */ + @VisibleForTesting + static PTransform> discoverEntityKinds(String project) { + return new PTransform>() { + @Override + public PCollection expand(PBegin input) { + // Use the __kind__ table to discover entity kinds. Data in the more informational + // __Stat_Kind__ table may be up to 48-hour stale. + return input + .apply( + "LoadEntityMetaData", + DatastoreIO.v1() + .read() + .withProjectId(project) + .withLiteralGqlQuery("select * from __kind__")) + .apply( + "GetKindNames", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element Entity entity, OutputReceiver out) { + String kind = entity.getKey().getPath(0).getName(); + if (kind.startsWith("_")) { + return; + } + out.output(kind); + } + })); + } + }; + } + + @VisibleForTesting + static PCollection>> mapKindsToDeletionTags( + PCollection kinds, TupleTagList tags) { + // The first two stages send all strings in the 'kinds' PCollection to one worker which + // performs the mapping in the last stage. + return kinds + .apply( + "AssignSingletonKeyToKinds", + MapElements.into(kvs(strings(), strings())).via(kind -> KV.of("", kind))) + .apply("GatherKindsIntoCollection", GroupByKey.create()) + .apply("MapKindsToTag", ParDo.of(new MapKindsToTags(tags))); + } + + /** Transforms each {@code kind} string into a Datastore query for that kind. */ + @VisibleForTesting + static class GenerateQueries extends DoFn { + @ProcessElement + public void processElement(@Element String kind, OutputReceiver out) { + out.output(toKeyOnlyQueryForKind(kind)); + } + } + + private static class MapKindsToTags + extends DoFn>, KV>> { + private final TupleTagList tupleTags; + + MapKindsToTags(TupleTagList tupleTags) { + this.tupleTags = tupleTags; + } + + @ProcessElement + public void processElement( + @Element KV> kv, + OutputReceiver>> out) { + // Sort kinds so that mapping is deterministic. + ImmutableSortedSet sortedKinds = ImmutableSortedSet.copyOf(kv.getValue()); + Iterator kinds = sortedKinds.iterator(); + Iterator> tags = tupleTags.getAll().iterator(); + + while (kinds.hasNext() && tags.hasNext()) { + out.output(KV.of(kinds.next(), (TupleTag) tags.next())); + } + + if (kinds.hasNext()) { + logger.atWarning().log( + "There are more kinds to delete (%s) than our estimate (%s). 
" + + "Performance may suffer.", + sortedKinds.size(), tupleTags.size()); + } + // Round robin assignment so that mapping is deterministic + while (kinds.hasNext()) { + tags = tupleTags.getAll().iterator(); + while (kinds.hasNext() && tags.hasNext()) { + out.output(KV.of(kinds.next(), (TupleTag) tags.next())); + } + } + } + } + + /** + * {@link DoFn} that splits one {@link PCollection} of mixed kinds into multiple single-kind + * {@code PCollections}. + */ + @VisibleForTesting + static class SplitEntities extends DoFn { + private final PCollectionView>> kindToTagMapping; + + SplitEntities(PCollectionView>> kindToTagMapping) { + super(); + this.kindToTagMapping = kindToTagMapping; + } + + @ProcessElement + public void processElement(ProcessContext context) { + Entity entity = context.element(); + com.google.datastore.v1.Key entityKey = entity.getKey(); + String kind = entityKey.getPath(entityKey.getPathCount() - 1).getKind(); + TupleTag tag = context.sideInput(kindToTagMapping).get(kind); + context.output(tag, entity); + } + } + + public static void main(String[] args) { + BulkDeletePipelineOptions options = + PipelineOptionsFactory.fromArgs(args).withValidation().as(BulkDeletePipelineOptions.class); + BulkDeletePipeline pipeline = new BulkDeletePipeline(options); + pipeline.run(); + System.exit(0); + } + + public interface BulkDeletePipelineOptions extends GcpOptions { + + @Description( + "The Datastore KINDs to be deleted. The format may be:\n" + + "\t- The list of kinds to be deleted as a comma-separated string, or\n" + + "\t- '*', which causes all kinds to be deleted.") + @Validation.Required + String getKindsToDelete(); + + void setKindsToDelete(String kinds); + + @Description( + "An estimate of the number of KINDs to be deleted. " + + "This is recommended if --kindsToDelete is '*' and the default value is too low.") + @Default.Integer(30) + int getNumOfKindsHint(); + + void setNumOfKindsHint(int numOfKindsHint); + } +} diff --git a/core/src/main/java/google/registry/beam/datastore/DatastoreV1.java b/core/src/main/java/google/registry/beam/datastore/DatastoreV1.java new file mode 100644 index 000000000..2a09ebf66 --- /dev/null +++ b/core/src/main/java/google/registry/beam/datastore/DatastoreV1.java @@ -0,0 +1,765 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This class is adapted from the Apache BEAM SDK. The original license may +// be found at +// this link. 
+ +package google.registry.beam.datastore; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Verify.verify; +import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL; +import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING; +import static com.google.datastore.v1.QueryResultBatch.MoreResultsType.NOT_FINISHED; +import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter; +import static com.google.datastore.v1.client.DatastoreHelper.makeFilter; +import static com.google.datastore.v1.client.DatastoreHelper.makeOrder; +import static com.google.datastore.v1.client.DatastoreHelper.makeValue; + +import com.google.api.client.http.HttpRequestInitializer; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.auto.value.AutoValue; +import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.flogger.FluentLogger; +import com.google.datastore.v1.Entity; +import com.google.datastore.v1.EntityResult; +import com.google.datastore.v1.GqlQuery; +import com.google.datastore.v1.PartitionId; +import com.google.datastore.v1.Query; +import com.google.datastore.v1.QueryResultBatch; +import com.google.datastore.v1.RunQueryRequest; +import com.google.datastore.v1.RunQueryResponse; +import com.google.datastore.v1.client.Datastore; +import com.google.datastore.v1.client.DatastoreException; +import com.google.datastore.v1.client.DatastoreFactory; +import com.google.datastore.v1.client.DatastoreHelper; +import com.google.datastore.v1.client.DatastoreOptions; +import com.google.datastore.v1.client.QuerySplitter; +import com.google.protobuf.Int32Value; +import com.google.rpc.Code; +import java.io.Serializable; +import java.util.List; +import java.util.NoSuchElementException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; + +/** + * Contains an adaptation of {@link org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read}. See + * {@link MultiRead} for details. + */ +public class DatastoreV1 { + + // A package-private constructor to prevent direct instantiation from outside of this package + DatastoreV1() {} + + /** + * Non-retryable errors. See https://cloud.google.com/datastore/docs/concepts/errors#Error_Codes . 
*/ + private static final ImmutableSet NON_RETRYABLE_ERRORS = + ImmutableSet.of( + Code.FAILED_PRECONDITION, + Code.INVALID_ARGUMENT, + Code.PERMISSION_DENIED, + Code.UNAUTHENTICATED); + + /** + * Returns an empty {@link MultiRead} builder. Configure the source {@code projectId}, and + * optionally {@code namespace} and {@code numQuerySplits}, using {@link + * MultiRead#withProjectId}, {@link MultiRead#withNamespace}, and {@link + * MultiRead#withNumQuerySplits}. + */ + public static MultiRead read() { + return new AutoValue_DatastoreV1_MultiRead.Builder().setNumQuerySplits(0).build(); + } + + /** + * A {@link PTransform} that executes every Cloud Datastore query in a {@link PCollection} and + * reads its result rows as {@code Entity} objects. + * + *
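A hedged usage sketch of that transform (project id and kind names are placeholders; the wiring is modeled on the "ReadEntities" stage in BulkDeletePipeline.setupPipeline()):

```java
// Sketch of the intended wiring: literal GQL strings in, entities out, then bulk delete.
package google.registry.beam.datastore;

import com.google.datastore.v1.Entity;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

class MultiReadUsageExample {
  public static void main(String[] args) {
    String project = "my-test-project"; // placeholder project id
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> gqlQueries =
        pipeline.apply(
            "Queries",
            Create.of(
                "select __key__ from `ContactResource`", "select __key__ from `HostResource`"));

    // Each GQL string is translated, split, and read; the output is one mixed PCollection.
    PCollection<Entity> entities =
        gqlQueries.apply("ReadEntities", DatastoreV1.read().withProjectId(project));

    entities.apply("DeleteEntities", DatastoreIO.v1().deleteEntity().withProjectId(project));
    pipeline.run().waitUntilFinish();
  }
}
```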

This class is adapted from {@link org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read}. It + * uses literal GQL queries in the input {@link PCollection} instead of a constant query provided + * to the builder. Only the {@link #expand} method is modified from the original. Everything else + * including comments have been copied verbatim. + */ + @AutoValue + public abstract static class MultiRead + extends PTransform, PCollection> { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + /** An upper bound on the number of splits for a query. */ + public static final int NUM_QUERY_SPLITS_MAX = 50000; + + /** A lower bound on the number of splits for a query. */ + static final int NUM_QUERY_SPLITS_MIN = 12; + + /** Default bundle size of 64MB. */ + static final long DEFAULT_BUNDLE_SIZE_BYTES = 64L * 1024L * 1024L; + + /** + * Maximum number of results to request per query. + * + *

Must be set, or it may result in an I/O error when querying Cloud Datastore. + */ + static final int QUERY_BATCH_LIMIT = 500; + + public abstract @Nullable String getProjectId(); + + public abstract @Nullable String getNamespace(); + + public abstract int getNumQuerySplits(); + + public abstract @Nullable String getLocalhost(); + + @Override + public abstract String toString(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setProjectId(String projectId); + + abstract Builder setNamespace(String namespace); + + abstract Builder setNumQuerySplits(int numQuerySplits); + + abstract Builder setLocalhost(String localhost); + + abstract MultiRead build(); + } + + /** + * Computes the number of splits to be performed on the given query by querying the estimated + * size from Cloud Datastore. + */ + static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) { + int numSplits; + try { + long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace); + logger.atInfo().log("Estimated size bytes for the query is: %s", estimatedSizeBytes); + numSplits = + (int) + Math.min( + NUM_QUERY_SPLITS_MAX, + Math.round(((double) estimatedSizeBytes) / DEFAULT_BUNDLE_SIZE_BYTES)); + } catch (Exception e) { + logger.atWarning().log("Failed the fetch estimatedSizeBytes for query: %s", query, e); + // Fallback in case estimated size is unavailable. + numSplits = NUM_QUERY_SPLITS_MIN; + } + return Math.max(numSplits, NUM_QUERY_SPLITS_MIN); + } + + /** + * Cloud Datastore system tables with statistics are periodically updated. This method fetches + * the latest timestamp (in microseconds) of statistics update using the {@code __Stat_Total__} + * table. + */ + private static long queryLatestStatisticsTimestamp( + Datastore datastore, @Nullable String namespace) throws DatastoreException { + Query.Builder query = Query.newBuilder(); + // Note: namespace either being null or empty represents the default namespace, in which + // case we treat it as not provided by the user. + if (Strings.isNullOrEmpty(namespace)) { + query.addKindBuilder().setName("__Stat_Total__"); + } else { + query.addKindBuilder().setName("__Stat_Ns_Total__"); + } + query.addOrder(makeOrder("timestamp", DESCENDING)); + query.setLimit(Int32Value.newBuilder().setValue(1)); + RunQueryRequest request = makeRequest(query.build(), namespace); + + RunQueryResponse response = datastore.runQuery(request); + QueryResultBatch batch = response.getBatch(); + if (batch.getEntityResultsCount() == 0) { + throw new NoSuchElementException("Datastore total statistics unavailable"); + } + Entity entity = batch.getEntityResults(0).getEntity(); + return entity.getProperties().get("timestamp").getTimestampValue().getSeconds() * 1000000; + } + + /** Retrieve latest table statistics for a given kind, namespace, and datastore. 
*/ + private static Entity getLatestTableStats( + String ourKind, @Nullable String namespace, Datastore datastore) throws DatastoreException { + long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace); + logger.atInfo().log("Latest stats timestamp for kind %s is %s", ourKind, latestTimestamp); + + Query.Builder queryBuilder = Query.newBuilder(); + if (Strings.isNullOrEmpty(namespace)) { + queryBuilder.addKindBuilder().setName("__Stat_Kind__"); + } else { + queryBuilder.addKindBuilder().setName("__Stat_Ns_Kind__"); + } + + queryBuilder.setFilter( + makeAndFilter( + makeFilter("kind_name", EQUAL, makeValue(ourKind).build()).build(), + makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build()).build())); + + RunQueryRequest request = makeRequest(queryBuilder.build(), namespace); + + long now = System.currentTimeMillis(); + RunQueryResponse response = datastore.runQuery(request); + logger.atFine().log( + "Query for per-kind statistics took %sms", System.currentTimeMillis() - now); + + QueryResultBatch batch = response.getBatch(); + if (batch.getEntityResultsCount() == 0) { + throw new NoSuchElementException( + "Datastore statistics for kind " + ourKind + " unavailable"); + } + return batch.getEntityResults(0).getEntity(); + } + + /** + * Get the estimated size of the data returned by the given query. + * + *

Cloud Datastore provides no way to get a good estimate of how large the result of a query + * will be. Instead, we look up the size of the entity kind being queried in the __Stat_Kind__ + * system table, assuming exactly 1 kind is specified in the query. + * + *
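A small worked example of how getEstimatedNumSplits above consumes this size estimate; the byte counts are invented for illustration:

```java
// numSplits = max(NUM_QUERY_SPLITS_MIN, min(NUM_QUERY_SPLITS_MAX, round(sizeBytes / 64 MiB)))
class SplitEstimateExample {
  public static void main(String[] args) {
    long estimatedSizeBytes = 6_400L * 1024 * 1024; // pretend __Stat_Kind__ reports ~6.4 GiB
    long bundleBytes = 64L * 1024 * 1024;           // DEFAULT_BUNDLE_SIZE_BYTES
    long capped = Math.min(50_000, Math.round((double) estimatedSizeBytes / bundleBytes));
    int numSplits = (int) Math.max(capped, 12);     // NUM_QUERY_SPLITS_MIN floor
    System.out.println(numSplits); // 100; a kind of ~100 MiB would print 12 instead
  }
}
```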

See https://cloud.google.com/datastore/docs/concepts/stats. + */ + static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace) + throws DatastoreException { + String ourKind = query.getKind(0).getName(); + Entity entity = getLatestTableStats(ourKind, namespace, datastore); + return entity.getProperties().get("entity_bytes").getIntegerValue(); + } + + private static PartitionId.Builder forNamespace(@Nullable String namespace) { + PartitionId.Builder partitionBuilder = PartitionId.newBuilder(); + // Namespace either being null or empty represents the default namespace. + // Datastore Client libraries expect users to not set the namespace proto field in + // either of these cases. + if (!Strings.isNullOrEmpty(namespace)) { + partitionBuilder.setNamespaceId(namespace); + } + return partitionBuilder; + } + + /** Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}. */ + static RunQueryRequest makeRequest(Query query, @Nullable String namespace) { + return RunQueryRequest.newBuilder() + .setQuery(query) + .setPartitionId(forNamespace(namespace)) + .build(); + } + + /** Builds a {@link RunQueryRequest} from the {@code GqlQuery} and {@code namespace}. */ + private static RunQueryRequest makeRequest(GqlQuery gqlQuery, @Nullable String namespace) { + return RunQueryRequest.newBuilder() + .setGqlQuery(gqlQuery) + .setPartitionId(forNamespace(namespace)) + .build(); + } + + /** + * A helper function to get the split queries, taking into account the optional {@code + * namespace}. + */ + private static List splitQuery( + Query query, + @Nullable String namespace, + Datastore datastore, + QuerySplitter querySplitter, + int numSplits) + throws DatastoreException { + // If namespace is set, include it in the split request so splits are calculated accordingly. + return querySplitter.getSplits(query, forNamespace(namespace).build(), numSplits, datastore); + } + + /** + * Translates a Cloud Datastore gql query string to {@link Query}. + * + *

Currently, the only way to translate a gql query string to a Query is to run the query + * against Cloud Datastore and extract the {@code Query} from the response. To prevent reading + * any data, we set the {@code LIMIT} to 0 but if the gql query already has a limit set, we + * catch the exception with {@code INVALID_ARGUMENT} error code and retry the translation + * without the zero limit. + * + *

Note: This may result in reading actual data from Cloud Datastore but the service has a + * cap on the number of entities returned for a single rpc request, so this should not be a + * problem in practice. + */ + private static Query translateGqlQueryWithLimitCheck( + String gql, Datastore datastore, String namespace) throws DatastoreException { + String gqlQueryWithZeroLimit = gql + " LIMIT 0"; + try { + Query translatedQuery = translateGqlQuery(gqlQueryWithZeroLimit, datastore, namespace); + // Clear the limit that we set. + return translatedQuery.toBuilder().clearLimit().build(); + } catch (DatastoreException e) { + // Note: There is no specific error code or message to detect if the query already has a + // limit, so we just check for INVALID_ARGUMENT and assume that that the query might have + // a limit already set. + if (e.getCode() == Code.INVALID_ARGUMENT) { + logger.atWarning().log( + "Failed to translate Gql query '%s': %s", gqlQueryWithZeroLimit, e.getMessage()); + logger.atWarning().log( + "User query might have a limit already set, so trying without zero limit"); + // Retry without the zero limit. + return translateGqlQuery(gql, datastore, namespace); + } else { + throw e; + } + } + } + + /** Translates a gql query string to {@link Query}. */ + private static Query translateGqlQuery(String gql, Datastore datastore, String namespace) + throws DatastoreException { + logger.atInfo().log("Translating gql %s", gql); + GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build(); + RunQueryRequest req = makeRequest(gqlQuery, namespace); + return datastore.runQuery(req).getQuery(); + } + + /** + * Returns a new {@link MultiRead} that reads from the Cloud Datastore for the specified + * project. + */ + public MultiRead withProjectId(String projectId) { + checkArgument(projectId != null, "projectId can not be null"); + return toBuilder().setProjectId(projectId).build(); + } + + /** Returns a new {@link MultiRead} that reads from the given namespace. */ + public MultiRead withNamespace(String namespace) { + return toBuilder().setNamespace(namespace).build(); + } + + /** + * Returns a new {@link MultiRead} that reads by splitting the given {@code query} into {@code + * numQuerySplits}. + * + *

The semantics for the query splitting are defined below: + * + *   • Any value less than or equal to 0 will be ignored, and the number of splits will be + *     chosen dynamically at runtime based on the query data size. + *   • Any value greater than NUM_QUERY_SPLITS_MAX will be capped at that maximum. + *   • If the query has a user-set limit, it will not be split at all. + *   • If Cloud Datastore cannot produce the requested number of splits, whatever splits it + *     does return are used as is. + *
+ */ + public MultiRead withNumQuerySplits(int numQuerySplits) { + return toBuilder() + .setNumQuerySplits(Math.min(Math.max(numQuerySplits, 0), NUM_QUERY_SPLITS_MAX)) + .build(); + } + + /** + * Returns a new {@link MultiRead} that reads from a Datastore Emulator running at the given + * localhost address. + */ + public MultiRead withLocalhost(String localhost) { + return toBuilder().setLocalhost(localhost).build(); + } + + /** Returns Number of entities available for reading. */ + public long getNumEntities( + PipelineOptions options, String ourKind, @Nullable String namespace) { + try { + V1Options v1Options = V1Options.from(getProjectId(), getNamespace(), getLocalhost()); + V1DatastoreFactory datastoreFactory = new V1DatastoreFactory(); + Datastore datastore = + datastoreFactory.getDatastore( + options, v1Options.getProjectId(), v1Options.getLocalhost()); + + Entity entity = getLatestTableStats(ourKind, namespace, datastore); + return entity.getProperties().get("count").getIntegerValue(); + } catch (Exception e) { + return -1; + } + } + + @Override + public PCollection expand(PCollection gqlQueries) { + checkArgument(getProjectId() != null, "projectId cannot be null"); + + V1Options v1Options = V1Options.from(getProjectId(), getNamespace(), getLocalhost()); + + /* + * This composite transform involves the following steps: + * 1. Apply a {@link ParDo} that translates each query in {@code gqlQueries} into a {@code + * query}. + * + * 2. A {@link ParDo} splits the resulting query into {@code numQuerySplits} and + * assign each split query a unique {@code Integer} as the key. The resulting output is + * of the type {@code PCollection>}. + * + * If the value of {@code numQuerySplits} is less than or equal to 0, then the number of + * splits will be computed dynamically based on the size of the data for the {@code query}. + * + * 3. The resulting {@code PCollection} is sharded using a {@link GroupByKey} operation. The + * queries are extracted from they {@code KV>} and flattened to + * output a {@code PCollection}. + * + * 4. In the third step, a {@code ParDo} reads entities for each query and outputs + * a {@code PCollection}. 
+ */ + + PCollection inputQuery = + gqlQueries.apply(ParDo.of(new GqlQueryTranslateFn(v1Options))); + + return inputQuery + .apply("Split", ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits()))) + .apply("Reshuffle", Reshuffle.viaRandomKey()) + .apply("Read", ParDo.of(new ReadFn(v1Options))); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("ProjectId")) + .addIfNotNull(DisplayData.item("namespace", getNamespace()).withLabel("Namespace")); + } + + private static class V1Options implements HasDisplayData, Serializable { + private final String project; + private final @Nullable String namespace; + private final @Nullable String localhost; + + private V1Options(String project, @Nullable String namespace, @Nullable String localhost) { + this.project = project; + this.namespace = namespace; + this.localhost = localhost; + } + + public static V1Options from( + String projectId, @Nullable String namespace, @Nullable String localhost) { + return new V1Options(projectId, namespace, localhost); + } + + public String getProjectId() { + return project; + } + + public @Nullable String getNamespace() { + return namespace; + } + + public @Nullable String getLocalhost() { + return localhost; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("ProjectId")) + .addIfNotNull(DisplayData.item("namespace", getNamespace()).withLabel("Namespace")); + } + } + + /** A DoFn that translates a Cloud Datastore gql query string to {@code Query}. */ + static class GqlQueryTranslateFn extends DoFn { + private final V1Options v1Options; + private transient Datastore datastore; + private final V1DatastoreFactory datastoreFactory; + + GqlQueryTranslateFn(V1Options options) { + this(options, new V1DatastoreFactory()); + } + + GqlQueryTranslateFn(V1Options options, V1DatastoreFactory datastoreFactory) { + this.v1Options = options; + this.datastoreFactory = datastoreFactory; + } + + @StartBundle + public void startBundle(StartBundleContext c) throws Exception { + datastore = + datastoreFactory.getDatastore( + c.getPipelineOptions(), v1Options.getProjectId(), v1Options.getLocalhost()); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + String gqlQuery = c.element(); + logger.atInfo().log("User query: '%s'", gqlQuery); + Query query = + translateGqlQueryWithLimitCheck(gqlQuery, datastore, v1Options.getNamespace()); + logger.atInfo().log("User gql query translated to Query(%s)", query); + c.output(query); + } + } + + /** + * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique keys + * and outputs them as {@link KV}. 
+ */ + private static class SplitQueryFn extends DoFn { + private final V1Options options; + // number of splits to make for a given query + private final int numSplits; + + private final V1DatastoreFactory datastoreFactory; + // Datastore client + private transient Datastore datastore; + // Query splitter + private transient QuerySplitter querySplitter; + + public SplitQueryFn(V1Options options, int numSplits) { + this(options, numSplits, new V1DatastoreFactory()); + } + + private SplitQueryFn(V1Options options, int numSplits, V1DatastoreFactory datastoreFactory) { + this.options = options; + this.numSplits = numSplits; + this.datastoreFactory = datastoreFactory; + } + + @StartBundle + public void startBundle(StartBundleContext c) throws Exception { + datastore = + datastoreFactory.getDatastore( + c.getPipelineOptions(), options.getProjectId(), options.getLocalhost()); + querySplitter = datastoreFactory.getQuerySplitter(); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Query query = c.element(); + + // If query has a user set limit, then do not split. + if (query.hasLimit()) { + c.output(query); + return; + } + + int estimatedNumSplits; + // Compute the estimated numSplits if numSplits is not specified by the user. + if (numSplits <= 0) { + estimatedNumSplits = getEstimatedNumSplits(datastore, query, options.getNamespace()); + } else { + estimatedNumSplits = numSplits; + } + + logger.atInfo().log("Splitting the query into %s splits", estimatedNumSplits); + List querySplits; + try { + querySplits = + splitQuery( + query, options.getNamespace(), datastore, querySplitter, estimatedNumSplits); + } catch (Exception e) { + logger.atWarning().log("Unable to parallelize the given query: %s", query, e); + querySplits = ImmutableList.of(query); + } + + // assign unique keys to query splits. + for (Query subquery : querySplits) { + c.output(subquery); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.include("options", options); + if (numSplits > 0) { + builder.add( + DisplayData.item("numQuerySplits", numSplits) + .withLabel("Requested number of Query splits")); + } + } + } + + /** A {@link DoFn} that reads entities from Cloud Datastore for each query. 
*/ + private static class ReadFn extends DoFn { + private final V1Options options; + private final V1DatastoreFactory datastoreFactory; + // Datastore client + private transient Datastore datastore; + private final Counter rpcErrors = Metrics.counter(ReadFn.class, "datastoreRpcErrors"); + private final Counter rpcSuccesses = Metrics.counter(ReadFn.class, "datastoreRpcSuccesses"); + private static final int MAX_RETRIES = 5; + private static final FluentBackoff RUNQUERY_BACKOFF = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES) + .withInitialBackoff(Duration.standardSeconds(5)); + + public ReadFn(V1Options options) { + this(options, new V1DatastoreFactory()); + } + + private ReadFn(V1Options options, V1DatastoreFactory datastoreFactory) { + this.options = options; + this.datastoreFactory = datastoreFactory; + } + + @StartBundle + public void startBundle(StartBundleContext c) throws Exception { + datastore = + datastoreFactory.getDatastore( + c.getPipelineOptions(), options.getProjectId(), options.getLocalhost()); + } + + private RunQueryResponse runQueryWithRetries(RunQueryRequest request) throws Exception { + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = RUNQUERY_BACKOFF.backoff(); + while (true) { + try { + RunQueryResponse response = datastore.runQuery(request); + rpcSuccesses.inc(); + return response; + } catch (DatastoreException exception) { + rpcErrors.inc(); + + if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) { + throw exception; + } + if (!BackOffUtils.next(sleeper, backoff)) { + logger.atSevere().log("Aborting after %s retries.", MAX_RETRIES); + throw exception; + } + } + } + } + + /** Read and output entities for the given query. */ + @ProcessElement + public void processElement(ProcessContext context) throws Exception { + Query query = context.element(); + String namespace = options.getNamespace(); + int userLimit = query.hasLimit() ? query.getLimit().getValue() : Integer.MAX_VALUE; + + boolean moreResults = true; + QueryResultBatch currentBatch = null; + + while (moreResults) { + Query.Builder queryBuilder = query.toBuilder(); + queryBuilder.setLimit( + Int32Value.newBuilder().setValue(Math.min(userLimit, QUERY_BATCH_LIMIT))); + + if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) { + queryBuilder.setStartCursor(currentBatch.getEndCursor()); + } + + RunQueryRequest request = makeRequest(queryBuilder.build(), namespace); + RunQueryResponse response = runQueryWithRetries(request); + + currentBatch = response.getBatch(); + + // MORE_RESULTS_AFTER_LIMIT is not implemented yet: + // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so + // use result count to determine if more results might exist. + int numFetch = currentBatch.getEntityResultsCount(); + if (query.hasLimit()) { + verify( + userLimit >= numFetch, + "Expected userLimit %s >= numFetch %s, because query limit %s must be <= userLimit", + userLimit, + numFetch, + query.getLimit()); + userLimit -= numFetch; + } + + // output all the entities from the current batch. + for (EntityResult entityResult : currentBatch.getEntityResultsList()) { + context.output(entityResult.getEntity()); + } + + // Check if we have more entities to be read. + moreResults = + // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied + (userLimit > 0) + // All indications from the API are that there are/may be more results. 
+ && ((numFetch == QUERY_BATCH_LIMIT) + || (currentBatch.getMoreResults() == NOT_FINISHED)); + } + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.include("options", options); + } + } + } + + /** + * A wrapper factory class for Cloud Datastore singleton classes {@link DatastoreFactory} and + * {@link QuerySplitter} + * + *

{@link DatastoreFactory} and {@link QuerySplitter} are not java serializable, hence wrapping + * them under this class, which implements {@link Serializable}. + */ + private static class V1DatastoreFactory implements Serializable { + + /** Builds a Cloud Datastore client for the given pipeline options and project. */ + public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { + return getDatastore(pipelineOptions, projectId, null); + } + + /** + * Builds a Cloud Datastore client for the given pipeline options, project and an optional + * locahost. + */ + public Datastore getDatastore( + PipelineOptions pipelineOptions, String projectId, @Nullable String localhost) { + Credentials credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); + HttpRequestInitializer initializer; + if (credential != null) { + initializer = + new ChainingHttpRequestInitializer( + new HttpCredentialsAdapter(credential), new RetryHttpRequestInitializer()); + } else { + initializer = new RetryHttpRequestInitializer(); + } + + DatastoreOptions.Builder builder = + new DatastoreOptions.Builder().projectId(projectId).initializer(initializer); + + if (localhost != null) { + builder.localHost(localhost); + } else { + builder.host("batch-datastore.googleapis.com"); + } + + return DatastoreFactory.get().create(builder.build()); + } + + /** Builds a Cloud Datastore {@link QuerySplitter}. */ + public QuerySplitter getQuerySplitter() { + return DatastoreHelper.getQuerySplitter(); + } + } +} diff --git a/core/src/test/java/google/registry/beam/datastore/BulkDeletePipelineTest.java b/core/src/test/java/google/registry/beam/datastore/BulkDeletePipelineTest.java new file mode 100644 index 000000000..a34d83821 --- /dev/null +++ b/core/src/test/java/google/registry/beam/datastore/BulkDeletePipelineTest.java @@ -0,0 +1,152 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package google.registry.beam.datastore; + +import static google.registry.beam.datastore.BulkDeletePipeline.discoverEntityKinds; +import static google.registry.beam.datastore.BulkDeletePipeline.getDeletionTags; +import static google.registry.beam.datastore.BulkDeletePipeline.getOneDeletionTag; + +import com.google.common.base.Verify; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.datastore.v1.Entity; +import com.google.datastore.v1.Key; +import com.google.datastore.v1.Key.PathElement; +import google.registry.beam.TestPipelineExtension; +import google.registry.beam.datastore.BulkDeletePipeline.GenerateQueries; +import google.registry.beam.datastore.BulkDeletePipeline.SplitEntities; +import java.io.Serializable; +import java.util.Map; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Unit tests for {@link BulkDeletePipeline}. */ +class BulkDeletePipelineTest implements Serializable { + + @RegisterExtension + final transient TestPipelineExtension testPipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); + + @Test + void generateQueries() { + PCollection queries = + testPipeline + .apply("InjectKinds", Create.of("A", "B")) + .apply("GenerateQueries", ParDo.of(new GenerateQueries())); + + PAssert.that(queries).containsInAnyOrder("select __key__ from `A`", "select __key__ from `B`"); + testPipeline.run(); + } + + @Test + void mapKindsToTags() { + TupleTagList tags = getDeletionTags(2); + PCollection kinds = testPipeline.apply("InjectKinds", Create.of("A", "B")); + PCollection>> kindToTagMapping = + BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags); + PAssert.thatMap(kindToTagMapping) + .isEqualTo( + ImmutableMap.of( + "A", new TupleTag("0"), + "B", new TupleTag("1"))); + testPipeline.run(); + } + + @Test + void mapKindsToTags_fewerKindsThanTags() { + TupleTagList tags = getDeletionTags(3); + PCollection kinds = testPipeline.apply("InjectKinds", Create.of("A", "B")); + PCollection>> kindToTagMapping = + BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags); + PAssert.thatMap(kindToTagMapping) + .isEqualTo( + ImmutableMap.of( + "A", new TupleTag("0"), + "B", new TupleTag("1"))); + testPipeline.run(); + } + + @Test + void mapKindsToTags_moreKindsThanTags() { + TupleTagList tags = getDeletionTags(2); + PCollection kinds = testPipeline.apply("InjectKinds", Create.of("A", "B", "C")); + PCollection>> kindToTagMapping = + BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags); + PAssert.thatMap(kindToTagMapping) + .isEqualTo( + ImmutableMap.of( + "A", new TupleTag("0"), + "B", new TupleTag("1"), + "C", new TupleTag("0"))); + testPipeline.run(); + } + + @Test + void splitEntitiesByKind() { + TupleTagList tags = getDeletionTags(2); + PCollection kinds = testPipeline.apply("InjectKinds", Create.of("A", "B")); + PCollectionView>> kindToTagMapping = + 
BulkDeletePipeline.mapKindsToDeletionTags(kinds, tags).apply(View.asMap()); + Entity entityA = createTestEntity("A", 1); + Entity entityB = createTestEntity("B", 2); + PCollection entities = + testPipeline.apply("InjectEntities", Create.of(entityA, entityB)); + PCollectionTuple allCollections = + entities.apply( + "SplitByKind", + ParDo.of(new SplitEntities(kindToTagMapping)) + .withSideInputs(kindToTagMapping) + .withOutputTags(getOneDeletionTag("placeholder"), tags)); + PAssert.that(allCollections.get((TupleTag) tags.get(0))).containsInAnyOrder(entityA); + PAssert.that(allCollections.get((TupleTag) tags.get(1))).containsInAnyOrder(entityB); + testPipeline.run(); + } + + private static Entity createTestEntity(String kind, long id) { + return Entity.newBuilder() + .setKey(Key.newBuilder().addPath(PathElement.newBuilder().setId(id).setKind(kind))) + .build(); + } + + @Test + @EnabledIfSystemProperty(named = "test.gcp_integration.env", matches = "\\S+") + void discoverKindsFromDatastore() { + String environmentName = System.getProperty("test.gcp_integration.env"); + String project = "domain-registry-" + environmentName; + + PCollection kinds = + testPipeline.apply("DiscoverEntityKinds", discoverEntityKinds(project)); + + PAssert.that(kinds.apply(Count.globally())) + .satisfies( + longs -> { + Verify.verify(Iterables.size(longs) == 1 && Iterables.getFirst(longs, -1L) > 0); + return null; + }); + testPipeline.run(); + } +} diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlTestUtils.java b/core/src/test/java/google/registry/beam/initsql/InitSqlTestUtils.java index 15cf11996..b23498155 100644 --- a/core/src/test/java/google/registry/beam/initsql/InitSqlTestUtils.java +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlTestUtils.java @@ -92,16 +92,10 @@ public final class InitSqlTestUtils { * VersionedEntities} contains exactly the same elements in the {@code expected} array. * *

This method makes assertions in the pipeline and only use {@link PAssert} on the result. - * This has two advantages over {@code PAssert}: - * - *

    - *
  • It supports assertions on 'containsExactlyElementsIn', which is not available in {@code - * PAssert}. - *
  • It supports assertions on Objectify entities, which {@code PAssert} cannot not do. - * Compared with PAssert-compatible options like {@code google.registry.tools.EntityWrapper} - * or {@link EntityProto}, Objectify entities in Java give better-formatted error messages - * when assertions fail. - *
+ This way it supports assertions on Objectify entities, which {@code PAssert} cannot do (since + * we have not implemented Coders for them). Compared with PAssert-compatible options like {@code + * google.registry.tools.EntityWrapper} or {@link EntityProto}, Objectify entities in Java give + * better-formatted error messages when assertions fail. + *

Each {@code expected} {@link KV key-value pair} refers to a versioned state of an Ofy * entity. The {@link KV#getKey key} is the timestamp, while the {@link KV#getValue value} is diff --git a/dependencies.gradle b/dependencies.gradle index 9684cdfe9..44ab04816 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -20,6 +20,9 @@ ext { 'com.beust:jcommander:1.60', 'com.google.api:gax:1.60.0', 'com.google.api.grpc:proto-google-cloud-secretmanager-v1:1.2.3', + // The two below are needed only for Datastore bulk delete pipeline. + 'com.google.api.grpc:proto-google-cloud-datastore-v1:0.85.0', + 'com.google.api.grpc:proto-google-common-protos:2.0.0', 'com.google.api-client:google-api-client-java6:1.27.0', 'com.google.api-client:google-api-client:1.30.8', 'com.google.api-client:google-api-client-appengine:1.30.8', @@ -47,6 +50,9 @@ ext { 'com.google.auto.value:auto-value-annotations:1.6.3', 'com.google.auto.value:auto-value:1.6.3', 'com.google.closure-stylesheets:closure-stylesheets:1.5.0', + // The two below are needed only for Datastore bulk delete pipeline. + 'com.google.cloud.bigdataoss:util:2.1.3', + 'com.google.cloud.datastore:datastore-v1-proto-client:1.6.3', 'com.google.cloud.sql:jdbc-socket-factory-core:1.0.16', 'com.google.cloud.sql:postgres-socket-factory:1.0.16', 'com.google.cloud:google-cloud-core:1.59.0',