Add a tools command to launch SQL validation job (#1526)

* Add a tools command to launch SQL validation job

Stop using Pipeline.run().waitUntilFinish() in
ValidateDatastorePipeline. Flex templates do not support a blocking
wait in the main thread.

This PR adds a new ValidateSqlCommand that launches the pipeline and
maintains the SQL snapshot while the pipeline is running.

This PR also adds parameters to both ValidateSqlCommand and
ValidateDatastoreCommand:
- The -c option supplies an optional incremental comparison start time.
- The -r option supplies an optional release tag other than 'live',
  e.g., nomulus-YYYYMMDD-RC00.

If the manual launch option (-m) is enabled, the commands print the
gcloud command that can launch the pipeline.

Tested in sandbox, QA, and the dev project.
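
For illustration, invocations of the new commands might look like the
following (a sketch only: the environment, timestamp, and release tag are
placeholder values; the flags are the ones added in this PR):

  nomulus -e sandbox validate_sql -c 2022-02-01T00:00:00.000Z
  nomulus -e sandbox validate_datastore -r nomulus-20220214-RC00 -m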
Weimin Yu, 2022-02-28 13:14:57 -05:00 (committed by GitHub)
parent 55c368cf13
commit e47be4fa2c
17 changed files with 482 additions and 521 deletions


@@ -707,10 +707,7 @@ createToolTask(
'initSqlPipeline', 'google.registry.beam.initsql.InitSqlPipeline')
createToolTask(
'validateSqlPipeline', 'google.registry.beam.comparedb.ValidateSqlPipeline')
createToolTask(
'validateDatastorePipeline', 'google.registry.beam.comparedb.ValidateDatastorePipeline')
'validateDatabasePipeline', 'google.registry.beam.comparedb.ValidateDatabasePipeline')
createToolTask(
@@ -797,15 +794,10 @@ if (environment == 'alpha') {
mainClass: 'google.registry.beam.rde.RdePipeline',
metaData : 'google/registry/beam/rde_pipeline_metadata.json'
],
validateDatastore :
validateDatabase :
[
mainClass: 'google.registry.beam.comparedb.ValidateDatastorePipeline',
metaData: 'google/registry/beam/validate_datastore_pipeline_metadata.json'
],
validateSql :
[
mainClass: 'google.registry.beam.comparedb.ValidateSqlPipeline',
metaData: 'google/registry/beam/validate_sql_pipeline_metadata.json'
mainClass: 'google.registry.beam.comparedb.ValidateDatabasePipeline',
metaData: 'google/registry/beam/validate_database_pipeline_metadata.json'
],
]
project.tasks.create("stageBeamPipelines") {


@@ -76,7 +76,10 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final Duration LEASE_LENGTH = standardHours(1);
public static final String REPLAY_TO_SQL_LOCK_NAME =
ReplayCommitLogsToSqlAction.class.getSimpleName();
public static final Duration REPLAY_TO_SQL_LOCK_LEASE_LENGTH = standardHours(1);
// Stop / pause where we are if we've been replaying for more than five minutes to avoid GAE
// request timeouts
private static final Duration REPLAY_TIMEOUT_DURATION = Duration.standardMinutes(5);
@@ -115,7 +118,11 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
}
Optional<Lock> lock =
Lock.acquireSql(
this.getClass().getSimpleName(), null, LEASE_LENGTH, requestStatusChecker, false);
REPLAY_TO_SQL_LOCK_NAME,
null,
REPLAY_TO_SQL_LOCK_LEASE_LENGTH,
requestStatusChecker,
false);
if (!lock.isPresent()) {
String message = "Can't acquire SQL commit log replay lock, aborting.";
logger.atSevere().log(message);


@@ -48,8 +48,7 @@ import org.joda.time.Duration;
* </ul>
*
* The caller may release the replication lock upon receiving the response from this action. Please
* refer to {@link google.registry.tools.ValidateDatastoreWithSqlCommand} for more information on
* usage.
* refer to {@link google.registry.tools.ValidateDatastoreCommand} for more information on usage.
*
* <p>This action plays SQL transactions up to the user-specified snapshot, creates a new CommitLog
* checkpoint, and exports all CommitLogs to GCS up to this checkpoint. The timestamp of this


@@ -18,31 +18,20 @@ import static com.google.common.base.Verify.verify;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.flogger.FluentLogger;
import google.registry.beam.common.DatabaseSnapshot;
import google.registry.beam.common.RegistryPipelineOptions;
import google.registry.beam.common.RegistryPipelineWorkerInitializer;
import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder.DatastoreSnapshotInfo;
import google.registry.beam.comparedb.ValidateSqlUtils.CompareSqlEntity;
import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection;
import google.registry.model.domain.DomainBase;
import google.registry.model.domain.DomainHistory;
import google.registry.model.replay.SqlEntity;
import google.registry.model.replay.SqlReplayCheckpoint;
import google.registry.model.server.Lock;
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import google.registry.persistence.transaction.TransactionManagerFactory;
import google.registry.util.RequestStatusChecker;
import google.registry.util.SystemClock;
import java.io.Serializable;
import java.util.Optional;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten;
@@ -56,78 +45,46 @@ import org.joda.time.DateTime;
import org.joda.time.Duration;
/**
* Validates the asynchronous data replication process from Datastore (primary storage) to Cloud SQL
* (secondary storage).
* Validates the asynchronous data replication process between Datastore and Cloud SQL.
*
* <p>This pipeline is to be launched by {@link google.registry.tools.ValidateDatastoreCommand} or
* {@link google.registry.tools.ValidateSqlCommand}.
*/
@DeleteAfterMigration
public class ValidateSqlPipeline {
public class ValidateDatabasePipeline {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
/** Specifies the extra CommitLogs to load before the start of a Database export. */
private static final Duration COMMITLOG_START_TIME_MARGIN = Duration.standardMinutes(10);
/**
* Name of the lock used by the commitlog replay process.
*
* <p>See {@link google.registry.backup.ReplayCommitLogsToSqlAction} for more information.
*/
private static final String COMMITLOG_REPLAY_LOCK_NAME = "ReplayCommitLogsToSqlAction";
private static final Duration REPLAY_LOCK_LEASE_LENGTH = Duration.standardHours(1);
private static final java.time.Duration REPLAY_LOCK_ACQUIRE_TIMEOUT =
java.time.Duration.ofMinutes(6);
private static final java.time.Duration REPLAY_LOCK_ACQUIRE_DELAY =
java.time.Duration.ofSeconds(30);
private final ValidateSqlPipelineOptions options;
private final ValidateDatabasePipelineOptions options;
private final LatestDatastoreSnapshotFinder datastoreSnapshotFinder;
public ValidateSqlPipeline(
ValidateSqlPipelineOptions options, LatestDatastoreSnapshotFinder datastoreSnapshotFinder) {
public ValidateDatabasePipeline(
ValidateDatabasePipelineOptions options,
LatestDatastoreSnapshotFinder datastoreSnapshotFinder) {
this.options = options;
this.datastoreSnapshotFinder = datastoreSnapshotFinder;
}
@VisibleForTesting
void run(Pipeline pipeline) {
Optional<Lock> lock = acquireCommitLogReplayLock();
if (lock.isPresent()) {
logger.atInfo().log("Acquired CommitLog Replay lock.");
} else {
throw new RuntimeException("Failed to acquire CommitLog Replay lock.");
}
DateTime latestCommitLogTime = DateTime.parse(options.getLatestCommitLogTimestamp());
DatastoreSnapshotInfo mostRecentExport =
datastoreSnapshotFinder.getSnapshotInfo(latestCommitLogTime.toInstant());
try {
DateTime latestCommitLogTime =
TransactionManagerFactory.jpaTm().transact(() -> SqlReplayCheckpoint.get());
DatastoreSnapshotInfo mostRecentExport =
datastoreSnapshotFinder.getSnapshotInfo(latestCommitLogTime.toInstant());
Preconditions.checkState(
latestCommitLogTime.isAfter(mostRecentExport.exportInterval().getEnd()),
"Cannot recreate Datastore snapshot since target time is in the middle of an export.");
try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {
// Eagerly release the commitlog replay lock so that replay can resume.
lock.ifPresent(Lock::releaseSql);
lock = Optional.empty();
logger.atInfo().log(
"Comparing datastore export at %s and commitlog timestamp %s.",
mostRecentExport.exportDir(), latestCommitLogTime);
logger.atInfo().log(
"Starting comparison with export at %s and latestCommitLogTime at %s",
mostRecentExport.exportDir(), latestCommitLogTime);
setupPipeline(
pipeline,
Optional.ofNullable(options.getSqlSnapshotId()),
mostRecentExport,
latestCommitLogTime,
Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse));
setupPipeline(
pipeline,
Optional.of(databaseSnapshot.getSnapshotId()),
mostRecentExport,
latestCommitLogTime,
Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse));
State state = pipeline.run().waitUntilFinish();
if (!State.DONE.equals(state)) {
throw new IllegalStateException("Unexpected pipeline state: " + state);
}
}
} finally {
lock.ifPresent(Lock::releaseSql);
}
pipeline.run();
}
static void setupPipeline(
@@ -170,7 +127,7 @@ public class ValidateSqlPipeline {
.apply("Combine from both snapshots: " + clazz.getSimpleName(), Flatten.pCollections())
.apply(
"Assign primary key to merged " + clazz.getSimpleName(),
WithKeys.of(ValidateSqlPipeline::getPrimaryKeyString).withKeyType(strings()))
WithKeys.of(ValidateDatabasePipeline::getPrimaryKeyString).withKeyType(strings()))
.apply("Group by primary key " + clazz.getSimpleName(), GroupByKey.create())
.apply("Compare " + clazz.getSimpleName(), ParDo.of(new CompareSqlEntity()));
}
@@ -189,76 +146,25 @@ public class ValidateSqlPipeline {
return sqlEntity.getPrimaryKeyString();
}
private static Optional<Lock> acquireCommitLogReplayLock() {
Stopwatch stopwatch = Stopwatch.createStarted();
while (stopwatch.elapsed().minus(REPLAY_LOCK_ACQUIRE_TIMEOUT).isNegative()) {
Optional<Lock> lock = tryAcquireCommitLogReplayLock();
if (lock.isPresent()) {
return lock;
}
logger.atInfo().log("Failed to acquired CommitLog Replay lock. Will retry...");
try {
Thread.sleep(REPLAY_LOCK_ACQUIRE_DELAY.toMillis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted.");
}
}
return Optional.empty();
}
private static Optional<Lock> tryAcquireCommitLogReplayLock() {
return Lock.acquireSql(
COMMITLOG_REPLAY_LOCK_NAME,
null,
REPLAY_LOCK_LEASE_LENGTH,
getLockingRequestStatusChecker(),
false);
}
/**
* Returns a fake implementation of {@link RequestStatusChecker} that is required for lock
* acquisition. The default implementation is AppEngine-specific and is unusable on GCE.
*/
private static RequestStatusChecker getLockingRequestStatusChecker() {
return new RequestStatusChecker() {
@Override
public String getLogId() {
return "ValidateSqlPipeline";
}
LatestDatastoreSnapshotFinder datastoreSnapshotFinder =
DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create()
.datastoreSnapshotInfoFinder();
@Override
public boolean isRunning(String requestLogId) {
return true;
}
};
}
public static void main(String[] args) {
ValidateSqlPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ValidateSqlPipelineOptions.class);
ValidateDatabasePipelineOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(ValidateDatabasePipelineOptions.class);
RegistryPipelineOptions.validateRegistryPipelineOptions(options);
// Defensively set important options.
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ);
options.setJpaTransactionManagerType(JpaTransactionManagerType.BULK_QUERY);
// Reuse Dataflow worker initialization code to set up JPA in the pipeline harness.
// Set up JPA in the pipeline harness (the locally executed part of the main() method). Reuse
// code in RegistryPipelineWorkerInitializer, which only applies to pipeline worker VMs.
new RegistryPipelineWorkerInitializer().beforeProcessing(options);
MigrationState state =
DatabaseMigrationStateSchedule.getValueAtTime(new SystemClock().nowUtc());
if (!state.getReplayDirection().equals(ReplayDirection.DATASTORE_TO_SQL)) {
throw new IllegalStateException("This pipeline is not designed for migration phase " + state);
}
LatestDatastoreSnapshotFinder datastoreSnapshotFinder =
DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create()
.datastoreSnapshotInfoFinder();
new ValidateSqlPipeline(options, datastoreSnapshotFinder).run(Pipeline.create(options));
new ValidateDatabasePipeline(options, datastoreSnapshotFinder).run(Pipeline.create(options));
}
}
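
For local debugging, the pipeline's main() can also be invoked directly with
the options defined in ValidateDatabasePipelineOptions. A minimal sketch,
assuming a hypothetical deploy jar named nomulus-all.jar; the snapshot id,
timestamps, project, and region are placeholders, while the option names are
the ones defined in this PR:

  java -cp nomulus-all.jar google.registry.beam.comparedb.ValidateDatabasePipeline \
      --registryEnvironment=SANDBOX \
      --sqlSnapshotId=0000000a-1111-2222-3333-444444444444 \
      --latestCommitLogTimestamp=2022-02-28T00:00:00.000Z \
      --comparisonStartTimestamp=2022-02-01T00:00:00.000Z \
      --runner=DataflowRunner --project=my-project --region=us-central1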


@@ -14,14 +14,15 @@
package google.registry.beam.comparedb;
import google.registry.beam.common.RegistryPipelineOptions;
import google.registry.model.annotations.DeleteAfterMigration;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
/** BEAM pipeline options for {@link ValidateDatastorePipelineOptions}. */
/** BEAM pipeline options for {@link ValidateDatabasePipeline}. */
@DeleteAfterMigration
public interface ValidateDatastorePipelineOptions extends ValidateSqlPipelineOptions {
public interface ValidateDatabasePipelineOptions extends RegistryPipelineOptions {
@Description(
"The id of the SQL snapshot to be compared with Datastore. "
@@ -36,4 +37,13 @@ public interface ValidateDatastorePipelineOptions extends ValidateSqlPipelineOpt
String getLatestCommitLogTimestamp();
void setLatestCommitLogTimestamp(String commitLogEndTimestamp);
@Description(
"For history entries and EPP resources, only those modified strictly after this time are "
+ "included in comparison. Value is in ISO8601 format. "
+ "Other entity types are not affected.")
@Nullable
String getComparisonStartTimestamp();
void setComparisonStartTimestamp(String comparisonStartTimestamp);
}


@@ -1,83 +0,0 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.comparedb;
import google.registry.beam.common.RegistryPipelineOptions;
import google.registry.beam.common.RegistryPipelineWorkerInitializer;
import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder.DatastoreSnapshotInfo;
import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import java.util.Optional;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.DateTime;
/**
* Validates the asynchronous data replication process from Cloud SQL (primary) to Datastore
* (secondary).
*
* <p>This pipeline simply compares the snapshots provided by an invoker, which is responsible for
* obtaining two consistent snapshots for the same point of time.
*/
// TODO(weiminyu): Implement the invoker action in a followup PR.
@DeleteAfterMigration
public class ValidateDatastorePipeline {
private final ValidateDatastorePipelineOptions options;
private final LatestDatastoreSnapshotFinder datastoreSnapshotFinder;
public ValidateDatastorePipeline(
ValidateDatastorePipelineOptions options,
LatestDatastoreSnapshotFinder datastoreSnapshotFinder) {
this.options = options;
this.datastoreSnapshotFinder = datastoreSnapshotFinder;
}
void run(Pipeline pipeline) {
DateTime latestCommitLogTime = DateTime.parse(options.getLatestCommitLogTimestamp());
DatastoreSnapshotInfo mostRecentExport =
datastoreSnapshotFinder.getSnapshotInfo(latestCommitLogTime.toInstant());
ValidateSqlPipeline.setupPipeline(
pipeline,
Optional.ofNullable(options.getSqlSnapshotId()),
mostRecentExport,
latestCommitLogTime,
Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse));
pipeline.run();
}
public static void main(String[] args) {
ValidateDatastorePipelineOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(ValidateDatastorePipelineOptions.class);
RegistryPipelineOptions.validateRegistryPipelineOptions(options);
// Defensively set important options.
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ);
options.setJpaTransactionManagerType(JpaTransactionManagerType.BULK_QUERY);
// Reuse Dataflow worker initialization code to set up JPA in the pipeline harness.
new RegistryPipelineWorkerInitializer().beforeProcessing(options);
LatestDatastoreSnapshotFinder datastoreSnapshotFinder =
DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create()
.datastoreSnapshotInfoFinder();
new ValidateDatastorePipeline(options, datastoreSnapshotFinder).run(Pipeline.create(options));
}
}


@@ -1,34 +0,0 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.comparedb;
import google.registry.beam.common.RegistryPipelineOptions;
import google.registry.model.annotations.DeleteAfterMigration;
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.Description;
/** BEAM pipeline options for {@link ValidateSqlPipeline}. */
@DeleteAfterMigration
public interface ValidateSqlPipelineOptions extends RegistryPipelineOptions {
@Description(
"For history entries and EPP resources, only those modified strictly after this time are "
+ "included in comparison. Value is in ISO8601 format. "
+ "Other entity types are not affected.")
@Nullable
String getComparisonStartTimestamp();
void setComparisonStartTimestamp(String comparisonStartTimestamp);
}


@@ -51,7 +51,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
/** Helpers for use by {@link ValidateSqlPipeline}. */
/** Helpers for use by {@link ValidateDatabasePipeline}. */
@DeleteAfterMigration
final class ValidateSqlUtils {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -66,7 +66,7 @@ final class ValidateSqlUtils {
* Query template for finding the median value of the {@code history_revision_id} column in one of
* the History tables.
*
* <p>The {@link ValidateSqlPipeline} uses this query to parallelize the query to some of the
* <p>The {@link ValidateDatabasePipeline} uses this query to parallelize the query to some of the
* history tables. Although the {@code repo_id} column is the leading column in the primary keys
* of these tables, in practice and with production data, division by {@code history_revision_id}
* works slightly faster for unknown reasons.


@@ -123,9 +123,10 @@ public final class RegistryTool {
.put("update_server_locks", UpdateServerLocksCommand.class)
.put("update_tld", UpdateTldCommand.class)
.put("upload_claims_list", UploadClaimsListCommand.class)
.put("validate_datastore_with_sql", ValidateDatastoreWithSqlCommand.class)
.put("validate_datastore", ValidateDatastoreCommand.class)
.put("validate_escrow_deposit", ValidateEscrowDepositCommand.class)
.put("validate_login_credentials", ValidateLoginCredentialsCommand.class)
.put("validate_sql", ValidateSqlCommand.class)
.put("verify_ote", VerifyOteCommand.class)
.put("whois_query", WhoisQueryCommand.class)
.build();


@@ -171,12 +171,14 @@ interface RegistryToolComponent {
void inject(UpdateTldCommand command);
void inject(ValidateDatastoreWithSqlCommand command);
void inject(ValidateDatastoreCommand command);
void inject(ValidateEscrowDepositCommand command);
void inject(ValidateLoginCredentialsCommand command);
void inject(ValidateSqlCommand command);
void inject(WhoisQueryCommand command);
AppEngineConnection appEngineConnection();


@@ -0,0 +1,217 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.tools;
import static google.registry.beam.BeamUtils.createJobName;
import com.beust.jcommander.Parameter;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.base.Ascii;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import google.registry.beam.common.DatabaseSnapshot;
import google.registry.config.RegistryConfig.Config;
import google.registry.tools.params.DateTimeParameter;
import google.registry.util.Clock;
import google.registry.util.RequestStatusChecker;
import google.registry.util.Sleeper;
import java.io.IOException;
import java.util.UUID;
import javax.inject.Inject;
import org.joda.time.DateTime;
import org.joda.time.Duration;
/** Shared setup for commands that validate the data replication between Datastore and Cloud SQL. */
abstract class ValidateDatabaseMigrationCommand
implements CommandWithConnection, CommandWithRemoteApi {
private static final String PIPELINE_NAME = "validate_database_pipeline";
private static final String MANUAL_PIPELINE_LAUNCH_COMMAND_TEMPLATE =
"gcloud dataflow flex-template run "
+ "\"%s-${USER}-$(date +%%Y%%m%%dt%%H%%M%%S)\" "
+ "--template-file-gcs-location %s "
+ "--project %s "
+ "--region=%s "
+ "--worker-machine-type=n2-standard-8 --num-workers=8 "
+ "--parameters registryEnvironment=%s "
+ "--parameters sqlSnapshotId=%s "
+ "--parameters latestCommitLogTimestamp=%s ";
// States indicating a job is not finished yet.
static final ImmutableSet<String> DATAFLOW_JOB_RUNNING_STATES =
ImmutableSet.of(
"JOB_STATE_UNKNOWN",
"JOB_STATE_RUNNING",
"JOB_STATE_STOPPED",
"JOB_STATE_PENDING",
"JOB_STATE_QUEUED");
static final Duration JOB_POLLING_INTERVAL = Duration.standardSeconds(60);
@Parameter(
names = {"-m", "--manual"},
description =
"If true, let user launch the comparison pipeline manually out of band. "
+ "Command will wait for user key-press to exit after syncing Datastore.")
boolean manualLaunchPipeline;
@Parameter(
names = {"-r", "--release"},
description = "The release tag of the BEAM pipeline to run. It defaults to 'live'.")
String release = "live";
@Parameter(
names = {"-c", "--comparisonStartTimestamp"},
description =
"When comparing History and Epp Resource entities, ignore those that have not"
+ " changed since this time.",
converter = DateTimeParameter.class)
DateTime comparisonStartTimestamp;
@Inject Clock clock;
@Inject Dataflow dataflow;
@Inject
@Config("defaultJobRegion")
String jobRegion;
@Inject
@Config("beamStagingBucketUrl")
String stagingBucketUrl;
@Inject
@Config("projectId")
String projectId;
@Inject Sleeper sleeper;
AppEngineConnection connection;
@Override
public void setConnection(AppEngineConnection connection) {
this.connection = connection;
}
String getDataflowJobStatus(String jobId) {
try {
return dataflow
.projects()
.locations()
.jobs()
.get(projectId, jobRegion, jobId)
.execute()
.getCurrentState();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
void launchPipelineAndWaitUntilFinish(
String pipelineName, DatabaseSnapshot snapshot, String latestCommitTimestamp) {
Job pipelineJob =
launchComparisonPipeline(pipelineName, snapshot.getSnapshotId(), latestCommitTimestamp)
.getJob();
String jobId = pipelineJob.getId();
System.out.printf("Launched comparison pipeline %s (%s).\n", pipelineJob.getName(), jobId);
while (DATAFLOW_JOB_RUNNING_STATES.contains(getDataflowJobStatus(jobId))) {
sleeper.sleepInterruptibly(JOB_POLLING_INTERVAL);
}
System.out.printf(
"Pipeline ended with %s state. Please check counters for results.\n",
getDataflowJobStatus(jobId));
}
String getContainerSpecGcsPath() {
return String.format(
"%s/%s_metadata.json", stagingBucketUrl.replace("live", release), PIPELINE_NAME);
}
String getManualLaunchCommand(
String jobName, String snapshotId, String latestCommitLogTimestamp) {
String baseCommand =
String.format(
MANUAL_PIPELINE_LAUNCH_COMMAND_TEMPLATE,
jobName,
getContainerSpecGcsPath(),
projectId,
jobRegion,
RegistryToolEnvironment.get().name(),
snapshotId,
latestCommitLogTimestamp);
if (comparisonStartTimestamp == null) {
return baseCommand;
}
return baseCommand + "--parameters comparisonStartTimestamp=" + comparisonStartTimestamp;
}
LaunchFlexTemplateResponse launchComparisonPipeline(
String jobName, String sqlSnapshotId, String latestCommitLogTimestamp) {
try {
// Hardcode machine type and initial workers to force a quick start.
ImmutableMap.Builder<String, String> paramsBuilder =
new ImmutableMap.Builder()
.put("workerMachineType", "n2-standard-8")
.put("numWorkers", "8")
.put("sqlSnapshotId", sqlSnapshotId)
.put("latestCommitLogTimestamp", latestCommitLogTimestamp)
.put("registryEnvironment", RegistryToolEnvironment.get().name());
if (comparisonStartTimestamp != null) {
paramsBuilder.put("comparisonStartTimestamp", comparisonStartTimestamp.toString());
}
LaunchFlexTemplateParameter parameter =
new LaunchFlexTemplateParameter()
.setJobName(createJobName(Ascii.toLowerCase(jobName).replace('_', '-'), clock))
.setContainerSpecGcsPath(getContainerSpecGcsPath())
.setParameters(paramsBuilder.build());
return dataflow
.projects()
.locations()
.flexTemplates()
.launch(
projectId, jobRegion, new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
.execute();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* A fake implementation of {@link RequestStatusChecker} for managing SQL-backed locks from
* non-AppEngine platforms. This is only required until the Nomulus server is migrated off
* AppEngine.
*/
static class FakeRequestStatusChecker implements RequestStatusChecker {
private final String logId =
ValidateDatastoreCommand.class.getSimpleName() + "-" + UUID.randomUUID();
@Override
public String getLogId() {
return logId;
}
@Override
public boolean isRunning(String requestLogId) {
return false;
}
}
}
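
Substituted into MANUAL_PIPELINE_LAUNCH_COMMAND_TEMPLATE above, the command
printed under -m comes out roughly as follows (project, region, bucket,
snapshot id, and timestamp are placeholders; an extra
"--parameters comparisonStartTimestamp=..." is appended when -c is given):

  gcloud dataflow flex-template run "validate-sql-${USER}-$(date +%Y%m%dt%H%M%S)" \
      --template-file-gcs-location gs://my-project-beam/nomulus-20220214-RC00/validate_database_pipeline_metadata.json \
      --project my-project \
      --region=us-central1 \
      --worker-machine-type=n2-standard-8 --num-workers=8 \
      --parameters registryEnvironment=SANDBOX \
      --parameters sqlSnapshotId=0000000a-1111-2222-3333-444444444444 \
      --parameters latestCommitLogTimestamp=2022-02-28T00:00:00.000Z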


@@ -0,0 +1,105 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.tools;
import static google.registry.model.replay.ReplicateToDatastoreAction.REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH;
import static google.registry.model.replay.ReplicateToDatastoreAction.REPLICATE_TO_DATASTORE_LOCK_NAME;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.beust.jcommander.Parameters;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.MediaType;
import google.registry.backup.SyncDatastoreToSqlSnapshotAction;
import google.registry.beam.common.DatabaseSnapshot;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection;
import google.registry.model.server.Lock;
import google.registry.request.Action.Service;
import java.util.Optional;
/**
* Validates asynchronously replicated data from the primary Cloud SQL database to Datastore.
*
<p>This command suspends the replication process (by acquiring the replication lock), takes a
* snapshot of the Cloud SQL database, invokes a Nomulus server action to sync Datastore to this
* snapshot (See {@link SyncDatastoreToSqlSnapshotAction} for details), and finally launches a BEAM
* pipeline to compare Datastore with the given SQL snapshot.
*
* <p>This command does not lock up the SQL database. Normal processing can proceed.
*/
@Parameters(commandDescription = "Validates Datastore with the primary Cloud SQL database.")
public class ValidateDatastoreCommand extends ValidateDatabaseMigrationCommand {
private static final Service NOMULUS_SERVICE = Service.BACKEND;
@Override
public void run() throws Exception {
MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc());
if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) {
throw new IllegalStateException("Cannot validate Datastore in migration step " + state);
}
Optional<Lock> lock =
Lock.acquireSql(
REPLICATE_TO_DATASTORE_LOCK_NAME,
null,
REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH,
new FakeRequestStatusChecker(),
false);
if (!lock.isPresent()) {
throw new IllegalStateException("Cannot acquire the async propagation lock.");
}
try {
try (DatabaseSnapshot snapshot = DatabaseSnapshot.createSnapshot()) {
System.out.printf("Obtained snapshot %s\n", snapshot.getSnapshotId());
AppEngineConnection connectionToService = connection.withService(NOMULUS_SERVICE);
String response =
connectionToService.sendPostRequest(
getNomulusEndpoint(snapshot.getSnapshotId()),
ImmutableMap.<String, String>of(),
MediaType.PLAIN_TEXT_UTF_8,
"".getBytes(UTF_8));
System.out.println(response);
lock.ifPresent(Lock::releaseSql);
lock = Optional.empty();
// See SyncDatastoreToSqlSnapshotAction for response format.
String latestCommitTimestamp =
response.substring(response.lastIndexOf('(') + 1, response.lastIndexOf(')'));
if (manualLaunchPipeline) {
System.out.printf(
"To launch the pipeline manually, use the following command:\n%s\n",
getManualLaunchCommand(
"validate-datastore", snapshot.getSnapshotId(), latestCommitTimestamp));
System.out.print("\nEnter any key to continue when the pipeline ends:");
System.in.read();
} else {
launchPipelineAndWaitUntilFinish("validate-datastore", snapshot, latestCommitTimestamp);
}
}
} finally {
lock.ifPresent(Lock::releaseSql);
}
}
private static String getNomulusEndpoint(String sqlSnapshotId) {
return String.format(
"%s?sqlSnapshotId=%s", SyncDatastoreToSqlSnapshotAction.PATH, sqlSnapshotId);
}
}
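
Put together, a session with this command might read as below. The values are
illustrative, reconstructed from the printf calls above; the sync-response
line is schematic, since its exact wording is defined by
SyncDatastoreToSqlSnapshotAction and the command only relies on the trailing
parenthesized timestamp:

  $ nomulus -e sandbox validate_datastore
  Obtained snapshot 0000000a-1111-2222-3333-444444444444
  [sync response ending with (2022-02-28T00:00:00.000Z)]
  Launched comparison pipeline validate-datastore-2022-02-28t001122 (2022-02-28_00_11_22-1234567890).
  Pipeline ended with JOB_STATE_DONE state. Please check counters for results.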


@@ -1,229 +0,0 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.tools;
import static google.registry.beam.BeamUtils.createJobName;
import static google.registry.model.replay.ReplicateToDatastoreAction.REPLICATE_TO_DATASTORE_LOCK_NAME;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.MediaType;
import google.registry.backup.SyncDatastoreToSqlSnapshotAction;
import google.registry.beam.common.DatabaseSnapshot;
import google.registry.config.RegistryConfig.Config;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection;
import google.registry.model.replay.ReplicateToDatastoreAction;
import google.registry.model.server.Lock;
import google.registry.request.Action.Service;
import google.registry.util.Clock;
import google.registry.util.RequestStatusChecker;
import google.registry.util.Sleeper;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import javax.inject.Inject;
import org.joda.time.Duration;
/**
* Validates asynchronously replicated data from the primary Cloud SQL database to Datastore.
*
* <p>This command suspends the replication process (by acquiring the replication lock), take a
* snapshot of the Cloud SQL database, invokes a Nomulus server action to sync Datastore to this
* snapshot (See {@link SyncDatastoreToSqlSnapshotAction} for details), and finally launches a BEAM
* pipeline to compare Datastore with the given SQL snapshot.
*
* <p>This command does not lock up the SQL database. Normal processing can proceed.
*/
@Parameters(commandDescription = "Validates Datastore with Cloud SQL.")
public class ValidateDatastoreWithSqlCommand
implements CommandWithConnection, CommandWithRemoteApi {
private static final Service NOMULUS_SERVICE = Service.BACKEND;
private static final String PIPELINE_NAME = "validate_datastore_pipeline";
// States indicating a job is not finished yet.
private static final ImmutableSet<String> DATAFLOW_JOB_RUNNING_STATES =
ImmutableSet.of(
"JOB_STATE_RUNNING", "JOB_STATE_STOPPED", "JOB_STATE_PENDING", "JOB_STATE_QUEUED");
private static final Duration JOB_POLLING_INTERVAL = Duration.standardSeconds(60);
@Parameter(
names = {"-m", "--manual"},
description =
"If true, let user launch the comparison pipeline manually out of band. "
+ "Command will wait for user key-press to exit after syncing Datastore.")
boolean manualLaunchPipeline;
@Inject Clock clock;
@Inject Dataflow dataflow;
@Inject
@Config("defaultJobRegion")
String jobRegion;
@Inject
@Config("beamStagingBucketUrl")
String stagingBucketUrl;
@Inject
@Config("projectId")
String projectId;
@Inject Sleeper sleeper;
private AppEngineConnection connection;
@Override
public void setConnection(AppEngineConnection connection) {
this.connection = connection;
}
@Override
public void run() throws Exception {
MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc());
if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) {
throw new IllegalStateException("Cannot sync Datastore to SQL in migration step " + state);
}
Optional<Lock> lock =
Lock.acquireSql(
REPLICATE_TO_DATASTORE_LOCK_NAME,
null,
ReplicateToDatastoreAction.REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH,
new FakeRequestStatusChecker(),
false);
if (!lock.isPresent()) {
throw new IllegalStateException("Cannot acquire the async propagation lock.");
}
try {
try (DatabaseSnapshot snapshot = DatabaseSnapshot.createSnapshot()) {
System.out.printf("Obtained snapshot %s\n", snapshot.getSnapshotId());
AppEngineConnection connectionToService = connection.withService(NOMULUS_SERVICE);
String response =
connectionToService.sendPostRequest(
getNomulusEndpoint(snapshot.getSnapshotId()),
ImmutableMap.<String, String>of(),
MediaType.PLAIN_TEXT_UTF_8,
"".getBytes(UTF_8));
System.out.println(response);
lock.ifPresent(Lock::releaseSql);
lock = Optional.empty();
// See SyncDatastoreToSqlSnapshotAction for response format.
String latestCommitTimestamp =
response.substring(response.lastIndexOf('(') + 1, response.lastIndexOf(')'));
if (manualLaunchPipeline) {
System.out.print("\nEnter any key to continue when the pipeline ends:");
System.in.read();
} else {
Job pipelineJob =
launchComparisonPipeline(snapshot.getSnapshotId(), latestCommitTimestamp).getJob();
String jobId = pipelineJob.getId();
System.out.printf(
"Launched comparison pipeline %s (%s).\n", pipelineJob.getName(), jobId);
while (DATAFLOW_JOB_RUNNING_STATES.contains(getDataflowJobStatus(jobId))) {
sleeper.sleepInterruptibly(JOB_POLLING_INTERVAL);
}
System.out.printf(
"Pipeline ended with %s state. Please check counters for results.\n",
getDataflowJobStatus(jobId));
}
}
} finally {
lock.ifPresent(Lock::releaseSql);
}
}
private static String getNomulusEndpoint(String sqlSnapshotId) {
return String.format(
"%s?sqlSnapshotId=%s", SyncDatastoreToSqlSnapshotAction.PATH, sqlSnapshotId);
}
private LaunchFlexTemplateResponse launchComparisonPipeline(
String sqlSnapshotId, String latestCommitLogTimestamp) {
try {
LaunchFlexTemplateParameter parameter =
new LaunchFlexTemplateParameter()
.setJobName(createJobName("validate-datastore", clock))
.setContainerSpecGcsPath(
String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
.setParameters(
ImmutableMap.of(
"sqlSnapshotId",
sqlSnapshotId,
"latestCommitLogTimestamp",
latestCommitLogTimestamp,
"registryEnvironment",
RegistryToolEnvironment.get().name()));
return dataflow
.projects()
.locations()
.flexTemplates()
.launch(
projectId, jobRegion, new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
.execute();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private String getDataflowJobStatus(String jobId) {
try {
return dataflow
.projects()
.locations()
.jobs()
.get(projectId, jobRegion, jobId)
.execute()
.getCurrentState();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* A fake implementation of {@link RequestStatusChecker} for managing SQL-backed locks from
* non-AppEngine platforms. This is only required until the Nomulus server is migrated off
* AppEngine.
*/
static class FakeRequestStatusChecker implements RequestStatusChecker {
@Override
public String getLogId() {
return ValidateDatastoreWithSqlCommand.class.getSimpleName() + "-" + UUID.randomUUID();
}
@Override
public boolean isRunning(String requestLogId) {
return false;
}
}
}


@@ -0,0 +1,91 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.tools;
import static google.registry.backup.ReplayCommitLogsToSqlAction.REPLAY_TO_SQL_LOCK_LEASE_LENGTH;
import static google.registry.backup.ReplayCommitLogsToSqlAction.REPLAY_TO_SQL_LOCK_NAME;
import com.beust.jcommander.Parameters;
import google.registry.beam.common.DatabaseSnapshot;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection;
import google.registry.model.replay.SqlReplayCheckpoint;
import google.registry.model.server.Lock;
import google.registry.persistence.transaction.TransactionManagerFactory;
import java.util.Optional;
import org.joda.time.DateTime;
/**
* Validates asynchronously replicated data from the primary Datastore to Cloud SQL.
*
<p>This command suspends the replication process (by acquiring the replication lock), takes a
* snapshot of the Cloud SQL database, finds the corresponding Datastore snapshot, and finally
* launches a BEAM pipeline to compare the two snapshots.
*
* <p>This command does not lock up either database. Normal processing can proceed.
*/
@Parameters(commandDescription = "Validates Cloud SQL with the primary Datastore.")
public class ValidateSqlCommand extends ValidateDatabaseMigrationCommand {
@Override
public void run() throws Exception {
MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc());
if (!state.getReplayDirection().equals(ReplayDirection.DATASTORE_TO_SQL)) {
throw new IllegalStateException("Cannot validate SQL in migration step " + state);
}
Optional<Lock> lock =
Lock.acquireSql(
REPLAY_TO_SQL_LOCK_NAME,
null,
REPLAY_TO_SQL_LOCK_LEASE_LENGTH,
new FakeRequestStatusChecker(),
false);
if (!lock.isPresent()) {
throw new IllegalStateException("Cannot acquire the async propagation lock.");
}
try {
DateTime latestCommitLogTime =
TransactionManagerFactory.jpaTm().transact(() -> SqlReplayCheckpoint.get());
try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {
// Eagerly release the commitlog replay lock so that replay can resume.
lock.ifPresent(Lock::releaseSql);
lock = Optional.empty();
System.out.printf(
"Start comparison with SQL snapshot (%s) and CommitLog timestamp (%s).\n",
databaseSnapshot.getSnapshotId(), latestCommitLogTime);
if (manualLaunchPipeline) {
System.out.printf(
"To launch the pipeline manually, use the following command:\n%s\n",
getManualLaunchCommand(
"validate-sql",
databaseSnapshot.getSnapshotId(),
latestCommitLogTime.toString()));
System.out.print("\nEnter any key to continue when the pipeline ends:");
System.in.read();
} else {
launchPipelineAndWaitUntilFinish(
"validate-sql", databaseSnapshot, latestCommitLogTime.toString());
}
}
} finally {
lock.ifPresent(Lock::releaseSql);
}
}
}
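
While either command polls for completion, the Dataflow job can also be
checked out of band. Assuming the job id printed at launch time and
placeholder project and region values:

  gcloud dataflow jobs describe 2022-02-28_00_11_22-1234567890 \
      --project=my-project --region=us-central1 --format='value(currentState)'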


@@ -1,6 +1,6 @@
{
"name": "Validate Datastore with Cloud SQL",
"description": "An Apache Beam batch pipeline that compares Datastore with the primary Cloud SQL database.",
"name": "Validate Datastore and Cloud SQL",
"description": "An Apache Beam batch pipeline that compares the data in Datastore and Cloud SQL database.",
"parameters": [
{
"name": "registryEnvironment",


@@ -1,21 +0,0 @@
{
"name": "Validate Cloud SQL with Datastore being primary",
"description": "An Apache Beam batch pipeline that compares Cloud SQL with the primary Datastore.",
"parameters": [
{
"name": "registryEnvironment",
"label": "The Registry environment.",
"helpText": "The Registry environment.",
"is_optional": false,
"regexes": [
"^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$"
]
},
{
"name": "comparisonStartTimestamp",
"label": "Only entities updated at or after this time are included for validation.",
"helpText": "The earliest entity update time allowed for inclusion in validation, in ISO8601 format.",
"is_optional": true
}
]
}


@@ -97,10 +97,8 @@ steps:
google/registry/beam/invoicing_pipeline_metadata.json \
google.registry.beam.rde.RdePipeline \
google/registry/beam/rde_pipeline_metadata.json \
google.registry.beam.comparedb.ValidateDatastorePipeline \
google/registry/beam/validate_datastore_pipeline_metadata.json \
google.registry.beam.comparedb.ValidateSqlPipeline \
google/registry/beam/validate_sql_pipeline_metadata.json
google.registry.beam.comparedb.ValidateDatabasePipeline \
google/registry/beam/validate_database_pipeline_metadata.json
# Tentatively build and publish Cloud SQL schema jar here, before schema release
# process is finalized. Also publish nomulus:core jars that are needed for
# server/schema compatibility tests.