diff --git a/core/build.gradle b/core/build.gradle index 3d2496e9c..40cdbf31b 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -707,10 +707,7 @@ createToolTask( 'initSqlPipeline', 'google.registry.beam.initsql.InitSqlPipeline') createToolTask( - 'validateSqlPipeline', 'google.registry.beam.comparedb.ValidateSqlPipeline') - -createToolTask( - 'validateDatastorePipeline', 'google.registry.beam.comparedb.ValidateDatastorePipeline') + 'validateDatabasePipeline', 'google.registry.beam.comparedb.ValidateDatabasePipeline') createToolTask( @@ -797,15 +794,10 @@ if (environment == 'alpha') { mainClass: 'google.registry.beam.rde.RdePipeline', metaData : 'google/registry/beam/rde_pipeline_metadata.json' ], - validateDatastore : + validateDatabase : [ - mainClass: 'google.registry.beam.comparedb.ValidateDatastorePipeline', - metaData: 'google/registry/beam/validate_datastore_pipeline_metadata.json' - ], - validateSql : - [ - mainClass: 'google.registry.beam.comparedb.ValidateSqlPipeline', - metaData: 'google/registry/beam/validate_sql_pipeline_metadata.json' + mainClass: 'google.registry.beam.comparedb.ValidateDatabasePipeline', + metaData: 'google/registry/beam/validate_database_pipeline_metadata.json' ], ] project.tasks.create("stageBeamPipelines") { diff --git a/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java b/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java index 158671d18..f17c0f502 100644 --- a/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java +++ b/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java @@ -76,7 +76,10 @@ public class ReplayCommitLogsToSqlAction implements Runnable { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - private static final Duration LEASE_LENGTH = standardHours(1); + public static final String REPLAY_TO_SQL_LOCK_NAME = + ReplayCommitLogsToSqlAction.class.getSimpleName(); + + public static final Duration REPLAY_TO_SQL_LOCK_LEASE_LENGTH = standardHours(1); // Stop / pause where we are if we've been replaying for more than five minutes to avoid GAE // request timeouts private static final Duration REPLAY_TIMEOUT_DURATION = Duration.standardMinutes(5); @@ -115,7 +118,11 @@ public class ReplayCommitLogsToSqlAction implements Runnable { } Optional lock = Lock.acquireSql( - this.getClass().getSimpleName(), null, LEASE_LENGTH, requestStatusChecker, false); + REPLAY_TO_SQL_LOCK_NAME, + null, + REPLAY_TO_SQL_LOCK_LEASE_LENGTH, + requestStatusChecker, + false); if (!lock.isPresent()) { String message = "Can't acquire SQL commit log replay lock, aborting."; logger.atSevere().log(message); diff --git a/core/src/main/java/google/registry/backup/SyncDatastoreToSqlSnapshotAction.java b/core/src/main/java/google/registry/backup/SyncDatastoreToSqlSnapshotAction.java index 693606a77..17c647826 100644 --- a/core/src/main/java/google/registry/backup/SyncDatastoreToSqlSnapshotAction.java +++ b/core/src/main/java/google/registry/backup/SyncDatastoreToSqlSnapshotAction.java @@ -48,8 +48,7 @@ import org.joda.time.Duration; * * * The caller may release the replication lock upon receiving the response from this action. Please - * refer to {@link google.registry.tools.ValidateDatastoreWithSqlCommand} for more information on - * usage. + * refer to {@link google.registry.tools.ValidateDatastoreCommand} for more information on usage. * *

This action plays SQL transactions up to the user-specified snapshot, creates a new CommitLog * checkpoint, and exports all CommitLogs to GCS up to this checkpoint. The timestamp of this diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java b/core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipeline.java similarity index 51% rename from core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java rename to core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipeline.java index f11c8c0ed..8dc2d9a8a 100644 --- a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipeline.java @@ -18,31 +18,20 @@ import static com.google.common.base.Verify.verify; import static org.apache.beam.sdk.values.TypeDescriptors.strings; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import com.google.common.flogger.FluentLogger; -import google.registry.beam.common.DatabaseSnapshot; +import google.registry.beam.common.RegistryPipelineOptions; import google.registry.beam.common.RegistryPipelineWorkerInitializer; import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder.DatastoreSnapshotInfo; import google.registry.beam.comparedb.ValidateSqlUtils.CompareSqlEntity; import google.registry.model.annotations.DeleteAfterMigration; -import google.registry.model.common.DatabaseMigrationStateSchedule; -import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; -import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection; import google.registry.model.domain.DomainBase; import google.registry.model.domain.DomainHistory; import google.registry.model.replay.SqlEntity; -import google.registry.model.replay.SqlReplayCheckpoint; -import google.registry.model.server.Lock; import google.registry.persistence.PersistenceModule.JpaTransactionManagerType; import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; -import google.registry.persistence.transaction.TransactionManagerFactory; -import google.registry.util.RequestStatusChecker; -import google.registry.util.SystemClock; import java.io.Serializable; import java.util.Optional; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Flatten; @@ -56,78 +45,46 @@ import org.joda.time.DateTime; import org.joda.time.Duration; /** - * Validates the asynchronous data replication process from Datastore (primary storage) to Cloud SQL - * (secondary storage). + * Validates the asynchronous data replication process between Datastore and Cloud SQL. + * + *

This pipeline is to be launched by {@link google.registry.tools.ValidateDatastoreCommand} or + * {@link google.registry.tools.ValidateSqlCommand}. */ @DeleteAfterMigration -public class ValidateSqlPipeline { +public class ValidateDatabasePipeline { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); /** Specifies the extra CommitLogs to load before the start of a Database export. */ private static final Duration COMMITLOG_START_TIME_MARGIN = Duration.standardMinutes(10); - /** - * Name of the lock used by the commitlog replay process. - * - *

See {@link google.registry.backup.ReplayCommitLogsToSqlAction} for more information. - */ - private static final String COMMITLOG_REPLAY_LOCK_NAME = "ReplayCommitLogsToSqlAction"; - - private static final Duration REPLAY_LOCK_LEASE_LENGTH = Duration.standardHours(1); - private static final java.time.Duration REPLAY_LOCK_ACQUIRE_TIMEOUT = - java.time.Duration.ofMinutes(6); - private static final java.time.Duration REPLAY_LOCK_ACQUIRE_DELAY = - java.time.Duration.ofSeconds(30); - - private final ValidateSqlPipelineOptions options; + private final ValidateDatabasePipelineOptions options; private final LatestDatastoreSnapshotFinder datastoreSnapshotFinder; - public ValidateSqlPipeline( - ValidateSqlPipelineOptions options, LatestDatastoreSnapshotFinder datastoreSnapshotFinder) { + public ValidateDatabasePipeline( + ValidateDatabasePipelineOptions options, + LatestDatastoreSnapshotFinder datastoreSnapshotFinder) { this.options = options; this.datastoreSnapshotFinder = datastoreSnapshotFinder; } @VisibleForTesting void run(Pipeline pipeline) { - Optional lock = acquireCommitLogReplayLock(); - if (lock.isPresent()) { - logger.atInfo().log("Acquired CommitLog Replay lock."); - } else { - throw new RuntimeException("Failed to acquire CommitLog Replay lock."); - } + DateTime latestCommitLogTime = DateTime.parse(options.getLatestCommitLogTimestamp()); + DatastoreSnapshotInfo mostRecentExport = + datastoreSnapshotFinder.getSnapshotInfo(latestCommitLogTime.toInstant()); - try { - DateTime latestCommitLogTime = - TransactionManagerFactory.jpaTm().transact(() -> SqlReplayCheckpoint.get()); - DatastoreSnapshotInfo mostRecentExport = - datastoreSnapshotFinder.getSnapshotInfo(latestCommitLogTime.toInstant()); - Preconditions.checkState( - latestCommitLogTime.isAfter(mostRecentExport.exportInterval().getEnd()), - "Cannot recreate Datastore snapshot since target time is in the middle of an export."); - try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) { - // Eagerly release the commitlog replay lock so that replay can resume. - lock.ifPresent(Lock::releaseSql); - lock = Optional.empty(); + logger.atInfo().log( + "Comparing datastore export at %s and commitlog timestamp %s.", + mostRecentExport.exportDir(), latestCommitLogTime); - logger.atInfo().log( - "Starting comparison with export at %s and latestCommitLogTime at %s", - mostRecentExport.exportDir(), latestCommitLogTime); + setupPipeline( + pipeline, + Optional.ofNullable(options.getSqlSnapshotId()), + mostRecentExport, + latestCommitLogTime, + Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse)); - setupPipeline( - pipeline, - Optional.of(databaseSnapshot.getSnapshotId()), - mostRecentExport, - latestCommitLogTime, - Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse)); - State state = pipeline.run().waitUntilFinish(); - if (!State.DONE.equals(state)) { - throw new IllegalStateException("Unexpected pipeline state: " + state); - } - } - } finally { - lock.ifPresent(Lock::releaseSql); - } + pipeline.run(); } static void setupPipeline( @@ -170,7 +127,7 @@ public class ValidateSqlPipeline { .apply("Combine from both snapshots: " + clazz.getSimpleName(), Flatten.pCollections()) .apply( "Assign primary key to merged " + clazz.getSimpleName(), - WithKeys.of(ValidateSqlPipeline::getPrimaryKeyString).withKeyType(strings())) + WithKeys.of(ValidateDatabasePipeline::getPrimaryKeyString).withKeyType(strings())) .apply("Group by primary key " + clazz.getSimpleName(), GroupByKey.create()) .apply("Compare " + clazz.getSimpleName(), ParDo.of(new CompareSqlEntity())); } @@ -189,76 +146,25 @@ public class ValidateSqlPipeline { return sqlEntity.getPrimaryKeyString(); } - private static Optional acquireCommitLogReplayLock() { - Stopwatch stopwatch = Stopwatch.createStarted(); - while (stopwatch.elapsed().minus(REPLAY_LOCK_ACQUIRE_TIMEOUT).isNegative()) { - Optional lock = tryAcquireCommitLogReplayLock(); - if (lock.isPresent()) { - return lock; - } - logger.atInfo().log("Failed to acquired CommitLog Replay lock. Will retry..."); - try { - Thread.sleep(REPLAY_LOCK_ACQUIRE_DELAY.toMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted."); - } - } - return Optional.empty(); - } - - private static Optional tryAcquireCommitLogReplayLock() { - return Lock.acquireSql( - COMMITLOG_REPLAY_LOCK_NAME, - null, - REPLAY_LOCK_LEASE_LENGTH, - getLockingRequestStatusChecker(), - false); - } - - /** - * Returns a fake implementation of {@link RequestStatusChecker} that is required for lock - * acquisition. The default implementation is AppEngine-specific and is unusable on GCE. - */ - private static RequestStatusChecker getLockingRequestStatusChecker() { - return new RequestStatusChecker() { - @Override - public String getLogId() { - return "ValidateSqlPipeline"; - } - - LatestDatastoreSnapshotFinder datastoreSnapshotFinder = - DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create() - .datastoreSnapshotInfoFinder(); - - @Override - public boolean isRunning(String requestLogId) { - return true; - } - }; - } - public static void main(String[] args) { - ValidateSqlPipelineOptions options = - PipelineOptionsFactory.fromArgs(args).withValidation().as(ValidateSqlPipelineOptions.class); + ValidateDatabasePipelineOptions options = + PipelineOptionsFactory.fromArgs(args) + .withValidation() + .as(ValidateDatabasePipelineOptions.class); + RegistryPipelineOptions.validateRegistryPipelineOptions(options); // Defensively set important options. options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ); options.setJpaTransactionManagerType(JpaTransactionManagerType.BULK_QUERY); - // Reuse Dataflow worker initialization code to set up JPA in the pipeline harness. + // Set up JPA in the pipeline harness (the locally executed part of the main() method). Reuse + // code in RegistryPipelineWorkerInitializer, which only applies to pipeline worker VMs. new RegistryPipelineWorkerInitializer().beforeProcessing(options); - MigrationState state = - DatabaseMigrationStateSchedule.getValueAtTime(new SystemClock().nowUtc()); - if (!state.getReplayDirection().equals(ReplayDirection.DATASTORE_TO_SQL)) { - throw new IllegalStateException("This pipeline is not designed for migration phase " + state); - } - LatestDatastoreSnapshotFinder datastoreSnapshotFinder = DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create() .datastoreSnapshotInfoFinder(); - new ValidateSqlPipeline(options, datastoreSnapshotFinder).run(Pipeline.create(options)); + new ValidateDatabasePipeline(options, datastoreSnapshotFinder).run(Pipeline.create(options)); } } diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateDatastorePipelineOptions.java b/core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipelineOptions.java similarity index 69% rename from core/src/main/java/google/registry/beam/comparedb/ValidateDatastorePipelineOptions.java rename to core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipelineOptions.java index 020cd3826..f2222efe0 100644 --- a/core/src/main/java/google/registry/beam/comparedb/ValidateDatastorePipelineOptions.java +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipelineOptions.java @@ -14,14 +14,15 @@ package google.registry.beam.comparedb; +import google.registry.beam.common.RegistryPipelineOptions; import google.registry.model.annotations.DeleteAfterMigration; import javax.annotation.Nullable; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.Validation; -/** BEAM pipeline options for {@link ValidateDatastorePipelineOptions}. */ +/** BEAM pipeline options for {@link ValidateDatabasePipeline}. */ @DeleteAfterMigration -public interface ValidateDatastorePipelineOptions extends ValidateSqlPipelineOptions { +public interface ValidateDatabasePipelineOptions extends RegistryPipelineOptions { @Description( "The id of the SQL snapshot to be compared with Datastore. " @@ -36,4 +37,13 @@ public interface ValidateDatastorePipelineOptions extends ValidateSqlPipelineOpt String getLatestCommitLogTimestamp(); void setLatestCommitLogTimestamp(String commitLogEndTimestamp); + + @Description( + "For history entries and EPP resources, only those modified strictly after this time are " + + "included in comparison. Value is in ISO8601 format. " + + "Other entity types are not affected.") + @Nullable + String getComparisonStartTimestamp(); + + void setComparisonStartTimestamp(String comparisonStartTimestamp); } diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateDatastorePipeline.java b/core/src/main/java/google/registry/beam/comparedb/ValidateDatastorePipeline.java deleted file mode 100644 index 8571e40c3..000000000 --- a/core/src/main/java/google/registry/beam/comparedb/ValidateDatastorePipeline.java +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2022 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.beam.comparedb; - -import google.registry.beam.common.RegistryPipelineOptions; -import google.registry.beam.common.RegistryPipelineWorkerInitializer; -import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder.DatastoreSnapshotInfo; -import google.registry.model.annotations.DeleteAfterMigration; -import google.registry.persistence.PersistenceModule.JpaTransactionManagerType; -import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; -import java.util.Optional; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.joda.time.DateTime; - -/** - * Validates the asynchronous data replication process from Cloud SQL (primary) to Datastore - * (secondary). - * - *

This pipeline simply compares the snapshots provided by an invoker, which is responsible for - * obtaining two consistent snapshots for the same point of time. - */ -// TODO(weiminyu): Implement the invoker action in a followup PR. -@DeleteAfterMigration -public class ValidateDatastorePipeline { - - private final ValidateDatastorePipelineOptions options; - private final LatestDatastoreSnapshotFinder datastoreSnapshotFinder; - - public ValidateDatastorePipeline( - ValidateDatastorePipelineOptions options, - LatestDatastoreSnapshotFinder datastoreSnapshotFinder) { - this.options = options; - this.datastoreSnapshotFinder = datastoreSnapshotFinder; - } - - void run(Pipeline pipeline) { - DateTime latestCommitLogTime = DateTime.parse(options.getLatestCommitLogTimestamp()); - DatastoreSnapshotInfo mostRecentExport = - datastoreSnapshotFinder.getSnapshotInfo(latestCommitLogTime.toInstant()); - - ValidateSqlPipeline.setupPipeline( - pipeline, - Optional.ofNullable(options.getSqlSnapshotId()), - mostRecentExport, - latestCommitLogTime, - Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse)); - - pipeline.run(); - } - - public static void main(String[] args) { - ValidateDatastorePipelineOptions options = - PipelineOptionsFactory.fromArgs(args) - .withValidation() - .as(ValidateDatastorePipelineOptions.class); - RegistryPipelineOptions.validateRegistryPipelineOptions(options); - - // Defensively set important options. - options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ); - options.setJpaTransactionManagerType(JpaTransactionManagerType.BULK_QUERY); - - // Reuse Dataflow worker initialization code to set up JPA in the pipeline harness. - new RegistryPipelineWorkerInitializer().beforeProcessing(options); - - LatestDatastoreSnapshotFinder datastoreSnapshotFinder = - DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create() - .datastoreSnapshotInfoFinder(); - new ValidateDatastorePipeline(options, datastoreSnapshotFinder).run(Pipeline.create(options)); - } -} diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipelineOptions.java b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipelineOptions.java deleted file mode 100644 index a0f3f46ee..000000000 --- a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipelineOptions.java +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2021 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.beam.comparedb; - -import google.registry.beam.common.RegistryPipelineOptions; -import google.registry.model.annotations.DeleteAfterMigration; -import javax.annotation.Nullable; -import org.apache.beam.sdk.options.Description; - -/** BEAM pipeline options for {@link ValidateSqlPipeline}. */ -@DeleteAfterMigration -public interface ValidateSqlPipelineOptions extends RegistryPipelineOptions { - - @Description( - "For history entries and EPP resources, only those modified strictly after this time are " - + "included in comparison. Value is in ISO8601 format. " - + "Other entity types are not affected.") - @Nullable - String getComparisonStartTimestamp(); - - void setComparisonStartTimestamp(String comparisonStartTimestamp); -} diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java index 1c84076eb..eeca07e1a 100644 --- a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java @@ -51,7 +51,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; -/** Helpers for use by {@link ValidateSqlPipeline}. */ +/** Helpers for use by {@link ValidateDatabasePipeline}. */ @DeleteAfterMigration final class ValidateSqlUtils { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @@ -66,7 +66,7 @@ final class ValidateSqlUtils { * Query template for finding the median value of the {@code history_revision_id} column in one of * the History tables. * - *

The {@link ValidateSqlPipeline} uses this query to parallelize the query to some of the + *

The {@link ValidateDatabasePipeline} uses this query to parallelize the query to some of the * history tables. Although the {@code repo_id} column is the leading column in the primary keys * of these tables, in practice and with production data, division by {@code history_revision_id} * works slightly faster for unknown reasons. diff --git a/core/src/main/java/google/registry/tools/RegistryTool.java b/core/src/main/java/google/registry/tools/RegistryTool.java index 3a64f14ef..2447c2e5b 100644 --- a/core/src/main/java/google/registry/tools/RegistryTool.java +++ b/core/src/main/java/google/registry/tools/RegistryTool.java @@ -123,9 +123,10 @@ public final class RegistryTool { .put("update_server_locks", UpdateServerLocksCommand.class) .put("update_tld", UpdateTldCommand.class) .put("upload_claims_list", UploadClaimsListCommand.class) - .put("validate_datastore_with_sql", ValidateDatastoreWithSqlCommand.class) + .put("validate_datastore", ValidateDatastoreCommand.class) .put("validate_escrow_deposit", ValidateEscrowDepositCommand.class) .put("validate_login_credentials", ValidateLoginCredentialsCommand.class) + .put("validate_sql", ValidateSqlCommand.class) .put("verify_ote", VerifyOteCommand.class) .put("whois_query", WhoisQueryCommand.class) .build(); diff --git a/core/src/main/java/google/registry/tools/RegistryToolComponent.java b/core/src/main/java/google/registry/tools/RegistryToolComponent.java index 29dd85845..e6a4d665e 100644 --- a/core/src/main/java/google/registry/tools/RegistryToolComponent.java +++ b/core/src/main/java/google/registry/tools/RegistryToolComponent.java @@ -171,12 +171,14 @@ interface RegistryToolComponent { void inject(UpdateTldCommand command); - void inject(ValidateDatastoreWithSqlCommand command); + void inject(ValidateDatastoreCommand command); void inject(ValidateEscrowDepositCommand command); void inject(ValidateLoginCredentialsCommand command); + void inject(ValidateSqlCommand command); + void inject(WhoisQueryCommand command); AppEngineConnection appEngineConnection(); diff --git a/core/src/main/java/google/registry/tools/ValidateDatabaseMigrationCommand.java b/core/src/main/java/google/registry/tools/ValidateDatabaseMigrationCommand.java new file mode 100644 index 000000000..b9b9126b2 --- /dev/null +++ b/core/src/main/java/google/registry/tools/ValidateDatabaseMigrationCommand.java @@ -0,0 +1,217 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.tools; + +import static google.registry.beam.BeamUtils.createJobName; + +import com.beust.jcommander.Parameter; +import com.google.api.services.dataflow.Dataflow; +import com.google.api.services.dataflow.model.Job; +import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; +import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; +import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; +import com.google.common.base.Ascii; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import google.registry.beam.common.DatabaseSnapshot; +import google.registry.config.RegistryConfig.Config; +import google.registry.tools.params.DateTimeParameter; +import google.registry.util.Clock; +import google.registry.util.RequestStatusChecker; +import google.registry.util.Sleeper; +import java.io.IOException; +import java.util.UUID; +import javax.inject.Inject; +import org.joda.time.DateTime; +import org.joda.time.Duration; + +/** Shared setup for commands that validate the data replication between Datastore and Cloud SQL. */ +abstract class ValidateDatabaseMigrationCommand + implements CommandWithConnection, CommandWithRemoteApi { + + private static final String PIPELINE_NAME = "validate_database_pipeline"; + + private static final String MANUAL_PIPELINE_LAUNCH_COMMAND_TEMPLATE = + "gcloud dataflow flex-template run " + + "\"%s-${USER}-$(date +%%Y%%m%%dt%%H%%M%%S)\" " + + "--template-file-gcs-location %s " + + "--project %s " + + "--region=%s " + + "--worker-machine-type=n2-standard-8 --num-workers=8 " + + "--parameters registryEnvironment=%s " + + "--parameters sqlSnapshotId=%s " + + "--parameters latestCommitLogTimestamp=%s "; + + // States indicating a job is not finished yet. + static final ImmutableSet DATAFLOW_JOB_RUNNING_STATES = + ImmutableSet.of( + "JOB_STATE_UNKNOWN", + "JOB_STATE_RUNNING", + "JOB_STATE_STOPPED", + "JOB_STATE_PENDING", + "JOB_STATE_QUEUED"); + + static final Duration JOB_POLLING_INTERVAL = Duration.standardSeconds(60); + + @Parameter( + names = {"-m", "--manual"}, + description = + "If true, let user launch the comparison pipeline manually out of band. " + + "Command will wait for user key-press to exit after syncing Datastore.") + boolean manualLaunchPipeline; + + @Parameter( + names = {"-r", "--release"}, + description = "The release tag of the BEAM pipeline to run. It defaults to 'live'.") + String release = "live"; + + @Parameter( + names = {"-c", "--comparisonStartTimestamp"}, + description = + "When comparing History and Epp Resource entities, ignore those that have not" + + " changed since this time.", + converter = DateTimeParameter.class) + DateTime comparisonStartTimestamp; + + @Inject Clock clock; + @Inject Dataflow dataflow; + + @Inject + @Config("defaultJobRegion") + String jobRegion; + + @Inject + @Config("beamStagingBucketUrl") + String stagingBucketUrl; + + @Inject + @Config("projectId") + String projectId; + + @Inject Sleeper sleeper; + + AppEngineConnection connection; + + @Override + public void setConnection(AppEngineConnection connection) { + this.connection = connection; + } + + String getDataflowJobStatus(String jobId) { + try { + return dataflow + .projects() + .locations() + .jobs() + .get(projectId, jobRegion, jobId) + .execute() + .getCurrentState(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + void launchPipelineAndWaitUntilFinish( + String pipelineName, DatabaseSnapshot snapshot, String latestCommitTimestamp) { + Job pipelineJob = + launchComparisonPipeline(pipelineName, snapshot.getSnapshotId(), latestCommitTimestamp) + .getJob(); + String jobId = pipelineJob.getId(); + + System.out.printf("Launched comparison pipeline %s (%s).\n", pipelineJob.getName(), jobId); + + while (DATAFLOW_JOB_RUNNING_STATES.contains(getDataflowJobStatus(jobId))) { + sleeper.sleepInterruptibly(JOB_POLLING_INTERVAL); + } + System.out.printf( + "Pipeline ended with %s state. Please check counters for results.\n", + getDataflowJobStatus(jobId)); + } + + String getContainerSpecGcsPath() { + return String.format( + "%s/%s_metadata.json", stagingBucketUrl.replace("live", release), PIPELINE_NAME); + } + + String getManualLaunchCommand( + String jobName, String snapshotId, String latestCommitLogTimestamp) { + String baseCommand = + String.format( + MANUAL_PIPELINE_LAUNCH_COMMAND_TEMPLATE, + jobName, + getContainerSpecGcsPath(), + projectId, + jobRegion, + RegistryToolEnvironment.get().name(), + snapshotId, + latestCommitLogTimestamp); + if (comparisonStartTimestamp == null) { + return baseCommand; + } + return baseCommand + "--parameters comparisonStartTimestamp=" + comparisonStartTimestamp; + } + + LaunchFlexTemplateResponse launchComparisonPipeline( + String jobName, String sqlSnapshotId, String latestCommitLogTimestamp) { + try { + // Hardcode machine type and initial workers to force a quick start. + ImmutableMap.Builder paramsBuilder = + new ImmutableMap.Builder() + .put("workerMachineType", "n2-standard-8") + .put("numWorkers", "8") + .put("sqlSnapshotId", sqlSnapshotId) + .put("latestCommitLogTimestamp", latestCommitLogTimestamp) + .put("registryEnvironment", RegistryToolEnvironment.get().name()); + if (comparisonStartTimestamp != null) { + paramsBuilder.put("comparisonStartTimestamp", comparisonStartTimestamp.toString()); + } + LaunchFlexTemplateParameter parameter = + new LaunchFlexTemplateParameter() + .setJobName(createJobName(Ascii.toLowerCase(jobName).replace('_', '-'), clock)) + .setContainerSpecGcsPath(getContainerSpecGcsPath()) + .setParameters(paramsBuilder.build()); + return dataflow + .projects() + .locations() + .flexTemplates() + .launch( + projectId, jobRegion, new LaunchFlexTemplateRequest().setLaunchParameter(parameter)) + .execute(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * A fake implementation of {@link RequestStatusChecker} for managing SQL-backed locks from + * non-AppEngine platforms. This is only required until the Nomulus server is migrated off + * AppEngine. + */ + static class FakeRequestStatusChecker implements RequestStatusChecker { + + private final String logId = + ValidateDatastoreCommand.class.getSimpleName() + "-" + UUID.randomUUID(); + + @Override + public String getLogId() { + return logId; + } + + @Override + public boolean isRunning(String requestLogId) { + return false; + } + } +} diff --git a/core/src/main/java/google/registry/tools/ValidateDatastoreCommand.java b/core/src/main/java/google/registry/tools/ValidateDatastoreCommand.java new file mode 100644 index 000000000..84897be7b --- /dev/null +++ b/core/src/main/java/google/registry/tools/ValidateDatastoreCommand.java @@ -0,0 +1,105 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.tools; + +import static google.registry.model.replay.ReplicateToDatastoreAction.REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH; +import static google.registry.model.replay.ReplicateToDatastoreAction.REPLICATE_TO_DATASTORE_LOCK_NAME; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.beust.jcommander.Parameters; +import com.google.common.collect.ImmutableMap; +import com.google.common.net.MediaType; +import google.registry.backup.SyncDatastoreToSqlSnapshotAction; +import google.registry.beam.common.DatabaseSnapshot; +import google.registry.model.common.DatabaseMigrationStateSchedule; +import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; +import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection; +import google.registry.model.server.Lock; +import google.registry.request.Action.Service; +import java.util.Optional; + +/** + * Validates asynchronously replicated data from the primary Cloud SQL database to Datastore. + * + *

This command suspends the replication process (by acquiring the replication lock), take a + * snapshot of the Cloud SQL database, invokes a Nomulus server action to sync Datastore to this + * snapshot (See {@link SyncDatastoreToSqlSnapshotAction} for details), and finally launches a BEAM + * pipeline to compare Datastore with the given SQL snapshot. + * + *

This command does not lock up the SQL database. Normal processing can proceed. + */ +@Parameters(commandDescription = "Validates Datastore with the primary Cloud SQL database.") +public class ValidateDatastoreCommand extends ValidateDatabaseMigrationCommand { + + private static final Service NOMULUS_SERVICE = Service.BACKEND; + + @Override + public void run() throws Exception { + MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc()); + if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) { + throw new IllegalStateException("Cannot validate Datastore in migration step " + state); + } + Optional lock = + Lock.acquireSql( + REPLICATE_TO_DATASTORE_LOCK_NAME, + null, + REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH, + new FakeRequestStatusChecker(), + false); + if (!lock.isPresent()) { + throw new IllegalStateException("Cannot acquire the async propagation lock."); + } + + try { + try (DatabaseSnapshot snapshot = DatabaseSnapshot.createSnapshot()) { + System.out.printf("Obtained snapshot %s\n", snapshot.getSnapshotId()); + AppEngineConnection connectionToService = connection.withService(NOMULUS_SERVICE); + String response = + connectionToService.sendPostRequest( + getNomulusEndpoint(snapshot.getSnapshotId()), + ImmutableMap.of(), + MediaType.PLAIN_TEXT_UTF_8, + "".getBytes(UTF_8)); + System.out.println(response); + + lock.ifPresent(Lock::releaseSql); + lock = Optional.empty(); + + // See SyncDatastoreToSqlSnapshotAction for response format. + String latestCommitTimestamp = + response.substring(response.lastIndexOf('(') + 1, response.lastIndexOf(')')); + + if (manualLaunchPipeline) { + System.out.printf( + "To launch the pipeline manually, use the following command:\n%s\n", + getManualLaunchCommand( + "validate-datastore", snapshot.getSnapshotId(), latestCommitTimestamp)); + + System.out.print("\nEnter any key to continue when the pipeline ends:"); + System.in.read(); + } else { + launchPipelineAndWaitUntilFinish("validate-datastore", snapshot, latestCommitTimestamp); + } + } + } finally { + lock.ifPresent(Lock::releaseSql); + } + } + + private static String getNomulusEndpoint(String sqlSnapshotId) { + return String.format( + "%s?sqlSnapshotId=%s", SyncDatastoreToSqlSnapshotAction.PATH, sqlSnapshotId); + } +} diff --git a/core/src/main/java/google/registry/tools/ValidateDatastoreWithSqlCommand.java b/core/src/main/java/google/registry/tools/ValidateDatastoreWithSqlCommand.java deleted file mode 100644 index a1ba5d941..000000000 --- a/core/src/main/java/google/registry/tools/ValidateDatastoreWithSqlCommand.java +++ /dev/null @@ -1,229 +0,0 @@ -// Copyright 2022 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.tools; - -import static google.registry.beam.BeamUtils.createJobName; -import static google.registry.model.replay.ReplicateToDatastoreAction.REPLICATE_TO_DATASTORE_LOCK_NAME; -import static java.nio.charset.StandardCharsets.UTF_8; - -import com.beust.jcommander.Parameter; -import com.beust.jcommander.Parameters; -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.model.Job; -import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; -import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; -import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.net.MediaType; -import google.registry.backup.SyncDatastoreToSqlSnapshotAction; -import google.registry.beam.common.DatabaseSnapshot; -import google.registry.config.RegistryConfig.Config; -import google.registry.model.common.DatabaseMigrationStateSchedule; -import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; -import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection; -import google.registry.model.replay.ReplicateToDatastoreAction; -import google.registry.model.server.Lock; -import google.registry.request.Action.Service; -import google.registry.util.Clock; -import google.registry.util.RequestStatusChecker; -import google.registry.util.Sleeper; -import java.io.IOException; -import java.util.Optional; -import java.util.UUID; -import javax.inject.Inject; -import org.joda.time.Duration; - -/** - * Validates asynchronously replicated data from the primary Cloud SQL database to Datastore. - * - *

This command suspends the replication process (by acquiring the replication lock), take a - * snapshot of the Cloud SQL database, invokes a Nomulus server action to sync Datastore to this - * snapshot (See {@link SyncDatastoreToSqlSnapshotAction} for details), and finally launches a BEAM - * pipeline to compare Datastore with the given SQL snapshot. - * - *

This command does not lock up the SQL database. Normal processing can proceed. - */ -@Parameters(commandDescription = "Validates Datastore with Cloud SQL.") -public class ValidateDatastoreWithSqlCommand - implements CommandWithConnection, CommandWithRemoteApi { - - private static final Service NOMULUS_SERVICE = Service.BACKEND; - - private static final String PIPELINE_NAME = "validate_datastore_pipeline"; - - // States indicating a job is not finished yet. - private static final ImmutableSet DATAFLOW_JOB_RUNNING_STATES = - ImmutableSet.of( - "JOB_STATE_RUNNING", "JOB_STATE_STOPPED", "JOB_STATE_PENDING", "JOB_STATE_QUEUED"); - - private static final Duration JOB_POLLING_INTERVAL = Duration.standardSeconds(60); - - @Parameter( - names = {"-m", "--manual"}, - description = - "If true, let user launch the comparison pipeline manually out of band. " - + "Command will wait for user key-press to exit after syncing Datastore.") - boolean manualLaunchPipeline; - - @Inject Clock clock; - @Inject Dataflow dataflow; - - @Inject - @Config("defaultJobRegion") - String jobRegion; - - @Inject - @Config("beamStagingBucketUrl") - String stagingBucketUrl; - - @Inject - @Config("projectId") - String projectId; - - @Inject Sleeper sleeper; - - private AppEngineConnection connection; - - @Override - public void setConnection(AppEngineConnection connection) { - this.connection = connection; - } - - @Override - public void run() throws Exception { - MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc()); - if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) { - throw new IllegalStateException("Cannot sync Datastore to SQL in migration step " + state); - } - Optional lock = - Lock.acquireSql( - REPLICATE_TO_DATASTORE_LOCK_NAME, - null, - ReplicateToDatastoreAction.REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH, - new FakeRequestStatusChecker(), - false); - if (!lock.isPresent()) { - throw new IllegalStateException("Cannot acquire the async propagation lock."); - } - - try { - try (DatabaseSnapshot snapshot = DatabaseSnapshot.createSnapshot()) { - System.out.printf("Obtained snapshot %s\n", snapshot.getSnapshotId()); - AppEngineConnection connectionToService = connection.withService(NOMULUS_SERVICE); - String response = - connectionToService.sendPostRequest( - getNomulusEndpoint(snapshot.getSnapshotId()), - ImmutableMap.of(), - MediaType.PLAIN_TEXT_UTF_8, - "".getBytes(UTF_8)); - System.out.println(response); - - lock.ifPresent(Lock::releaseSql); - lock = Optional.empty(); - - // See SyncDatastoreToSqlSnapshotAction for response format. - String latestCommitTimestamp = - response.substring(response.lastIndexOf('(') + 1, response.lastIndexOf(')')); - - if (manualLaunchPipeline) { - System.out.print("\nEnter any key to continue when the pipeline ends:"); - System.in.read(); - } else { - Job pipelineJob = - launchComparisonPipeline(snapshot.getSnapshotId(), latestCommitTimestamp).getJob(); - String jobId = pipelineJob.getId(); - - System.out.printf( - "Launched comparison pipeline %s (%s).\n", pipelineJob.getName(), jobId); - - while (DATAFLOW_JOB_RUNNING_STATES.contains(getDataflowJobStatus(jobId))) { - sleeper.sleepInterruptibly(JOB_POLLING_INTERVAL); - } - System.out.printf( - "Pipeline ended with %s state. Please check counters for results.\n", - getDataflowJobStatus(jobId)); - } - } - } finally { - lock.ifPresent(Lock::releaseSql); - } - } - - private static String getNomulusEndpoint(String sqlSnapshotId) { - return String.format( - "%s?sqlSnapshotId=%s", SyncDatastoreToSqlSnapshotAction.PATH, sqlSnapshotId); - } - - private LaunchFlexTemplateResponse launchComparisonPipeline( - String sqlSnapshotId, String latestCommitLogTimestamp) { - try { - LaunchFlexTemplateParameter parameter = - new LaunchFlexTemplateParameter() - .setJobName(createJobName("validate-datastore", clock)) - .setContainerSpecGcsPath( - String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME)) - .setParameters( - ImmutableMap.of( - "sqlSnapshotId", - sqlSnapshotId, - "latestCommitLogTimestamp", - latestCommitLogTimestamp, - "registryEnvironment", - RegistryToolEnvironment.get().name())); - return dataflow - .projects() - .locations() - .flexTemplates() - .launch( - projectId, jobRegion, new LaunchFlexTemplateRequest().setLaunchParameter(parameter)) - .execute(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private String getDataflowJobStatus(String jobId) { - try { - return dataflow - .projects() - .locations() - .jobs() - .get(projectId, jobRegion, jobId) - .execute() - .getCurrentState(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * A fake implementation of {@link RequestStatusChecker} for managing SQL-backed locks from - * non-AppEngine platforms. This is only required until the Nomulus server is migrated off - * AppEngine. - */ - static class FakeRequestStatusChecker implements RequestStatusChecker { - - @Override - public String getLogId() { - return ValidateDatastoreWithSqlCommand.class.getSimpleName() + "-" + UUID.randomUUID(); - } - - @Override - public boolean isRunning(String requestLogId) { - return false; - } - } -} diff --git a/core/src/main/java/google/registry/tools/ValidateSqlCommand.java b/core/src/main/java/google/registry/tools/ValidateSqlCommand.java new file mode 100644 index 000000000..71644a006 --- /dev/null +++ b/core/src/main/java/google/registry/tools/ValidateSqlCommand.java @@ -0,0 +1,91 @@ +// Copyright 2022 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.tools; + +import static google.registry.backup.ReplayCommitLogsToSqlAction.REPLAY_TO_SQL_LOCK_LEASE_LENGTH; +import static google.registry.backup.ReplayCommitLogsToSqlAction.REPLAY_TO_SQL_LOCK_NAME; + +import com.beust.jcommander.Parameters; +import google.registry.beam.common.DatabaseSnapshot; +import google.registry.model.common.DatabaseMigrationStateSchedule; +import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; +import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection; +import google.registry.model.replay.SqlReplayCheckpoint; +import google.registry.model.server.Lock; +import google.registry.persistence.transaction.TransactionManagerFactory; +import java.util.Optional; +import org.joda.time.DateTime; + +/** + * Validates asynchronously replicated data from the primary Datastore to Cloud SQL. + * + *

This command suspends the replication process (by acquiring the replication lock), take a + * snapshot of the Cloud SQL database, finds the corresponding Datastore snapshot, and finally + * launches a BEAM pipeline to compare the two snapshots. + * + *

This command does not lock up either database. Normal processing can proceed. + */ +@Parameters(commandDescription = "Validates Cloud SQL with the primary Datastore.") +public class ValidateSqlCommand extends ValidateDatabaseMigrationCommand { + + @Override + public void run() throws Exception { + MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc()); + if (!state.getReplayDirection().equals(ReplayDirection.DATASTORE_TO_SQL)) { + throw new IllegalStateException("Cannot validate SQL in migration step " + state); + } + Optional lock = + Lock.acquireSql( + REPLAY_TO_SQL_LOCK_NAME, + null, + REPLAY_TO_SQL_LOCK_LEASE_LENGTH, + new FakeRequestStatusChecker(), + false); + if (!lock.isPresent()) { + throw new IllegalStateException("Cannot acquire the async propagation lock."); + } + + try { + DateTime latestCommitLogTime = + TransactionManagerFactory.jpaTm().transact(() -> SqlReplayCheckpoint.get()); + try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) { + // Eagerly release the commitlog replay lock so that replay can resume. + lock.ifPresent(Lock::releaseSql); + lock = Optional.empty(); + + System.out.printf( + "Start comparison with SQL snapshot (%s) and CommitLog timestamp (%s).\n", + databaseSnapshot.getSnapshotId(), latestCommitLogTime); + + if (manualLaunchPipeline) { + System.out.printf( + "To launch the pipeline manually, use the following command:\n%s\n", + getManualLaunchCommand( + "validate-sql", + databaseSnapshot.getSnapshotId(), + latestCommitLogTime.toString())); + + System.out.print("\nEnter any key to continue when the pipeline ends:"); + System.in.read(); + } else { + launchPipelineAndWaitUntilFinish( + "validate-sql", databaseSnapshot, latestCommitLogTime.toString()); + } + } + } finally { + lock.ifPresent(Lock::releaseSql); + } + } +} diff --git a/core/src/main/resources/google/registry/beam/validate_datastore_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/validate_database_pipeline_metadata.json similarity index 88% rename from core/src/main/resources/google/registry/beam/validate_datastore_pipeline_metadata.json rename to core/src/main/resources/google/registry/beam/validate_database_pipeline_metadata.json index 789129acc..809dffb8a 100644 --- a/core/src/main/resources/google/registry/beam/validate_datastore_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/validate_database_pipeline_metadata.json @@ -1,6 +1,6 @@ { - "name": "Validate Datastore with Cloud SQL", - "description": "An Apache Beam batch pipeline that compares Datastore with the primary Cloud SQL database.", + "name": "Validate Datastore and Cloud SQL", + "description": "An Apache Beam batch pipeline that compares the data in Datastore and Cloud SQL database.", "parameters": [ { "name": "registryEnvironment", diff --git a/core/src/main/resources/google/registry/beam/validate_sql_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/validate_sql_pipeline_metadata.json deleted file mode 100644 index 4ac557f24..000000000 --- a/core/src/main/resources/google/registry/beam/validate_sql_pipeline_metadata.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "name": "Validate Cloud SQL with Datastore being primary", - "description": "An Apache Beam batch pipeline that compares Cloud SQL with the primary Datastore.", - "parameters": [ - { - "name": "registryEnvironment", - "label": "The Registry environment.", - "helpText": "The Registry environment.", - "is_optional": false, - "regexes": [ - "^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$" - ] - }, - { - "name": "comparisonStartTimestamp", - "label": "Only entities updated at or after this time are included for validation.", - "helpText": "The earliest entity update time allowed for inclusion in validation, in ISO8601 format.", - "is_optional": true - } - ] -} diff --git a/release/cloudbuild-nomulus.yaml b/release/cloudbuild-nomulus.yaml index d15b09edc..0903cca92 100644 --- a/release/cloudbuild-nomulus.yaml +++ b/release/cloudbuild-nomulus.yaml @@ -97,10 +97,8 @@ steps: google/registry/beam/invoicing_pipeline_metadata.json \ google.registry.beam.rde.RdePipeline \ google/registry/beam/rde_pipeline_metadata.json \ - google.registry.beam.comparedb.ValidateDatastorePipeline \ - google/registry/beam/validate_datastore_pipeline_metadata.json \ - google.registry.beam.comparedb.ValidateSqlPipeline \ - google/registry/beam/validate_sql_pipeline_metadata.json + google.registry.beam.comparedb.ValidateDatabasePipeline \ + google/registry/beam/validate_database_pipeline_metadata.json # Tentatively build and publish Cloud SQL schema jar here, before schema release # process is finalized. Also publish nomulus:core jars that are needed for # server/schema compatibility tests.