diff --git a/core/build.gradle b/core/build.gradle index 2534beae1..3f3b25948 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -708,6 +708,9 @@ createToolTask( createToolTask( 'validateSqlPipeline', 'google.registry.beam.comparedb.ValidateSqlPipeline') +createToolTask( + 'validateDatastorePipeline', 'google.registry.beam.comparedb.ValidateDatastorePipeline') + createToolTask( 'jpaDemoPipeline', 'google.registry.beam.common.JpaDemoPipeline') diff --git a/core/src/main/java/google/registry/beam/comparedb/LatestDatastoreSnapshotFinder.java b/core/src/main/java/google/registry/beam/comparedb/LatestDatastoreSnapshotFinder.java index e7b293d43..04d54add7 100644 --- a/core/src/main/java/google/registry/beam/comparedb/LatestDatastoreSnapshotFinder.java +++ b/core/src/main/java/google/registry/beam/comparedb/LatestDatastoreSnapshotFinder.java @@ -53,11 +53,11 @@ public class LatestDatastoreSnapshotFinder { } /** - * Finds information of the most recent Datastore snapshot, including the GCS folder of the - * exported data files and the start and stop times of the export. The folder of the CommitLogs is - * also included in the return. + * Finds information of the most recent Datastore snapshot that ends strictly before {@code + * exportEndTimeUpperBound}, including the GCS folder of the exported data files and the start and + * stop times of the export. The folder of the CommitLogs is also included in the return. */ - public DatastoreSnapshotInfo getSnapshotInfo() { + public DatastoreSnapshotInfo getSnapshotInfo(Instant exportEndTimeUpperBound) { String bucketName = RegistryConfig.getDatastoreBackupsBucket().substring("gs://".length()); /** * Find the bucket-relative path to the overall metadata file of the last Datastore export. @@ -65,7 +65,8 @@ public class LatestDatastoreSnapshotFinder { * return value is like * "2021-11-19T06:00:00_76493/2021-11-19T06:00:00_76493.overall_export_metadata". */ - Optional metaFilePathOptional = findMostRecentExportMetadataFile(bucketName, 2); + Optional metaFilePathOptional = + findNewestExportMetadataFileBeforeTime(bucketName, exportEndTimeUpperBound, 2); if (!metaFilePathOptional.isPresent()) { throw new NoSuchElementException("No exports found over the past 2 days."); } @@ -85,8 +86,9 @@ public class LatestDatastoreSnapshotFinder { } /** - * Finds the bucket-relative path of the overall export metadata file, in the given bucket, - * searching back up to {@code lookBackDays} days, including today. + * Finds the latest Datastore export that ends strictly before {@code endTimeUpperBound} and + * returns the bucket-relative path of the overall export metadata file, in the given bucket. The + * search goes back for up to {@code lookBackDays} days in time, including today. * *

The overall export metadata file is the last file created during a Datastore export. All * data has been exported by the creation time of this file. The name of this file, like that of @@ -95,7 +97,8 @@ public class LatestDatastoreSnapshotFinder { *

An example return value: {@code * 2021-11-19T06:00:00_76493/2021-11-19T06:00:00_76493.overall_export_metadata}. */ - private Optional findMostRecentExportMetadataFile(String bucketName, int lookBackDays) { + private Optional findNewestExportMetadataFileBeforeTime( + String bucketName, Instant endTimeUpperBound, int lookBackDays) { DateTime today = clock.nowUtc(); for (int day = 0; day < lookBackDays; day++) { String dateString = today.minusDays(day).toString("yyyy-MM-dd"); @@ -107,7 +110,11 @@ public class LatestDatastoreSnapshotFinder { .sorted(Comparator.naturalOrder().reversed()) .findFirst(); if (metaFilePath.isPresent()) { - return metaFilePath; + BlobInfo blobInfo = gcsUtils.getBlobInfo(BlobId.of(bucketName, metaFilePath.get())); + Instant exportEndTime = new Instant(blobInfo.getCreateTime()); + if (exportEndTime.isBefore(endTimeUpperBound)) { + return metaFilePath; + } } } catch (IOException ioe) { throw new RuntimeException(ioe); diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateDatastorePipeline.java b/core/src/main/java/google/registry/beam/comparedb/ValidateDatastorePipeline.java new file mode 100644 index 000000000..8571e40c3 --- /dev/null +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateDatastorePipeline.java @@ -0,0 +1,83 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.comparedb; + +import google.registry.beam.common.RegistryPipelineOptions; +import google.registry.beam.common.RegistryPipelineWorkerInitializer; +import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder.DatastoreSnapshotInfo; +import google.registry.model.annotations.DeleteAfterMigration; +import google.registry.persistence.PersistenceModule.JpaTransactionManagerType; +import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; +import java.util.Optional; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.joda.time.DateTime; + +/** + * Validates the asynchronous data replication process from Cloud SQL (primary) to Datastore + * (secondary). + * + *

This pipeline simply compares the snapshots provided by an invoker, which is responsible for + * obtaining two consistent snapshots for the same point of time. + */ +// TODO(weiminyu): Implement the invoker action in a followup PR. +@DeleteAfterMigration +public class ValidateDatastorePipeline { + + private final ValidateDatastorePipelineOptions options; + private final LatestDatastoreSnapshotFinder datastoreSnapshotFinder; + + public ValidateDatastorePipeline( + ValidateDatastorePipelineOptions options, + LatestDatastoreSnapshotFinder datastoreSnapshotFinder) { + this.options = options; + this.datastoreSnapshotFinder = datastoreSnapshotFinder; + } + + void run(Pipeline pipeline) { + DateTime latestCommitLogTime = DateTime.parse(options.getLatestCommitLogTimestamp()); + DatastoreSnapshotInfo mostRecentExport = + datastoreSnapshotFinder.getSnapshotInfo(latestCommitLogTime.toInstant()); + + ValidateSqlPipeline.setupPipeline( + pipeline, + Optional.ofNullable(options.getSqlSnapshotId()), + mostRecentExport, + latestCommitLogTime, + Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse)); + + pipeline.run(); + } + + public static void main(String[] args) { + ValidateDatastorePipelineOptions options = + PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(ValidateDatastorePipelineOptions.class); + RegistryPipelineOptions.validateRegistryPipelineOptions(options); + + // Defensively set important options. + options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ); + options.setJpaTransactionManagerType(JpaTransactionManagerType.BULK_QUERY); + + // Reuse Dataflow worker initialization code to set up JPA in the pipeline harness. + new RegistryPipelineWorkerInitializer().beforeProcessing(options); + + LatestDatastoreSnapshotFinder datastoreSnapshotFinder = + DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create() + .datastoreSnapshotInfoFinder(); + new ValidateDatastorePipeline(options, datastoreSnapshotFinder).run(Pipeline.create(options)); + } +} diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateDatastorePipelineOptions.java b/core/src/main/java/google/registry/beam/comparedb/ValidateDatastorePipelineOptions.java new file mode 100644 index 000000000..020cd3826 --- /dev/null +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateDatastorePipelineOptions.java @@ -0,0 +1,39 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.comparedb; + +import google.registry.model.annotations.DeleteAfterMigration; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.Validation; + +/** BEAM pipeline options for {@link ValidateDatastorePipelineOptions}. */ +@DeleteAfterMigration +public interface ValidateDatastorePipelineOptions extends ValidateSqlPipelineOptions { + + @Description( + "The id of the SQL snapshot to be compared with Datastore. " + + "If null, the current state of the SQL database is used.") + @Nullable + String getSqlSnapshotId(); + + void setSqlSnapshotId(String snapshotId); + + @Description("The latest CommitLogs to load, in ISO8601 format.") + @Validation.Required + String getLatestCommitLogTimestamp(); + + void setLatestCommitLogTimestamp(String commitLogEndTimestamp); +} diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java index b9aace11d..f11c8c0ed 100644 --- a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java @@ -26,6 +26,9 @@ import google.registry.beam.common.RegistryPipelineWorkerInitializer; import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder.DatastoreSnapshotInfo; import google.registry.beam.comparedb.ValidateSqlUtils.CompareSqlEntity; import google.registry.model.annotations.DeleteAfterMigration; +import google.registry.model.common.DatabaseMigrationStateSchedule; +import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; +import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection; import google.registry.model.domain.DomainBase; import google.registry.model.domain.DomainHistory; import google.registry.model.replay.SqlEntity; @@ -35,6 +38,7 @@ import google.registry.persistence.PersistenceModule.JpaTransactionManagerType; import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.persistence.transaction.TransactionManagerFactory; import google.registry.util.RequestStatusChecker; +import google.registry.util.SystemClock; import java.io.Serializable; import java.util.Optional; import org.apache.beam.sdk.Pipeline; @@ -76,21 +80,16 @@ public class ValidateSqlPipeline { java.time.Duration.ofSeconds(30); private final ValidateSqlPipelineOptions options; - private final DatastoreSnapshotInfo mostRecentExport; + private final LatestDatastoreSnapshotFinder datastoreSnapshotFinder; public ValidateSqlPipeline( - ValidateSqlPipelineOptions options, DatastoreSnapshotInfo mostRecentExport) { + ValidateSqlPipelineOptions options, LatestDatastoreSnapshotFinder datastoreSnapshotFinder) { this.options = options; - this.mostRecentExport = mostRecentExport; - } - - void run() { - run(Pipeline.create(options)); + this.datastoreSnapshotFinder = datastoreSnapshotFinder; } @VisibleForTesting void run(Pipeline pipeline) { - // TODO(weiminyu): ensure migration stage is DATASTORE_PRIMARY or DATASTORE_PRIMARY_READ_ONLY Optional lock = acquireCommitLogReplayLock(); if (lock.isPresent()) { logger.atInfo().log("Acquired CommitLog Replay lock."); @@ -101,6 +100,8 @@ public class ValidateSqlPipeline { try { DateTime latestCommitLogTime = TransactionManagerFactory.jpaTm().transact(() -> SqlReplayCheckpoint.get()); + DatastoreSnapshotInfo mostRecentExport = + datastoreSnapshotFinder.getSnapshotInfo(latestCommitLogTime.toInstant()); Preconditions.checkState( latestCommitLogTime.isAfter(mostRecentExport.exportInterval().getEnd()), "Cannot recreate Datastore snapshot since target time is in the middle of an export."); @@ -109,7 +110,16 @@ public class ValidateSqlPipeline { lock.ifPresent(Lock::releaseSql); lock = Optional.empty(); - setupPipeline(pipeline, Optional.of(databaseSnapshot.getSnapshotId()), latestCommitLogTime); + logger.atInfo().log( + "Starting comparison with export at %s and latestCommitLogTime at %s", + mostRecentExport.exportDir(), latestCommitLogTime); + + setupPipeline( + pipeline, + Optional.of(databaseSnapshot.getSnapshotId()), + mostRecentExport, + latestCommitLogTime, + Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse)); State state = pipeline.run().waitUntilFinish(); if (!State.DONE.equals(state)) { throw new IllegalStateException("Unexpected pipeline state: " + state); @@ -120,15 +130,16 @@ public class ValidateSqlPipeline { } } - void setupPipeline( - Pipeline pipeline, Optional sqlSnapshotId, DateTime latestCommitLogTime) { + static void setupPipeline( + Pipeline pipeline, + Optional sqlSnapshotId, + DatastoreSnapshotInfo mostRecentExport, + DateTime latestCommitLogTime, + Optional compareStartTime) { pipeline .getCoderRegistry() .registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class)); - Optional compareStartTime = - Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse); - PCollectionTuple datastoreSnapshot = DatastoreSnapshots.loadDatastoreSnapshotByKind( pipeline, @@ -216,6 +227,10 @@ public class ValidateSqlPipeline { return "ValidateSqlPipeline"; } + LatestDatastoreSnapshotFinder datastoreSnapshotFinder = + DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create() + .datastoreSnapshotInfoFinder(); + @Override public boolean isRunning(String requestLogId) { return true; @@ -234,11 +249,16 @@ public class ValidateSqlPipeline { // Reuse Dataflow worker initialization code to set up JPA in the pipeline harness. new RegistryPipelineWorkerInitializer().beforeProcessing(options); - DatastoreSnapshotInfo mostRecentExport = - DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create() - .datastoreSnapshotInfoFinder() - .getSnapshotInfo(); + MigrationState state = + DatabaseMigrationStateSchedule.getValueAtTime(new SystemClock().nowUtc()); + if (!state.getReplayDirection().equals(ReplayDirection.DATASTORE_TO_SQL)) { + throw new IllegalStateException("This pipeline is not designed for migration phase " + state); + } - new ValidateSqlPipeline(options, mostRecentExport).run(Pipeline.create(options)); + LatestDatastoreSnapshotFinder datastoreSnapshotFinder = + DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create() + .datastoreSnapshotInfoFinder(); + + new ValidateSqlPipeline(options, datastoreSnapshotFinder).run(Pipeline.create(options)); } }