diff --git a/common/src/main/java/google/registry/util/Sleeper.java b/common/src/main/java/google/registry/util/Sleeper.java
index 288b0862a..1ee660441 100644
--- a/common/src/main/java/google/registry/util/Sleeper.java
+++ b/common/src/main/java/google/registry/util/Sleeper.java
@@ -41,4 +41,20 @@ public interface Sleeper {
* @see com.google.common.util.concurrent.Uninterruptibles#sleepUninterruptibly
*/
void sleepUninterruptibly(ReadableDuration duration);
+
+ /**
+ * Puts the current thread to interruptible sleep.
+ *
+ * <p>This is a convenience method for {@link #sleep} that properly converts an {@link
+ * InterruptedException} to a {@link RuntimeException}.
+ *
+ * @param duration how long to sleep; passed unchanged to {@link #sleep}
+ * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is interrupted;
+ *     the current thread's interrupt flag is set again before this is thrown
+ */
+ default void sleepInterruptibly(ReadableDuration duration) {
+ try {
+ sleep(duration);
+ } catch (InterruptedException e) {
+ // Restore current thread's interrupted state.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted.", e);
+ }
+ }
}
diff --git a/core/src/main/java/google/registry/backup/BackupModule.java b/core/src/main/java/google/registry/backup/BackupModule.java
index bea2d9902..263e646e1 100644
--- a/core/src/main/java/google/registry/backup/BackupModule.java
+++ b/core/src/main/java/google/registry/backup/BackupModule.java
@@ -21,6 +21,7 @@ import static google.registry.backup.ExportCommitLogDiffAction.UPPER_CHECKPOINT_
import static google.registry.backup.RestoreCommitLogsAction.BUCKET_OVERRIDE_PARAM;
import static google.registry.backup.RestoreCommitLogsAction.FROM_TIME_PARAM;
import static google.registry.backup.RestoreCommitLogsAction.TO_TIME_PARAM;
+import static google.registry.backup.SyncDatastoreToSqlSnapshotAction.SQL_SNAPSHOT_ID_PARAM;
import static google.registry.request.RequestParameters.extractOptionalParameter;
import static google.registry.request.RequestParameters.extractRequiredDatetimeParameter;
import static google.registry.request.RequestParameters.extractRequiredParameter;
@@ -98,6 +99,12 @@ public final class BackupModule {
return extractRequiredDatetimeParameter(req, TO_TIME_PARAM);
}
+ /** Provides the {@code SQL_SNAPSHOT_ID_PARAM} request parameter of {@code req} for injection. */
+ @Provides
+ @Parameter(SQL_SNAPSHOT_ID_PARAM)
+ static String provideSqlSnapshotId(HttpServletRequest req) {
+ return extractRequiredParameter(req, SQL_SNAPSHOT_ID_PARAM);
+ }
+
@Provides
@Backups
static ListeningExecutorService provideListeningExecutorService() {
diff --git a/core/src/main/java/google/registry/backup/CommitLogCheckpointAction.java b/core/src/main/java/google/registry/backup/CommitLogCheckpointAction.java
index 0800d5c37..92273be23 100644
--- a/core/src/main/java/google/registry/backup/CommitLogCheckpointAction.java
+++ b/core/src/main/java/google/registry/backup/CommitLogCheckpointAction.java
@@ -30,6 +30,7 @@ import google.registry.request.Action.Service;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.CloudTasksUtils;
+import java.util.Optional;
import javax.inject.Inject;
import org.joda.time.DateTime;
@@ -64,33 +65,47 @@ public final class CommitLogCheckpointAction implements Runnable {
@Override
public void run() {
+ createCheckPointAndStartAsyncExport();
+ }
+
+ /**
+ * Creates a {@link CommitLogCheckpoint} and initiates an asynchronous export task.
+ *
+ * <p>The checkpoint is saved and the diff-export task is enqueued inside a single {@code ofyTm()}
+ * transaction. Nothing is saved or enqueued when the checkpoint root already records a
+ * last-written time at or after the candidate checkpoint's time.
+ *
+ * @return the {@code CommitLogCheckpoint} to be exported, or {@link Optional#empty()} if the
+ *     candidate was not persisted because a checkpoint at or after its time was already written
+ */
+ public Optional<CommitLogCheckpoint> createCheckPointAndStartAsyncExport() {
final CommitLogCheckpoint checkpoint = strategy.computeCheckpoint();
logger.atInfo().log(
"Generated candidate checkpoint for time: %s", checkpoint.getCheckpointTime());
- ofyTm()
- .transact(
- () -> {
- DateTime lastWrittenTime = CommitLogCheckpointRoot.loadRoot().getLastWrittenTime();
- if (isBeforeOrAt(checkpoint.getCheckpointTime(), lastWrittenTime)) {
- logger.atInfo().log(
- "Newer checkpoint already written at time: %s", lastWrittenTime);
- return;
- }
- auditedOfy()
- .saveIgnoringReadOnlyWithoutBackup()
- .entities(
- checkpoint, CommitLogCheckpointRoot.create(checkpoint.getCheckpointTime()));
- // Enqueue a diff task between previous and current checkpoints.
- cloudTasksUtils.enqueue(
- QUEUE_NAME,
- CloudTasksUtils.createPostTask(
- ExportCommitLogDiffAction.PATH,
- Service.BACKEND.toString(),
- ImmutableMultimap.of(
- LOWER_CHECKPOINT_TIME_PARAM,
- lastWrittenTime.toString(),
- UPPER_CHECKPOINT_TIME_PARAM,
- checkpoint.getCheckpointTime().toString())));
- });
+ // The transaction reports whether the candidate checkpoint was actually written.
+ boolean isCheckPointPersisted =
+ ofyTm()
+ .transact(
+ () -> {
+ DateTime lastWrittenTime =
+ CommitLogCheckpointRoot.loadRoot().getLastWrittenTime();
+ if (isBeforeOrAt(checkpoint.getCheckpointTime(), lastWrittenTime)) {
+ logger.atInfo().log(
+ "Newer checkpoint already written at time: %s", lastWrittenTime);
+ return false;
+ }
+ auditedOfy()
+ .saveIgnoringReadOnlyWithoutBackup()
+ .entities(
+ checkpoint,
+ CommitLogCheckpointRoot.create(checkpoint.getCheckpointTime()));
+ // Enqueue a diff task between previous and current checkpoints.
+ cloudTasksUtils.enqueue(
+ QUEUE_NAME,
+ CloudTasksUtils.createPostTask(
+ ExportCommitLogDiffAction.PATH,
+ Service.BACKEND.toString(),
+ ImmutableMultimap.of(
+ LOWER_CHECKPOINT_TIME_PARAM,
+ lastWrittenTime.toString(),
+ UPPER_CHECKPOINT_TIME_PARAM,
+ checkpoint.getCheckpointTime().toString())));
+ return true;
+ });
+ return isCheckPointPersisted ? Optional.of(checkpoint) : Optional.empty();
}
}
diff --git a/core/src/main/java/google/registry/backup/SyncDatastoreToSqlSnapshotAction.java b/core/src/main/java/google/registry/backup/SyncDatastoreToSqlSnapshotAction.java
new file mode 100644
index 000000000..693606a77
--- /dev/null
+++ b/core/src/main/java/google/registry/backup/SyncDatastoreToSqlSnapshotAction.java
@@ -0,0 +1,173 @@
+// Copyright 2022 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.backup;
+
+import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
+import static javax.servlet.http.HttpServletResponse.SC_OK;
+
+import com.google.common.flogger.FluentLogger;
+import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder;
+import google.registry.config.RegistryConfig.Config;
+import google.registry.model.annotations.DeleteAfterMigration;
+import google.registry.model.ofy.CommitLogCheckpoint;
+import google.registry.model.replay.ReplicateToDatastoreAction;
+import google.registry.request.Action;
+import google.registry.request.Action.Service;
+import google.registry.request.Parameter;
+import google.registry.request.Response;
+import google.registry.request.auth.Auth;
+import google.registry.util.Sleeper;
+import java.util.Optional;
+import javax.inject.Inject;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+
+/**
+ * Synchronizes Datastore to a given SQL snapshot when SQL is the primary database.
+ *
+ * <p>The caller takes the responsibility for:
+ *
+ * <ol>
+ *   <li>verifying the current migration stage
+ *   <li>acquiring the {@link ReplicateToDatastoreAction#REPLICATE_TO_DATASTORE_LOCK_NAME
+ *       replication lock}, and
+ *   <li>while holding the lock, creating an SQL snapshot and invoking this action with the
+ *       snapshot id
+ * </ol>
+ *
+ * <p>The caller may release the replication lock upon receiving the response from this action.
+ * Please refer to {@link google.registry.tools.ValidateDatastoreWithSqlCommand} for more
+ * information on usage.
+ *
+ * <p>This action plays SQL transactions up to the user-specified snapshot, creates a new CommitLog
+ * checkpoint, and exports all CommitLogs to GCS up to this checkpoint. The timestamp of this
+ * checkpoint can be used to recreate a Datastore snapshot that is equivalent to the given SQL
+ * snapshot. If this action succeeds, the checkpoint timestamp is included in the response (the
+ * format of which is defined by {@link #SUCCESS_RESPONSE_TEMPLATE}).
+ */
+@Action(
+    service = Service.BACKEND,
+    path = SyncDatastoreToSqlSnapshotAction.PATH,
+    method = Action.Method.POST,
+    auth = Auth.AUTH_INTERNAL_OR_ADMIN)
+@DeleteAfterMigration
+public class SyncDatastoreToSqlSnapshotAction implements Runnable {
+  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+  public static final String PATH = "/_dr/task/syncDatastoreToSqlSnapshot";
+
+  /** Success payload; arguments are the SQL snapshot id and the CommitLog checkpoint time. */
+  public static final String SUCCESS_RESPONSE_TEMPLATE =
+      "Datastore is up-to-date with provided SQL snapshot (%s). CommitLog timestamp is (%s).";
+
+  static final String SQL_SNAPSHOT_ID_PARAM = "sqlSnapshotId";
+
+  /** Maximum number of times GCS is checked for the exported CommitLog files. */
+  private static final int COMMITLOGS_PRESENCE_CHECK_ATTEMPTS = 10;
+
+  /** Pause between two consecutive GCS checks. */
+  private static final Duration COMMITLOGS_PRESENCE_CHECK_DELAY = Duration.standardSeconds(6);
+
+  private final Response response;
+  private final Sleeper sleeper;
+
+  @Config("commitLogGcsBucket")
+  private final String gcsBucket;
+
+  private final GcsDiffFileLister gcsDiffFileLister;
+  private final LatestDatastoreSnapshotFinder datastoreSnapshotFinder;
+  private final CommitLogCheckpointAction commitLogCheckpointAction;
+  private final String sqlSnapshotId;
+
+  @Inject
+  SyncDatastoreToSqlSnapshotAction(
+      Response response,
+      Sleeper sleeper,
+      @Config("commitLogGcsBucket") String gcsBucket,
+      GcsDiffFileLister gcsDiffFileLister,
+      LatestDatastoreSnapshotFinder datastoreSnapshotFinder,
+      CommitLogCheckpointAction commitLogCheckpointAction,
+      @Parameter(SQL_SNAPSHOT_ID_PARAM) String sqlSnapshotId) {
+    this.response = response;
+    this.sleeper = sleeper;
+    this.gcsBucket = gcsBucket;
+    this.gcsDiffFileLister = gcsDiffFileLister;
+    this.datastoreSnapshotFinder = datastoreSnapshotFinder;
+    this.commitLogCheckpointAction = commitLogCheckpointAction;
+    this.sqlSnapshotId = sqlSnapshotId;
+  }
+
+  /**
+   * Syncs Datastore to the requested SQL snapshot and reports the outcome in the response.
+   *
+   * <p>On success the status is {@code SC_OK} and the payload follows {@link
+   * #SUCCESS_RESPONSE_TEMPLATE}. On any exception the status is {@code SC_INTERNAL_SERVER_ERROR}
+   * and the payload is the exception's message.
+   */
+  @Override
+  public void run() {
+    logger.atInfo().log("Datastore validation invoked. SqlSnapshotId is %s.", sqlSnapshotId);
+
+    try {
+      CommitLogCheckpoint checkpoint = ensureDatabasesComparable(sqlSnapshotId);
+      response.setStatus(SC_OK);
+      response.setPayload(
+          String.format(SUCCESS_RESPONSE_TEMPLATE, sqlSnapshotId, checkpoint.getCheckpointTime()));
+    } catch (Exception e) {
+      // Only the message is sent back to the caller, so log the exception with its stack trace
+      // here; otherwise the cause of the failure is lost.
+      logger.atSevere().withCause(e).log(
+          "Failed to sync Datastore to SQL snapshot %s.", sqlSnapshotId);
+      response.setStatus(SC_INTERNAL_SERVER_ERROR);
+      response.setPayload(e.getMessage());
+    }
+  }
+
+  /**
+   * Replays SQL transactions up to the snapshot, then creates a CommitLog checkpoint and waits
+   * for its export.
+   *
+   * @return the newly created checkpoint
+   * @throws RuntimeException if no checkpoint can be created or the exported files are not found
+   */
+  private CommitLogCheckpoint ensureDatabasesComparable(String sqlSnapshotId) {
+    // Replicate SQL transaction to Datastore, up to when this snapshot is taken.
+    int playbacks = ReplicateToDatastoreAction.replayAllTransactions(Optional.of(sqlSnapshotId));
+    logger.atInfo().log("Played %s SQL transactions.", playbacks);
+
+    CommitLogCheckpoint checkpoint =
+        exportCommitLogs()
+            .orElseThrow(() -> new RuntimeException("Cannot create CommitLog checkpoint"));
+    logger.atInfo().log("CommitLog checkpoint created at %s.", checkpoint.getCheckpointTime());
+    verifyCommitLogsPersisted(checkpoint);
+    return checkpoint;
+  }
+
+  /**
+   * Creates a CommitLog checkpoint and starts its asynchronous export, retrying once on failure.
+   *
+   * @return the checkpoint created by the first or the second attempt, or empty if both failed
+   */
+  private Optional<CommitLogCheckpoint> exportCommitLogs() {
+    // Trigger an async CommitLog export to GCS. Will check file availability later.
+    // Although we can add support to synchronous execution, it can disrupt the export cadence
+    // when the system is busy
+    Optional<CommitLogCheckpoint> checkpoint =
+        commitLogCheckpointAction.createCheckPointAndStartAsyncExport();
+
+    // Failure to create checkpoint most likely caused by race with cron-triggered checkpointing.
+    // Retry once, keeping the result of the retry so that a successful second attempt is used.
+    if (!checkpoint.isPresent()) {
+      checkpoint = commitLogCheckpointAction.createCheckPointAndStartAsyncExport();
+    }
+    return checkpoint;
+  }
+
+  /**
+   * Polls GCS until the CommitLog files between the Datastore export used as baseline and the
+   * given checkpoint can be listed.
+   *
+   * @throws RuntimeException if the files are still incomplete after {@link
+   *     #COMMITLOGS_PRESENCE_CHECK_ATTEMPTS} checks
+   */
+  private void verifyCommitLogsPersisted(CommitLogCheckpoint checkpoint) {
+    DateTime exportStartTime =
+        datastoreSnapshotFinder
+            .getSnapshotInfo(checkpoint.getCheckpointTime().toInstant())
+            .exportInterval()
+            .getStart();
+    logger.atInfo().log("Found Datastore export at %s", exportStartTime);
+    for (int attempts = 0; attempts < COMMITLOGS_PRESENCE_CHECK_ATTEMPTS; attempts++) {
+      // Wait between checks only; there is no point in sleeping after the final failed check.
+      if (attempts > 0) {
+        sleeper.sleepInterruptibly(COMMITLOGS_PRESENCE_CHECK_DELAY);
+      }
+      try {
+        gcsDiffFileLister.listDiffFiles(gcsBucket, exportStartTime, checkpoint.getCheckpointTime());
+        return;
+      } catch (IllegalStateException e) {
+        // Gap in commitlog files. Fall through to the next attempt.
+        logger.atInfo().log("Commitlog files not yet found on GCS.");
+      }
+    }
+    throw new RuntimeException("Cannot find all commitlog files.");
+  }
+}
diff --git a/core/src/main/java/google/registry/beam/comparedb/LatestDatastoreSnapshotFinder.java b/core/src/main/java/google/registry/beam/comparedb/LatestDatastoreSnapshotFinder.java
index 04d54add7..20dfcc030 100644
--- a/core/src/main/java/google/registry/beam/comparedb/LatestDatastoreSnapshotFinder.java
+++ b/core/src/main/java/google/registry/beam/comparedb/LatestDatastoreSnapshotFinder.java
@@ -66,7 +66,7 @@ public class LatestDatastoreSnapshotFinder {
* "2021-11-19T06:00:00_76493/2021-11-19T06:00:00_76493.overall_export_metadata".
*/
Optional metaFilePathOptional =
- findNewestExportMetadataFileBeforeTime(bucketName, exportEndTimeUpperBound, 2);
+ findNewestExportMetadataFileBeforeTime(bucketName, exportEndTimeUpperBound, 5);
if (!metaFilePathOptional.isPresent()) {
- throw new NoSuchElementException("No exports found over the past 2 days.");
+ // Keep the message in step with the look-back window passed above.
+ throw new NoSuchElementException("No exports found over the past 5 days.");
}
@@ -125,12 +125,12 @@ public class LatestDatastoreSnapshotFinder {
/** Holds information about a Datastore snapshot. */
@AutoValue
- abstract static class DatastoreSnapshotInfo {
- abstract String exportDir();
+ public abstract static class DatastoreSnapshotInfo {
+ public abstract String exportDir();
- abstract String commitLogDir();
+ public abstract String commitLogDir();
- abstract Interval exportInterval();
+ public abstract Interval exportInterval();
static DatastoreSnapshotInfo create(
String exportDir, String commitLogDir, Interval exportOperationInterval) {
diff --git a/core/src/main/java/google/registry/env/common/backend/WEB-INF/web.xml b/core/src/main/java/google/registry/env/common/backend/WEB-INF/web.xml
index 652ca2aa5..f6b48a58a 100644
--- a/core/src/main/java/google/registry/env/common/backend/WEB-INF/web.xml
+++ b/core/src/main/java/google/registry/env/common/backend/WEB-INF/web.xml
@@ -422,6 +422,12 @@ have been in the database for a certain period of time. -->
/_dr/task/createSyntheticHistoryEntries
+
+  <servlet-mapping>
+    <servlet-name>backend-servlet</servlet-name>
+    <url-pattern>/_dr/task/syncDatastoreToSqlSnapshot</url-pattern>
+  </servlet-mapping>
+
diff --git a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java
index 794cfe840..d211606f1 100644
--- a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java
+++ b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java
@@ -59,6 +59,10 @@ public class ReplicateToDatastoreAction implements Runnable {
public static final String PATH = "/_dr/cron/replicateToDatastore";
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ /** Name of the lock that ensures sequential execution of replays. */
+ public static final String REPLICATE_TO_DATASTORE_LOCK_NAME =
+ ReplicateToDatastoreAction.class.getSimpleName();
+
/**
* Number of transactions to fetch from SQL. The rationale for 200 is that we're processing these
* every minute and our production instance currently does about 2 mutations per second, so this
@@ -66,7 +70,7 @@ public class ReplicateToDatastoreAction implements Runnable {
*/
public static final int BATCH_SIZE = 200;
- private static final Duration LEASE_LENGTH = standardHours(1);
+ public static final Duration REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH = standardHours(1);
private final Clock clock;
private final RequestStatusChecker requestStatusChecker;
@@ -81,21 +85,26 @@ public class ReplicateToDatastoreAction implements Runnable {
}
@VisibleForTesting
- public List getTransactionBatch() {
+ public List getTransactionBatchAtSnapshot() {
+ return getTransactionBatchAtSnapshot(Optional.empty());
+ }
+
+ static List getTransactionBatchAtSnapshot(Optional snapshotId) {
// Get the next batch of transactions that we haven't replicated.
LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(LastSqlTransaction::load);
try {
return jpaTm()
.transactWithoutBackup(
- () ->
- jpaTm()
- .query(
- "SELECT txn FROM TransactionEntity txn WHERE id >"
- + " :lastId ORDER BY id",
- TransactionEntity.class)
- .setParameter("lastId", lastSqlTxnBeforeBatch.getTransactionId())
- .setMaxResults(BATCH_SIZE)
- .getResultList());
+ () -> {
+ snapshotId.ifPresent(jpaTm()::setDatabaseSnapshot);
+ return jpaTm()
+ .query(
+ "SELECT txn FROM TransactionEntity txn WHERE id >" + " :lastId ORDER BY id",
+ TransactionEntity.class)
+ .setParameter("lastId", lastSqlTxnBeforeBatch.getTransactionId())
+ .setMaxResults(BATCH_SIZE)
+ .getResultList();
+ });
} catch (NoResultException e) {
return ImmutableList.of();
}
@@ -108,7 +117,7 @@ public class ReplicateToDatastoreAction implements Runnable {
* Throws an exception if a fatal error occurred and the batch should be aborted
*/
@VisibleForTesting
- public void applyTransaction(TransactionEntity txnEntity) {
+ public static void applyTransaction(TransactionEntity txnEntity) {
logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore.");
try (UpdateAutoTimestamp.DisableAutoUpdateResource disabler =
UpdateAutoTimestamp.disableAutoUpdate()) {
@@ -174,7 +183,11 @@ public class ReplicateToDatastoreAction implements Runnable {
}
Optional lock =
Lock.acquireSql(
- this.getClass().getSimpleName(), null, LEASE_LENGTH, requestStatusChecker, false);
+ REPLICATE_TO_DATASTORE_LOCK_NAME,
+ null,
+ REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH,
+ requestStatusChecker,
+ false);
if (!lock.isPresent()) {
String message = "Can't acquire ReplicateToDatastoreAction lock, aborting.";
logger.atSevere().log(message);
@@ -203,10 +216,14 @@ public class ReplicateToDatastoreAction implements Runnable {
}
private int replayAllTransactions() {
+ return replayAllTransactions(Optional.empty());
+ }
+
+ public static int replayAllTransactions(Optional snapshotId) {
int numTransactionsReplayed = 0;
List transactionBatch;
do {
- transactionBatch = getTransactionBatch();
+ transactionBatch = getTransactionBatchAtSnapshot(snapshotId);
for (TransactionEntity transaction : transactionBatch) {
applyTransaction(transaction);
numTransactionsReplayed++;
diff --git a/core/src/main/java/google/registry/module/backend/BackendRequestComponent.java b/core/src/main/java/google/registry/module/backend/BackendRequestComponent.java
index 6b63e8cf6..1f4580051 100644
--- a/core/src/main/java/google/registry/module/backend/BackendRequestComponent.java
+++ b/core/src/main/java/google/registry/module/backend/BackendRequestComponent.java
@@ -21,6 +21,7 @@ import google.registry.backup.CommitLogCheckpointAction;
import google.registry.backup.DeleteOldCommitLogsAction;
import google.registry.backup.ExportCommitLogDiffAction;
import google.registry.backup.ReplayCommitLogsToSqlAction;
+import google.registry.backup.SyncDatastoreToSqlSnapshotAction;
import google.registry.batch.BatchModule;
import google.registry.batch.DeleteContactsAndHostsAction;
import google.registry.batch.DeleteExpiredDomainsAction;
@@ -199,6 +200,8 @@ interface BackendRequestComponent {
SendExpiringCertificateNotificationEmailAction sendExpiringCertificateNotificationEmailAction();
+ SyncDatastoreToSqlSnapshotAction syncDatastoreToSqlSnapshotAction();
+
SyncGroupMembersAction syncGroupMembersAction();
SyncRegistrarsSheetAction syncRegistrarsSheetAction();
diff --git a/core/src/main/java/google/registry/tools/RegistryTool.java b/core/src/main/java/google/registry/tools/RegistryTool.java
index 976d9ab63..3a64f14ef 100644
--- a/core/src/main/java/google/registry/tools/RegistryTool.java
+++ b/core/src/main/java/google/registry/tools/RegistryTool.java
@@ -123,6 +123,7 @@ public final class RegistryTool {
.put("update_server_locks", UpdateServerLocksCommand.class)
.put("update_tld", UpdateTldCommand.class)
.put("upload_claims_list", UploadClaimsListCommand.class)
+ .put("validate_datastore_with_sql", ValidateDatastoreWithSqlCommand.class)
.put("validate_escrow_deposit", ValidateEscrowDepositCommand.class)
.put("validate_login_credentials", ValidateLoginCredentialsCommand.class)
.put("verify_ote", VerifyOteCommand.class)
diff --git a/core/src/main/java/google/registry/tools/RegistryToolComponent.java b/core/src/main/java/google/registry/tools/RegistryToolComponent.java
index a2f8f57ff..29dd85845 100644
--- a/core/src/main/java/google/registry/tools/RegistryToolComponent.java
+++ b/core/src/main/java/google/registry/tools/RegistryToolComponent.java
@@ -76,6 +76,7 @@ import javax.inject.Singleton;
LocalCredentialModule.class,
PersistenceModule.class,
RdeModule.class,
+ RegistryToolDataflowModule.class,
RequestFactoryModule.class,
SecretManagerModule.class,
URLFetchServiceModule.class,
@@ -170,6 +171,8 @@ interface RegistryToolComponent {
void inject(UpdateTldCommand command);
+ void inject(ValidateDatastoreWithSqlCommand command);
+
void inject(ValidateEscrowDepositCommand command);
void inject(ValidateLoginCredentialsCommand command);
diff --git a/core/src/main/java/google/registry/tools/RegistryToolDataflowModule.java b/core/src/main/java/google/registry/tools/RegistryToolDataflowModule.java
new file mode 100644
index 000000000..8a3f4fa44
--- /dev/null
+++ b/core/src/main/java/google/registry/tools/RegistryToolDataflowModule.java
@@ -0,0 +1,39 @@
+// Copyright 2022 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.tools;
+
+import com.google.api.services.dataflow.Dataflow;
+import dagger.Module;
+import dagger.Provides;
+import google.registry.config.CredentialModule.LocalCredential;
+import google.registry.config.RegistryConfig.Config;
+import google.registry.util.GoogleCredentialsBundle;
+
+/** Dagger module that supplies the {@link Dataflow} API client used by {@link RegistryTool}. */
+@Module
+public class RegistryToolDataflowModule {
+
+  /**
+   * Builds a {@link Dataflow} client from the local credential bundle.
+   *
+   * @param credentialsBundle source of the HTTP transport, JSON factory and request initializer
+   * @param projectId project id, used as the prefix of the client's application name
+   * @return the configured client
+   */
+  @Provides
+  static Dataflow provideDataflow(
+      @LocalCredential GoogleCredentialsBundle credentialsBundle,
+      @Config("projectId") String projectId) {
+    String applicationName = String.format("%s nomulus", projectId);
+    Dataflow.Builder clientBuilder =
+        new Dataflow.Builder(
+            credentialsBundle.getHttpTransport(),
+            credentialsBundle.getJsonFactory(),
+            credentialsBundle.getHttpRequestInitializer());
+    clientBuilder.setApplicationName(applicationName);
+    return clientBuilder.build();
+  }
+}
diff --git a/core/src/main/java/google/registry/tools/ValidateDatastoreWithSqlCommand.java b/core/src/main/java/google/registry/tools/ValidateDatastoreWithSqlCommand.java
new file mode 100644
index 000000000..a1ba5d941
--- /dev/null
+++ b/core/src/main/java/google/registry/tools/ValidateDatastoreWithSqlCommand.java
@@ -0,0 +1,229 @@
+// Copyright 2022 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.tools;
+
+import static google.registry.beam.BeamUtils.createJobName;
+import static google.registry.model.replay.ReplicateToDatastoreAction.REPLICATE_TO_DATASTORE_LOCK_NAME;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.api.services.dataflow.model.Job;
+import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
+import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
+import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.net.MediaType;
+import google.registry.backup.SyncDatastoreToSqlSnapshotAction;
+import google.registry.beam.common.DatabaseSnapshot;
+import google.registry.config.RegistryConfig.Config;
+import google.registry.model.common.DatabaseMigrationStateSchedule;
+import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
+import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection;
+import google.registry.model.replay.ReplicateToDatastoreAction;
+import google.registry.model.server.Lock;
+import google.registry.request.Action.Service;
+import google.registry.util.Clock;
+import google.registry.util.RequestStatusChecker;
+import google.registry.util.Sleeper;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.UUID;
+import javax.inject.Inject;
+import org.joda.time.Duration;
+
+/**
+ * Validates asynchronously replicated data from the primary Cloud SQL database to Datastore.
+ *
+ * <p>This command suspends the replication process (by acquiring the replication lock), takes
+ * a snapshot of the Cloud SQL database, invokes a Nomulus server action to sync Datastore to this
+ * snapshot (See {@link SyncDatastoreToSqlSnapshotAction} for details), and finally launches a BEAM
+ * pipeline to compare Datastore with the given SQL snapshot.
+ *
+ * <p>This command does not lock up the SQL database. Normal processing can proceed.
+ */
+@Parameters(commandDescription = "Validates Datastore with Cloud SQL.")
+public class ValidateDatastoreWithSqlCommand
+    implements CommandWithConnection, CommandWithRemoteApi {
+
+  private static final Service NOMULUS_SERVICE = Service.BACKEND;
+
+  private static final String PIPELINE_NAME = "validate_datastore_pipeline";
+
+  // States indicating a job is not finished yet.
+  private static final ImmutableSet<String> DATAFLOW_JOB_RUNNING_STATES =
+      ImmutableSet.of(
+          "JOB_STATE_RUNNING", "JOB_STATE_STOPPED", "JOB_STATE_PENDING", "JOB_STATE_QUEUED");
+
+  private static final Duration JOB_POLLING_INTERVAL = Duration.standardSeconds(60);
+
+  @Parameter(
+      names = {"-m", "--manual"},
+      description =
+          "If true, let user launch the comparison pipeline manually out of band. "
+              + "Command will wait for user key-press to exit after syncing Datastore.")
+  boolean manualLaunchPipeline;
+
+  @Inject Clock clock;
+  @Inject Dataflow dataflow;
+
+  @Inject
+  @Config("defaultJobRegion")
+  String jobRegion;
+
+  @Inject
+  @Config("beamStagingBucketUrl")
+  String stagingBucketUrl;
+
+  @Inject
+  @Config("projectId")
+  String projectId;
+
+  @Inject Sleeper sleeper;
+
+  private AppEngineConnection connection;
+
+  @Override
+  public void setConnection(AppEngineConnection connection) {
+    this.connection = connection;
+  }
+
+  /**
+   * Runs the validation: checks the migration state, takes the replication lock, snapshots SQL,
+   * asks the server to sync Datastore to that snapshot, then either waits for a key press (manual
+   * mode) or launches the comparison pipeline and polls it until it leaves the running states.
+   *
+   * @throws IllegalStateException if the replay direction is not SQL-to-Datastore, the replication
+   *     lock cannot be acquired, or the server response does not have the expected format
+   */
+  @Override
+  public void run() throws Exception {
+    MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc());
+    if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) {
+      throw new IllegalStateException("Cannot sync Datastore to SQL in migration step " + state);
+    }
+    Optional<Lock> lock =
+        Lock.acquireSql(
+            REPLICATE_TO_DATASTORE_LOCK_NAME,
+            null,
+            ReplicateToDatastoreAction.REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH,
+            new FakeRequestStatusChecker(),
+            false);
+    if (!lock.isPresent()) {
+      throw new IllegalStateException("Cannot acquire the async propagation lock.");
+    }
+
+    try {
+      try (DatabaseSnapshot snapshot = DatabaseSnapshot.createSnapshot()) {
+        System.out.printf("Obtained snapshot %s\n", snapshot.getSnapshotId());
+        AppEngineConnection connectionToService = connection.withService(NOMULUS_SERVICE);
+        String response =
+            connectionToService.sendPostRequest(
+                getNomulusEndpoint(snapshot.getSnapshotId()),
+                ImmutableMap.of(),
+                MediaType.PLAIN_TEXT_UTF_8,
+                "".getBytes(UTF_8));
+        System.out.println(response);
+
+        // Datastore is in sync with the snapshot now, so replication may resume. Clearing the
+        // local handle keeps the finally block from releasing the lock a second time.
+        lock.ifPresent(Lock::releaseSql);
+        lock = Optional.empty();
+
+        String latestCommitTimestamp = extractCommitLogTimestamp(response);
+
+        if (manualLaunchPipeline) {
+          System.out.print("\nEnter any key to continue when the pipeline ends:");
+          System.in.read();
+        } else {
+          Job pipelineJob =
+              launchComparisonPipeline(snapshot.getSnapshotId(), latestCommitTimestamp).getJob();
+          String jobId = pipelineJob.getId();
+
+          System.out.printf(
+              "Launched comparison pipeline %s (%s).\n", pipelineJob.getName(), jobId);
+
+          while (DATAFLOW_JOB_RUNNING_STATES.contains(getDataflowJobStatus(jobId))) {
+            sleeper.sleepInterruptibly(JOB_POLLING_INTERVAL);
+          }
+          System.out.printf(
+              "Pipeline ended with %s state. Please check counters for results.\n",
+              getDataflowJobStatus(jobId));
+        }
+      }
+    } finally {
+      lock.ifPresent(Lock::releaseSql);
+    }
+  }
+
+  /**
+   * Extracts the CommitLog timestamp, i.e. the text inside the last pair of parentheses, from the
+   * server response. See {@link SyncDatastoreToSqlSnapshotAction#SUCCESS_RESPONSE_TEMPLATE} for
+   * the response format.
+   *
+   * @throws IllegalStateException if the response has no well-formed trailing parenthesized part
+   */
+  private static String extractCommitLogTimestamp(String response) {
+    int open = response.lastIndexOf('(');
+    int close = response.lastIndexOf(')');
+    if (open < 0 || close < open) {
+      // Fail with the offending text rather than an opaque StringIndexOutOfBoundsException.
+      throw new IllegalStateException(
+          "Unexpected response from " + SyncDatastoreToSqlSnapshotAction.PATH + ": " + response);
+    }
+    return response.substring(open + 1, close);
+  }
+
+  /** Returns the server action path with the snapshot id attached as a query parameter. */
+  private static String getNomulusEndpoint(String sqlSnapshotId) {
+    return String.format(
+        "%s?sqlSnapshotId=%s", SyncDatastoreToSqlSnapshotAction.PATH, sqlSnapshotId);
+  }
+
+  /**
+   * Launches the flex-template comparison pipeline for the given SQL snapshot and CommitLog
+   * timestamp.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} from the Dataflow API call
+   */
+  private LaunchFlexTemplateResponse launchComparisonPipeline(
+      String sqlSnapshotId, String latestCommitLogTimestamp) {
+    try {
+      LaunchFlexTemplateParameter parameter =
+          new LaunchFlexTemplateParameter()
+              .setJobName(createJobName("validate-datastore", clock))
+              .setContainerSpecGcsPath(
+                  String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
+              .setParameters(
+                  ImmutableMap.of(
+                      "sqlSnapshotId",
+                      sqlSnapshotId,
+                      "latestCommitLogTimestamp",
+                      latestCommitLogTimestamp,
+                      "registryEnvironment",
+                      RegistryToolEnvironment.get().name()));
+      return dataflow
+          .projects()
+          .locations()
+          .flexTemplates()
+          .launch(
+              projectId, jobRegion, new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
+          .execute();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Fetches the current state of the given Dataflow job.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} from the Dataflow API call
+   */
+  private String getDataflowJobStatus(String jobId) {
+    try {
+      return dataflow
+          .projects()
+          .locations()
+          .jobs()
+          .get(projectId, jobRegion, jobId)
+          .execute()
+          .getCurrentState();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * A fake implementation of {@link RequestStatusChecker} for managing SQL-backed locks from
+   * non-AppEngine platforms. This is only required until the Nomulus server is migrated off
+   * AppEngine.
+   *
+   * <p>Each call to {@link #getLogId} returns a new random id, and {@link #isRunning} always
+   * reports {@code false}.
+   */
+  static class FakeRequestStatusChecker implements RequestStatusChecker {
+
+    @Override
+    public String getLogId() {
+      return ValidateDatastoreWithSqlCommand.class.getSimpleName() + "-" + UUID.randomUUID();
+    }
+
+    @Override
+    public boolean isRunning(String requestLogId) {
+      return false;
+    }
+  }
+}
diff --git a/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java b/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java
index 630820193..1ce02c91a 100644
--- a/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java
+++ b/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java
@@ -15,6 +15,7 @@
package google.registry.model.replay;
import static com.google.common.truth.Truth.assertThat;
+import static google.registry.model.replay.ReplicateToDatastoreAction.applyTransaction;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static google.registry.testing.DatabaseHelper.insertInDb;
@@ -158,23 +159,23 @@ public class ReplicateToDatastoreActionTest {
// Write a transaction and run just the batch fetch.
insertInDb(foo);
- List txns1 = action.getTransactionBatch();
+ List txns1 = action.getTransactionBatchAtSnapshot();
assertThat(txns1).hasSize(1);
// Write a second transaction and do another batch fetch.
insertInDb(bar);
- List txns2 = action.getTransactionBatch();
+ List txns2 = action.getTransactionBatchAtSnapshot();
assertThat(txns2).hasSize(2);
// Apply the first batch.
- action.applyTransaction(txns1.get(0));
+ applyTransaction(txns1.get(0));
// Remove the foo record so we can ensure that this transaction doesn't get doublle-played.
ofyTm().transact(() -> ofyTm().delete(foo.key()));
// Apply the second batch.
for (TransactionEntity txn : txns2) {
- action.applyTransaction(txn);
+ applyTransaction(txn);
}
// Verify that the first transaction didn't get replayed but the second one did.
@@ -212,10 +213,9 @@ public class ReplicateToDatastoreActionTest {
// Force the last transaction id back to -1 so that we look for transaction 0.
ofyTm().transact(() -> ofyTm().insert(new LastSqlTransaction(-1)));
- List txns = action.getTransactionBatch();
+ List txns = action.getTransactionBatchAtSnapshot();
assertThat(txns).hasSize(1);
- assertThat(
- assertThrows(IllegalStateException.class, () -> action.applyTransaction(txns.get(0))))
+ assertThat(assertThrows(IllegalStateException.class, () -> applyTransaction(txns.get(0))))
.hasMessageThat()
.isEqualTo("Missing transaction: last txn id = -1, next available txn = 1");
}
diff --git a/core/src/test/java/google/registry/testing/ReplayExtension.java b/core/src/test/java/google/registry/testing/ReplayExtension.java
index 033b2b73c..af090a1fa 100644
--- a/core/src/test/java/google/registry/testing/ReplayExtension.java
+++ b/core/src/test/java/google/registry/testing/ReplayExtension.java
@@ -249,9 +249,9 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
List transactionBatch;
do {
- transactionBatch = sqlToDsReplicator.getTransactionBatch();
+ transactionBatch = sqlToDsReplicator.getTransactionBatchAtSnapshot();
for (TransactionEntity txn : transactionBatch) {
- sqlToDsReplicator.applyTransaction(txn);
+ ReplicateToDatastoreAction.applyTransaction(txn);
if (compare) {
ofyTm().transact(() -> compareSqlTransaction(txn));
}
diff --git a/core/src/test/resources/google/registry/module/backend/backend_routing.txt b/core/src/test/resources/google/registry/module/backend/backend_routing.txt
index c85aa0703..3228ab536 100644
--- a/core/src/test/resources/google/registry/module/backend/backend_routing.txt
+++ b/core/src/test/resources/google/registry/module/backend/backend_routing.txt
@@ -39,6 +39,7 @@ PATH CLASS
/_dr/task/resaveAllEppResources ResaveAllEppResourcesAction GET n INTERNAL,API APP ADMIN
/_dr/task/resaveEntity ResaveEntityAction POST n INTERNAL,API APP ADMIN
/_dr/task/sendExpiringCertificateNotificationEmail SendExpiringCertificateNotificationEmailAction GET n INTERNAL,API APP ADMIN
+/_dr/task/syncDatastoreToSqlSnapshot SyncDatastoreToSqlSnapshotAction POST n INTERNAL,API APP ADMIN
/_dr/task/syncGroupMembers SyncGroupMembersAction POST n INTERNAL,API APP ADMIN
/_dr/task/syncRegistrarsSheet SyncRegistrarsSheetAction POST n INTERNAL,API APP ADMIN
/_dr/task/tmchCrl TmchCrlAction POST y INTERNAL,API APP ADMIN