Compare datastore to sql action (#1507)

* Add action to DB comparison pipeline

Add a backend Action in Nomulus server that lanuches the pipeline for
comparing datastore (secondary) with Cloud SQL (primary).

* Save progress

* Revert test changes

* Add pipeline launching
This commit is contained in:
Weimin Yu 2022-02-10 10:43:36 -05:00 committed by GitHub
parent 7b86cba44f
commit 4ca390da98
15 changed files with 563 additions and 53 deletions

View file

@ -41,4 +41,20 @@ public interface Sleeper {
* @see com.google.common.util.concurrent.Uninterruptibles#sleepUninterruptibly
*/
void sleepUninterruptibly(ReadableDuration duration);
/**
* Puts the current thread to interruptible sleep.
*
* <p>This is a convenience method for {@link #sleep} that properly converts an {@link
* InterruptedException} to a {@link RuntimeException}.
*/
default void sleepInterruptibly(ReadableDuration duration) {
try {
sleep(duration);
} catch (InterruptedException e) {
// Restore current thread's interrupted state.
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted.", e);
}
}
}

View file

@ -21,6 +21,7 @@ import static google.registry.backup.ExportCommitLogDiffAction.UPPER_CHECKPOINT_
import static google.registry.backup.RestoreCommitLogsAction.BUCKET_OVERRIDE_PARAM;
import static google.registry.backup.RestoreCommitLogsAction.FROM_TIME_PARAM;
import static google.registry.backup.RestoreCommitLogsAction.TO_TIME_PARAM;
import static google.registry.backup.SyncDatastoreToSqlSnapshotAction.SQL_SNAPSHOT_ID_PARAM;
import static google.registry.request.RequestParameters.extractOptionalParameter;
import static google.registry.request.RequestParameters.extractRequiredDatetimeParameter;
import static google.registry.request.RequestParameters.extractRequiredParameter;
@ -98,6 +99,12 @@ public final class BackupModule {
return extractRequiredDatetimeParameter(req, TO_TIME_PARAM);
}
@Provides
@Parameter(SQL_SNAPSHOT_ID_PARAM)
static String provideSqlSnapshotId(HttpServletRequest req) {
return extractRequiredParameter(req, SQL_SNAPSHOT_ID_PARAM);
}
@Provides
@Backups
static ListeningExecutorService provideListeningExecutorService() {

View file

@ -30,6 +30,7 @@ import google.registry.request.Action.Service;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.CloudTasksUtils;
import java.util.Optional;
import javax.inject.Inject;
import org.joda.time.DateTime;
@ -64,33 +65,47 @@ public final class CommitLogCheckpointAction implements Runnable {
@Override
public void run() {
createCheckPointAndStartAsyncExport();
}
/**
* Creates a {@link CommitLogCheckpoint} and initiates an asynchronous export task.
*
* @return the {@code CommitLogCheckpoint} to be exported
*/
public Optional<CommitLogCheckpoint> createCheckPointAndStartAsyncExport() {
final CommitLogCheckpoint checkpoint = strategy.computeCheckpoint();
logger.atInfo().log(
"Generated candidate checkpoint for time: %s", checkpoint.getCheckpointTime());
ofyTm()
.transact(
() -> {
DateTime lastWrittenTime = CommitLogCheckpointRoot.loadRoot().getLastWrittenTime();
if (isBeforeOrAt(checkpoint.getCheckpointTime(), lastWrittenTime)) {
logger.atInfo().log(
"Newer checkpoint already written at time: %s", lastWrittenTime);
return;
}
auditedOfy()
.saveIgnoringReadOnlyWithoutBackup()
.entities(
checkpoint, CommitLogCheckpointRoot.create(checkpoint.getCheckpointTime()));
// Enqueue a diff task between previous and current checkpoints.
cloudTasksUtils.enqueue(
QUEUE_NAME,
CloudTasksUtils.createPostTask(
ExportCommitLogDiffAction.PATH,
Service.BACKEND.toString(),
ImmutableMultimap.of(
LOWER_CHECKPOINT_TIME_PARAM,
lastWrittenTime.toString(),
UPPER_CHECKPOINT_TIME_PARAM,
checkpoint.getCheckpointTime().toString())));
});
boolean isCheckPointPersisted =
ofyTm()
.transact(
() -> {
DateTime lastWrittenTime =
CommitLogCheckpointRoot.loadRoot().getLastWrittenTime();
if (isBeforeOrAt(checkpoint.getCheckpointTime(), lastWrittenTime)) {
logger.atInfo().log(
"Newer checkpoint already written at time: %s", lastWrittenTime);
return false;
}
auditedOfy()
.saveIgnoringReadOnlyWithoutBackup()
.entities(
checkpoint,
CommitLogCheckpointRoot.create(checkpoint.getCheckpointTime()));
// Enqueue a diff task between previous and current checkpoints.
cloudTasksUtils.enqueue(
QUEUE_NAME,
CloudTasksUtils.createPostTask(
ExportCommitLogDiffAction.PATH,
Service.BACKEND.toString(),
ImmutableMultimap.of(
LOWER_CHECKPOINT_TIME_PARAM,
lastWrittenTime.toString(),
UPPER_CHECKPOINT_TIME_PARAM,
checkpoint.getCheckpointTime().toString())));
return true;
});
return isCheckPointPersisted ? Optional.of(checkpoint) : Optional.empty();
}
}

View file

@ -0,0 +1,173 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.backup;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import com.google.common.flogger.FluentLogger;
import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder;
import google.registry.config.RegistryConfig.Config;
import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.model.ofy.CommitLogCheckpoint;
import google.registry.model.replay.ReplicateToDatastoreAction;
import google.registry.request.Action;
import google.registry.request.Action.Service;
import google.registry.request.Parameter;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Sleeper;
import java.util.Optional;
import javax.inject.Inject;
import org.joda.time.DateTime;
import org.joda.time.Duration;
/**
* Synchronizes Datastore to a given SQL snapshot when SQL is the primary database.
*
* <p>The caller takes the responsibility for:
*
* <ul>
* <li>verifying the current migration stage
* <li>acquiring the {@link ReplicateToDatastoreAction#REPLICATE_TO_DATASTORE_LOCK_NAME
* replication lock}, and
* <li>while holding the lock, creating an SQL snapshot and invoking this action with the snapshot
* id
* </ul>
*
* The caller may release the replication lock upon receiving the response from this action. Please
* refer to {@link google.registry.tools.ValidateDatastoreWithSqlCommand} for more information on
* usage.
*
* <p>This action plays SQL transactions up to the user-specified snapshot, creates a new CommitLog
* checkpoint, and exports all CommitLogs to GCS up to this checkpoint. The timestamp of this
* checkpoint can be used to recreate a Datastore snapshot that is equivalent to the given SQL
* snapshot. If this action succeeds, the checkpoint timestamp is included in the response (the
* format of which is defined by {@link #SUCCESS_RESPONSE_TEMPLATE}).
*/
@Action(
service = Service.BACKEND,
path = SyncDatastoreToSqlSnapshotAction.PATH,
method = Action.Method.POST,
auth = Auth.AUTH_INTERNAL_OR_ADMIN)
@DeleteAfterMigration
public class SyncDatastoreToSqlSnapshotAction implements Runnable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public static final String PATH = "/_dr/task/syncDatastoreToSqlSnapshot";
public static final String SUCCESS_RESPONSE_TEMPLATE =
"Datastore is up-to-date with provided SQL snapshot (%s). CommitLog timestamp is (%s).";
static final String SQL_SNAPSHOT_ID_PARAM = "sqlSnapshotId";
private static final int COMMITLOGS_PRESENCE_CHECK_ATTEMPTS = 10;
private static final Duration COMMITLOGS_PRESENCE_CHECK_DELAY = Duration.standardSeconds(6);
private final Response response;
private final Sleeper sleeper;
@Config("commitLogGcsBucket")
private final String gcsBucket;
private final GcsDiffFileLister gcsDiffFileLister;
private final LatestDatastoreSnapshotFinder datastoreSnapshotFinder;
private final CommitLogCheckpointAction commitLogCheckpointAction;
private final String sqlSnapshotId;
@Inject
SyncDatastoreToSqlSnapshotAction(
Response response,
Sleeper sleeper,
@Config("commitLogGcsBucket") String gcsBucket,
GcsDiffFileLister gcsDiffFileLister,
LatestDatastoreSnapshotFinder datastoreSnapshotFinder,
CommitLogCheckpointAction commitLogCheckpointAction,
@Parameter(SQL_SNAPSHOT_ID_PARAM) String sqlSnapshotId) {
this.response = response;
this.sleeper = sleeper;
this.gcsBucket = gcsBucket;
this.gcsDiffFileLister = gcsDiffFileLister;
this.datastoreSnapshotFinder = datastoreSnapshotFinder;
this.commitLogCheckpointAction = commitLogCheckpointAction;
this.sqlSnapshotId = sqlSnapshotId;
}
@Override
public void run() {
logger.atInfo().log("Datastore validation invoked. SqlSnapshotId is %s.", sqlSnapshotId);
try {
CommitLogCheckpoint checkpoint = ensureDatabasesComparable(sqlSnapshotId);
response.setStatus(SC_OK);
response.setPayload(
String.format(SUCCESS_RESPONSE_TEMPLATE, sqlSnapshotId, checkpoint.getCheckpointTime()));
return;
} catch (Exception e) {
response.setStatus(SC_INTERNAL_SERVER_ERROR);
response.setPayload(e.getMessage());
}
}
private CommitLogCheckpoint ensureDatabasesComparable(String sqlSnapshotId) {
// Replicate SQL transaction to Datastore, up to when this snapshot is taken.
int playbacks = ReplicateToDatastoreAction.replayAllTransactions(Optional.of(sqlSnapshotId));
logger.atInfo().log("Played %s SQL transactions.", playbacks);
Optional<CommitLogCheckpoint> checkpoint = exportCommitLogs();
if (!checkpoint.isPresent()) {
throw new RuntimeException("Cannot create CommitLog checkpoint");
}
logger.atInfo().log(
"CommitLog checkpoint created at %s.", checkpoint.get().getCheckpointTime());
verifyCommitLogsPersisted(checkpoint.get());
return checkpoint.get();
}
private Optional<CommitLogCheckpoint> exportCommitLogs() {
// Trigger an async CommitLog export to GCS. Will check file availability later.
// Although we can add support to synchronous execution, it can disrupt the export cadence
// when the system is busy
Optional<CommitLogCheckpoint> checkpoint =
commitLogCheckpointAction.createCheckPointAndStartAsyncExport();
// Failure to create checkpoint most likely caused by race with cron-triggered checkpointing.
// Retry once.
if (!checkpoint.isPresent()) {
commitLogCheckpointAction.createCheckPointAndStartAsyncExport();
}
return checkpoint;
}
private void verifyCommitLogsPersisted(CommitLogCheckpoint checkpoint) {
DateTime exportStartTime =
datastoreSnapshotFinder
.getSnapshotInfo(checkpoint.getCheckpointTime().toInstant())
.exportInterval()
.getStart();
logger.atInfo().log("Found Datastore export at %s", exportStartTime);
for (int attempts = 0; attempts < COMMITLOGS_PRESENCE_CHECK_ATTEMPTS; attempts++) {
try {
gcsDiffFileLister.listDiffFiles(gcsBucket, exportStartTime, checkpoint.getCheckpointTime());
return;
} catch (IllegalStateException e) {
// Gap in commitlog files. Fall through to sleep and retry.
logger.atInfo().log("Commitlog files not yet found on GCS.");
}
sleeper.sleepInterruptibly(COMMITLOGS_PRESENCE_CHECK_DELAY);
}
throw new RuntimeException("Cannot find all commitlog files.");
}
}

View file

@ -66,7 +66,7 @@ public class LatestDatastoreSnapshotFinder {
* "2021-11-19T06:00:00_76493/2021-11-19T06:00:00_76493.overall_export_metadata".
*/
Optional<String> metaFilePathOptional =
findNewestExportMetadataFileBeforeTime(bucketName, exportEndTimeUpperBound, 2);
findNewestExportMetadataFileBeforeTime(bucketName, exportEndTimeUpperBound, 5);
if (!metaFilePathOptional.isPresent()) {
throw new NoSuchElementException("No exports found over the past 2 days.");
}
@ -125,12 +125,12 @@ public class LatestDatastoreSnapshotFinder {
/** Holds information about a Datastore snapshot. */
@AutoValue
abstract static class DatastoreSnapshotInfo {
abstract String exportDir();
public abstract static class DatastoreSnapshotInfo {
public abstract String exportDir();
abstract String commitLogDir();
public abstract String commitLogDir();
abstract Interval exportInterval();
public abstract Interval exportInterval();
static DatastoreSnapshotInfo create(
String exportDir, String commitLogDir, Interval exportOperationInterval) {

View file

@ -422,6 +422,12 @@ have been in the database for a certain period of time. -->
<url-pattern>/_dr/task/createSyntheticHistoryEntries</url-pattern>
</servlet-mapping>
<!-- Action to sync Datastore to a snapshot of the primary SQL database. -->
<servlet-mapping>
<servlet-name>backend-servlet</servlet-name>
<url-pattern>/_dr/task/syncDatastoreToSqlSnapshot</url-pattern>
</servlet-mapping>
<!-- Security config -->
<security-constraint>
<web-resource-collection>

View file

@ -59,6 +59,10 @@ public class ReplicateToDatastoreAction implements Runnable {
public static final String PATH = "/_dr/cron/replicateToDatastore";
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
/** Name of the lock that ensures sequential execution of replays. */
public static final String REPLICATE_TO_DATASTORE_LOCK_NAME =
ReplicateToDatastoreAction.class.getSimpleName();
/**
* Number of transactions to fetch from SQL. The rationale for 200 is that we're processing these
* every minute and our production instance currently does about 2 mutations per second, so this
@ -66,7 +70,7 @@ public class ReplicateToDatastoreAction implements Runnable {
*/
public static final int BATCH_SIZE = 200;
private static final Duration LEASE_LENGTH = standardHours(1);
public static final Duration REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH = standardHours(1);
private final Clock clock;
private final RequestStatusChecker requestStatusChecker;
@ -81,21 +85,26 @@ public class ReplicateToDatastoreAction implements Runnable {
}
@VisibleForTesting
public List<TransactionEntity> getTransactionBatch() {
public List<TransactionEntity> getTransactionBatchAtSnapshot() {
return getTransactionBatchAtSnapshot(Optional.empty());
}
static List<TransactionEntity> getTransactionBatchAtSnapshot(Optional<String> snapshotId) {
// Get the next batch of transactions that we haven't replicated.
LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(LastSqlTransaction::load);
try {
return jpaTm()
.transactWithoutBackup(
() ->
jpaTm()
.query(
"SELECT txn FROM TransactionEntity txn WHERE id >"
+ " :lastId ORDER BY id",
TransactionEntity.class)
.setParameter("lastId", lastSqlTxnBeforeBatch.getTransactionId())
.setMaxResults(BATCH_SIZE)
.getResultList());
() -> {
snapshotId.ifPresent(jpaTm()::setDatabaseSnapshot);
return jpaTm()
.query(
"SELECT txn FROM TransactionEntity txn WHERE id >" + " :lastId ORDER BY id",
TransactionEntity.class)
.setParameter("lastId", lastSqlTxnBeforeBatch.getTransactionId())
.setMaxResults(BATCH_SIZE)
.getResultList();
});
} catch (NoResultException e) {
return ImmutableList.of();
}
@ -108,7 +117,7 @@ public class ReplicateToDatastoreAction implements Runnable {
* <p>Throws an exception if a fatal error occurred and the batch should be aborted
*/
@VisibleForTesting
public void applyTransaction(TransactionEntity txnEntity) {
public static void applyTransaction(TransactionEntity txnEntity) {
logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore.");
try (UpdateAutoTimestamp.DisableAutoUpdateResource disabler =
UpdateAutoTimestamp.disableAutoUpdate()) {
@ -174,7 +183,11 @@ public class ReplicateToDatastoreAction implements Runnable {
}
Optional<Lock> lock =
Lock.acquireSql(
this.getClass().getSimpleName(), null, LEASE_LENGTH, requestStatusChecker, false);
REPLICATE_TO_DATASTORE_LOCK_NAME,
null,
REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH,
requestStatusChecker,
false);
if (!lock.isPresent()) {
String message = "Can't acquire ReplicateToDatastoreAction lock, aborting.";
logger.atSevere().log(message);
@ -203,10 +216,14 @@ public class ReplicateToDatastoreAction implements Runnable {
}
private int replayAllTransactions() {
return replayAllTransactions(Optional.empty());
}
public static int replayAllTransactions(Optional<String> snapshotId) {
int numTransactionsReplayed = 0;
List<TransactionEntity> transactionBatch;
do {
transactionBatch = getTransactionBatch();
transactionBatch = getTransactionBatchAtSnapshot(snapshotId);
for (TransactionEntity transaction : transactionBatch) {
applyTransaction(transaction);
numTransactionsReplayed++;

View file

@ -21,6 +21,7 @@ import google.registry.backup.CommitLogCheckpointAction;
import google.registry.backup.DeleteOldCommitLogsAction;
import google.registry.backup.ExportCommitLogDiffAction;
import google.registry.backup.ReplayCommitLogsToSqlAction;
import google.registry.backup.SyncDatastoreToSqlSnapshotAction;
import google.registry.batch.BatchModule;
import google.registry.batch.DeleteContactsAndHostsAction;
import google.registry.batch.DeleteExpiredDomainsAction;
@ -199,6 +200,8 @@ interface BackendRequestComponent {
SendExpiringCertificateNotificationEmailAction sendExpiringCertificateNotificationEmailAction();
SyncDatastoreToSqlSnapshotAction syncDatastoreToSqlSnapshotAction();
SyncGroupMembersAction syncGroupMembersAction();
SyncRegistrarsSheetAction syncRegistrarsSheetAction();

View file

@ -123,6 +123,7 @@ public final class RegistryTool {
.put("update_server_locks", UpdateServerLocksCommand.class)
.put("update_tld", UpdateTldCommand.class)
.put("upload_claims_list", UploadClaimsListCommand.class)
.put("validate_datastore_with_sql", ValidateDatastoreWithSqlCommand.class)
.put("validate_escrow_deposit", ValidateEscrowDepositCommand.class)
.put("validate_login_credentials", ValidateLoginCredentialsCommand.class)
.put("verify_ote", VerifyOteCommand.class)

View file

@ -76,6 +76,7 @@ import javax.inject.Singleton;
LocalCredentialModule.class,
PersistenceModule.class,
RdeModule.class,
RegistryToolDataflowModule.class,
RequestFactoryModule.class,
SecretManagerModule.class,
URLFetchServiceModule.class,
@ -170,6 +171,8 @@ interface RegistryToolComponent {
void inject(UpdateTldCommand command);
void inject(ValidateDatastoreWithSqlCommand command);
void inject(ValidateEscrowDepositCommand command);
void inject(ValidateLoginCredentialsCommand command);

View file

@ -0,0 +1,39 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.tools;
import com.google.api.services.dataflow.Dataflow;
import dagger.Module;
import dagger.Provides;
import google.registry.config.CredentialModule.LocalCredential;
import google.registry.config.RegistryConfig.Config;
import google.registry.util.GoogleCredentialsBundle;
/** Provides a {@link Dataflow} API client for use in {@link RegistryTool}. */
@Module
public class RegistryToolDataflowModule {
@Provides
static Dataflow provideDataflow(
@LocalCredential GoogleCredentialsBundle credentialsBundle,
@Config("projectId") String projectId) {
return new Dataflow.Builder(
credentialsBundle.getHttpTransport(),
credentialsBundle.getJsonFactory(),
credentialsBundle.getHttpRequestInitializer())
.setApplicationName(String.format("%s nomulus", projectId))
.build();
}
}

View file

@ -0,0 +1,229 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.tools;
import static google.registry.beam.BeamUtils.createJobName;
import static google.registry.model.replay.ReplicateToDatastoreAction.REPLICATE_TO_DATASTORE_LOCK_NAME;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.MediaType;
import google.registry.backup.SyncDatastoreToSqlSnapshotAction;
import google.registry.beam.common.DatabaseSnapshot;
import google.registry.config.RegistryConfig.Config;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection;
import google.registry.model.replay.ReplicateToDatastoreAction;
import google.registry.model.server.Lock;
import google.registry.request.Action.Service;
import google.registry.util.Clock;
import google.registry.util.RequestStatusChecker;
import google.registry.util.Sleeper;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import javax.inject.Inject;
import org.joda.time.Duration;
/**
* Validates asynchronously replicated data from the primary Cloud SQL database to Datastore.
*
* <p>This command suspends the replication process (by acquiring the replication lock), take a
* snapshot of the Cloud SQL database, invokes a Nomulus server action to sync Datastore to this
* snapshot (See {@link SyncDatastoreToSqlSnapshotAction} for details), and finally launches a BEAM
* pipeline to compare Datastore with the given SQL snapshot.
*
* <p>This command does not lock up the SQL database. Normal processing can proceed.
*/
@Parameters(commandDescription = "Validates Datastore with Cloud SQL.")
public class ValidateDatastoreWithSqlCommand
implements CommandWithConnection, CommandWithRemoteApi {
private static final Service NOMULUS_SERVICE = Service.BACKEND;
private static final String PIPELINE_NAME = "validate_datastore_pipeline";
// States indicating a job is not finished yet.
private static final ImmutableSet<String> DATAFLOW_JOB_RUNNING_STATES =
ImmutableSet.of(
"JOB_STATE_RUNNING", "JOB_STATE_STOPPED", "JOB_STATE_PENDING", "JOB_STATE_QUEUED");
private static final Duration JOB_POLLING_INTERVAL = Duration.standardSeconds(60);
@Parameter(
names = {"-m", "--manual"},
description =
"If true, let user launch the comparison pipeline manually out of band. "
+ "Command will wait for user key-press to exit after syncing Datastore.")
boolean manualLaunchPipeline;
@Inject Clock clock;
@Inject Dataflow dataflow;
@Inject
@Config("defaultJobRegion")
String jobRegion;
@Inject
@Config("beamStagingBucketUrl")
String stagingBucketUrl;
@Inject
@Config("projectId")
String projectId;
@Inject Sleeper sleeper;
private AppEngineConnection connection;
@Override
public void setConnection(AppEngineConnection connection) {
this.connection = connection;
}
@Override
public void run() throws Exception {
MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc());
if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) {
throw new IllegalStateException("Cannot sync Datastore to SQL in migration step " + state);
}
Optional<Lock> lock =
Lock.acquireSql(
REPLICATE_TO_DATASTORE_LOCK_NAME,
null,
ReplicateToDatastoreAction.REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH,
new FakeRequestStatusChecker(),
false);
if (!lock.isPresent()) {
throw new IllegalStateException("Cannot acquire the async propagation lock.");
}
try {
try (DatabaseSnapshot snapshot = DatabaseSnapshot.createSnapshot()) {
System.out.printf("Obtained snapshot %s\n", snapshot.getSnapshotId());
AppEngineConnection connectionToService = connection.withService(NOMULUS_SERVICE);
String response =
connectionToService.sendPostRequest(
getNomulusEndpoint(snapshot.getSnapshotId()),
ImmutableMap.<String, String>of(),
MediaType.PLAIN_TEXT_UTF_8,
"".getBytes(UTF_8));
System.out.println(response);
lock.ifPresent(Lock::releaseSql);
lock = Optional.empty();
// See SyncDatastoreToSqlSnapshotAction for response format.
String latestCommitTimestamp =
response.substring(response.lastIndexOf('(') + 1, response.lastIndexOf(')'));
if (manualLaunchPipeline) {
System.out.print("\nEnter any key to continue when the pipeline ends:");
System.in.read();
} else {
Job pipelineJob =
launchComparisonPipeline(snapshot.getSnapshotId(), latestCommitTimestamp).getJob();
String jobId = pipelineJob.getId();
System.out.printf(
"Launched comparison pipeline %s (%s).\n", pipelineJob.getName(), jobId);
while (DATAFLOW_JOB_RUNNING_STATES.contains(getDataflowJobStatus(jobId))) {
sleeper.sleepInterruptibly(JOB_POLLING_INTERVAL);
}
System.out.printf(
"Pipeline ended with %s state. Please check counters for results.\n",
getDataflowJobStatus(jobId));
}
}
} finally {
lock.ifPresent(Lock::releaseSql);
}
}
private static String getNomulusEndpoint(String sqlSnapshotId) {
return String.format(
"%s?sqlSnapshotId=%s", SyncDatastoreToSqlSnapshotAction.PATH, sqlSnapshotId);
}
private LaunchFlexTemplateResponse launchComparisonPipeline(
String sqlSnapshotId, String latestCommitLogTimestamp) {
try {
LaunchFlexTemplateParameter parameter =
new LaunchFlexTemplateParameter()
.setJobName(createJobName("validate-datastore", clock))
.setContainerSpecGcsPath(
String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
.setParameters(
ImmutableMap.of(
"sqlSnapshotId",
sqlSnapshotId,
"latestCommitLogTimestamp",
latestCommitLogTimestamp,
"registryEnvironment",
RegistryToolEnvironment.get().name()));
return dataflow
.projects()
.locations()
.flexTemplates()
.launch(
projectId, jobRegion, new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
.execute();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private String getDataflowJobStatus(String jobId) {
try {
return dataflow
.projects()
.locations()
.jobs()
.get(projectId, jobRegion, jobId)
.execute()
.getCurrentState();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* A fake implementation of {@link RequestStatusChecker} for managing SQL-backed locks from
* non-AppEngine platforms. This is only required until the Nomulus server is migrated off
* AppEngine.
*/
static class FakeRequestStatusChecker implements RequestStatusChecker {
@Override
public String getLogId() {
return ValidateDatastoreWithSqlCommand.class.getSimpleName() + "-" + UUID.randomUUID();
}
@Override
public boolean isRunning(String requestLogId) {
return false;
}
}
}

View file

@ -15,6 +15,7 @@
package google.registry.model.replay;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.model.replay.ReplicateToDatastoreAction.applyTransaction;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static google.registry.testing.DatabaseHelper.insertInDb;
@ -158,23 +159,23 @@ public class ReplicateToDatastoreActionTest {
// Write a transaction and run just the batch fetch.
insertInDb(foo);
List<TransactionEntity> txns1 = action.getTransactionBatch();
List<TransactionEntity> txns1 = action.getTransactionBatchAtSnapshot();
assertThat(txns1).hasSize(1);
// Write a second transaction and do another batch fetch.
insertInDb(bar);
List<TransactionEntity> txns2 = action.getTransactionBatch();
List<TransactionEntity> txns2 = action.getTransactionBatchAtSnapshot();
assertThat(txns2).hasSize(2);
// Apply the first batch.
action.applyTransaction(txns1.get(0));
applyTransaction(txns1.get(0));
// Remove the foo record so we can ensure that this transaction doesn't get doublle-played.
ofyTm().transact(() -> ofyTm().delete(foo.key()));
// Apply the second batch.
for (TransactionEntity txn : txns2) {
action.applyTransaction(txn);
applyTransaction(txn);
}
// Verify that the first transaction didn't get replayed but the second one did.
@ -212,10 +213,9 @@ public class ReplicateToDatastoreActionTest {
// Force the last transaction id back to -1 so that we look for transaction 0.
ofyTm().transact(() -> ofyTm().insert(new LastSqlTransaction(-1)));
List<TransactionEntity> txns = action.getTransactionBatch();
List<TransactionEntity> txns = action.getTransactionBatchAtSnapshot();
assertThat(txns).hasSize(1);
assertThat(
assertThrows(IllegalStateException.class, () -> action.applyTransaction(txns.get(0))))
assertThat(assertThrows(IllegalStateException.class, () -> applyTransaction(txns.get(0))))
.hasMessageThat()
.isEqualTo("Missing transaction: last txn id = -1, next available txn = 1");
}

View file

@ -249,9 +249,9 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
List<TransactionEntity> transactionBatch;
do {
transactionBatch = sqlToDsReplicator.getTransactionBatch();
transactionBatch = sqlToDsReplicator.getTransactionBatchAtSnapshot();
for (TransactionEntity txn : transactionBatch) {
sqlToDsReplicator.applyTransaction(txn);
ReplicateToDatastoreAction.applyTransaction(txn);
if (compare) {
ofyTm().transact(() -> compareSqlTransaction(txn));
}

View file

@ -39,6 +39,7 @@ PATH CLASS
/_dr/task/resaveAllEppResources ResaveAllEppResourcesAction GET n INTERNAL,API APP ADMIN
/_dr/task/resaveEntity ResaveEntityAction POST n INTERNAL,API APP ADMIN
/_dr/task/sendExpiringCertificateNotificationEmail SendExpiringCertificateNotificationEmailAction GET n INTERNAL,API APP ADMIN
/_dr/task/syncDatastoreToSqlSnapshot SyncDatastoreToSqlSnapshotAction POST n INTERNAL,API APP ADMIN
/_dr/task/syncGroupMembers SyncGroupMembersAction POST n INTERNAL,API APP ADMIN
/_dr/task/syncRegistrarsSheet SyncRegistrarsSheetAction POST n INTERNAL,API APP ADMIN
/_dr/task/tmchCrl TmchCrlAction POST y INTERNAL,API APP ADMIN