mirror of
https://github.com/google/nomulus.git
synced 2025-04-30 03:57:51 +02:00
Compare migration data with SQL as primary DB (#1497)
* Compare migration data with SQL as primary DB Add a BEAM pipeline that compares the secondary Datastore against SQL. This is a dumb pipeline to be launched by a driver (in a followup PR). Manually tested pipeline in sandbox. Also updated the ValidateSqlPipeline and the snapshot finder class so that an appropriate Datastore export is found (one that ends before the replay checkpoint value).
This commit is contained in:
parent
4892b03ffb
commit
4fdfb3ea04
5 changed files with 180 additions and 28 deletions
|
@ -708,6 +708,9 @@ createToolTask(
|
||||||
createToolTask(
|
createToolTask(
|
||||||
'validateSqlPipeline', 'google.registry.beam.comparedb.ValidateSqlPipeline')
|
'validateSqlPipeline', 'google.registry.beam.comparedb.ValidateSqlPipeline')
|
||||||
|
|
||||||
|
createToolTask(
|
||||||
|
'validateDatastorePipeline', 'google.registry.beam.comparedb.ValidateDatastorePipeline')
|
||||||
|
|
||||||
|
|
||||||
createToolTask(
|
createToolTask(
|
||||||
'jpaDemoPipeline', 'google.registry.beam.common.JpaDemoPipeline')
|
'jpaDemoPipeline', 'google.registry.beam.common.JpaDemoPipeline')
|
||||||
|
|
|
@ -53,11 +53,11 @@ public class LatestDatastoreSnapshotFinder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finds information of the most recent Datastore snapshot, including the GCS folder of the
|
* Finds information of the most recent Datastore snapshot that ends strictly before {@code
|
||||||
* exported data files and the start and stop times of the export. The folder of the CommitLogs is
|
* exportEndTimeUpperBound}, including the GCS folder of the exported data files and the start and
|
||||||
* also included in the return.
|
* stop times of the export. The folder of the CommitLogs is also included in the return.
|
||||||
*/
|
*/
|
||||||
public DatastoreSnapshotInfo getSnapshotInfo() {
|
public DatastoreSnapshotInfo getSnapshotInfo(Instant exportEndTimeUpperBound) {
|
||||||
String bucketName = RegistryConfig.getDatastoreBackupsBucket().substring("gs://".length());
|
String bucketName = RegistryConfig.getDatastoreBackupsBucket().substring("gs://".length());
|
||||||
/**
|
/**
|
||||||
* Find the bucket-relative path to the overall metadata file of the last Datastore export.
|
* Find the bucket-relative path to the overall metadata file of the last Datastore export.
|
||||||
|
@ -65,7 +65,8 @@ public class LatestDatastoreSnapshotFinder {
|
||||||
* return value is like
|
* return value is like
|
||||||
* "2021-11-19T06:00:00_76493/2021-11-19T06:00:00_76493.overall_export_metadata".
|
* "2021-11-19T06:00:00_76493/2021-11-19T06:00:00_76493.overall_export_metadata".
|
||||||
*/
|
*/
|
||||||
Optional<String> metaFilePathOptional = findMostRecentExportMetadataFile(bucketName, 2);
|
Optional<String> metaFilePathOptional =
|
||||||
|
findNewestExportMetadataFileBeforeTime(bucketName, exportEndTimeUpperBound, 2);
|
||||||
if (!metaFilePathOptional.isPresent()) {
|
if (!metaFilePathOptional.isPresent()) {
|
||||||
throw new NoSuchElementException("No exports found over the past 2 days.");
|
throw new NoSuchElementException("No exports found over the past 2 days.");
|
||||||
}
|
}
|
||||||
|
@ -85,8 +86,9 @@ public class LatestDatastoreSnapshotFinder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finds the bucket-relative path of the overall export metadata file, in the given bucket,
|
* Finds the latest Datastore export that ends strictly before {@code endTimeUpperBound} and
|
||||||
* searching back up to {@code lookBackDays} days, including today.
|
* returns the bucket-relative path of the overall export metadata file, in the given bucket. The
|
||||||
|
* search goes back for up to {@code lookBackDays} days in time, including today.
|
||||||
*
|
*
|
||||||
* <p>The overall export metadata file is the last file created during a Datastore export. All
|
* <p>The overall export metadata file is the last file created during a Datastore export. All
|
||||||
* data has been exported by the creation time of this file. The name of this file, like that of
|
* data has been exported by the creation time of this file. The name of this file, like that of
|
||||||
|
@ -95,7 +97,8 @@ public class LatestDatastoreSnapshotFinder {
|
||||||
* <p>An example return value: {@code
|
* <p>An example return value: {@code
|
||||||
* 2021-11-19T06:00:00_76493/2021-11-19T06:00:00_76493.overall_export_metadata}.
|
* 2021-11-19T06:00:00_76493/2021-11-19T06:00:00_76493.overall_export_metadata}.
|
||||||
*/
|
*/
|
||||||
private Optional<String> findMostRecentExportMetadataFile(String bucketName, int lookBackDays) {
|
private Optional<String> findNewestExportMetadataFileBeforeTime(
|
||||||
|
String bucketName, Instant endTimeUpperBound, int lookBackDays) {
|
||||||
DateTime today = clock.nowUtc();
|
DateTime today = clock.nowUtc();
|
||||||
for (int day = 0; day < lookBackDays; day++) {
|
for (int day = 0; day < lookBackDays; day++) {
|
||||||
String dateString = today.minusDays(day).toString("yyyy-MM-dd");
|
String dateString = today.minusDays(day).toString("yyyy-MM-dd");
|
||||||
|
@ -107,8 +110,12 @@ public class LatestDatastoreSnapshotFinder {
|
||||||
.sorted(Comparator.<String>naturalOrder().reversed())
|
.sorted(Comparator.<String>naturalOrder().reversed())
|
||||||
.findFirst();
|
.findFirst();
|
||||||
if (metaFilePath.isPresent()) {
|
if (metaFilePath.isPresent()) {
|
||||||
|
BlobInfo blobInfo = gcsUtils.getBlobInfo(BlobId.of(bucketName, metaFilePath.get()));
|
||||||
|
Instant exportEndTime = new Instant(blobInfo.getCreateTime());
|
||||||
|
if (exportEndTime.isBefore(endTimeUpperBound)) {
|
||||||
return metaFilePath;
|
return metaFilePath;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
throw new RuntimeException(ioe);
|
throw new RuntimeException(ioe);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,83 @@
|
||||||
|
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package google.registry.beam.comparedb;
|
||||||
|
|
||||||
|
import google.registry.beam.common.RegistryPipelineOptions;
|
||||||
|
import google.registry.beam.common.RegistryPipelineWorkerInitializer;
|
||||||
|
import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder.DatastoreSnapshotInfo;
|
||||||
|
import google.registry.model.annotations.DeleteAfterMigration;
|
||||||
|
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
|
||||||
|
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
|
||||||
|
import java.util.Optional;
|
||||||
|
import org.apache.beam.sdk.Pipeline;
|
||||||
|
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates the asynchronous data replication process from Cloud SQL (primary) to Datastore
|
||||||
|
* (secondary).
|
||||||
|
*
|
||||||
|
* <p>This pipeline simply compares the snapshots provided by an invoker, which is responsible for
|
||||||
|
* obtaining two consistent snapshots for the same point of time.
|
||||||
|
*/
|
||||||
|
// TODO(weiminyu): Implement the invoker action in a followup PR.
|
||||||
|
@DeleteAfterMigration
|
||||||
|
public class ValidateDatastorePipeline {
|
||||||
|
|
||||||
|
private final ValidateDatastorePipelineOptions options;
|
||||||
|
private final LatestDatastoreSnapshotFinder datastoreSnapshotFinder;
|
||||||
|
|
||||||
|
public ValidateDatastorePipeline(
|
||||||
|
ValidateDatastorePipelineOptions options,
|
||||||
|
LatestDatastoreSnapshotFinder datastoreSnapshotFinder) {
|
||||||
|
this.options = options;
|
||||||
|
this.datastoreSnapshotFinder = datastoreSnapshotFinder;
|
||||||
|
}
|
||||||
|
|
||||||
|
void run(Pipeline pipeline) {
|
||||||
|
DateTime latestCommitLogTime = DateTime.parse(options.getLatestCommitLogTimestamp());
|
||||||
|
DatastoreSnapshotInfo mostRecentExport =
|
||||||
|
datastoreSnapshotFinder.getSnapshotInfo(latestCommitLogTime.toInstant());
|
||||||
|
|
||||||
|
ValidateSqlPipeline.setupPipeline(
|
||||||
|
pipeline,
|
||||||
|
Optional.ofNullable(options.getSqlSnapshotId()),
|
||||||
|
mostRecentExport,
|
||||||
|
latestCommitLogTime,
|
||||||
|
Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse));
|
||||||
|
|
||||||
|
pipeline.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
ValidateDatastorePipelineOptions options =
|
||||||
|
PipelineOptionsFactory.fromArgs(args)
|
||||||
|
.withValidation()
|
||||||
|
.as(ValidateDatastorePipelineOptions.class);
|
||||||
|
RegistryPipelineOptions.validateRegistryPipelineOptions(options);
|
||||||
|
|
||||||
|
// Defensively set important options.
|
||||||
|
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ);
|
||||||
|
options.setJpaTransactionManagerType(JpaTransactionManagerType.BULK_QUERY);
|
||||||
|
|
||||||
|
// Reuse Dataflow worker initialization code to set up JPA in the pipeline harness.
|
||||||
|
new RegistryPipelineWorkerInitializer().beforeProcessing(options);
|
||||||
|
|
||||||
|
LatestDatastoreSnapshotFinder datastoreSnapshotFinder =
|
||||||
|
DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create()
|
||||||
|
.datastoreSnapshotInfoFinder();
|
||||||
|
new ValidateDatastorePipeline(options, datastoreSnapshotFinder).run(Pipeline.create(options));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package google.registry.beam.comparedb;
|
||||||
|
|
||||||
|
import google.registry.model.annotations.DeleteAfterMigration;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import org.apache.beam.sdk.options.Description;
|
||||||
|
import org.apache.beam.sdk.options.Validation;
|
||||||
|
|
||||||
|
/** BEAM pipeline options for {@link ValidateDatastorePipelineOptions}. */
|
||||||
|
@DeleteAfterMigration
|
||||||
|
public interface ValidateDatastorePipelineOptions extends ValidateSqlPipelineOptions {
|
||||||
|
|
||||||
|
@Description(
|
||||||
|
"The id of the SQL snapshot to be compared with Datastore. "
|
||||||
|
+ "If null, the current state of the SQL database is used.")
|
||||||
|
@Nullable
|
||||||
|
String getSqlSnapshotId();
|
||||||
|
|
||||||
|
void setSqlSnapshotId(String snapshotId);
|
||||||
|
|
||||||
|
@Description("The latest CommitLogs to load, in ISO8601 format.")
|
||||||
|
@Validation.Required
|
||||||
|
String getLatestCommitLogTimestamp();
|
||||||
|
|
||||||
|
void setLatestCommitLogTimestamp(String commitLogEndTimestamp);
|
||||||
|
}
|
|
@ -26,6 +26,9 @@ import google.registry.beam.common.RegistryPipelineWorkerInitializer;
|
||||||
import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder.DatastoreSnapshotInfo;
|
import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder.DatastoreSnapshotInfo;
|
||||||
import google.registry.beam.comparedb.ValidateSqlUtils.CompareSqlEntity;
|
import google.registry.beam.comparedb.ValidateSqlUtils.CompareSqlEntity;
|
||||||
import google.registry.model.annotations.DeleteAfterMigration;
|
import google.registry.model.annotations.DeleteAfterMigration;
|
||||||
|
import google.registry.model.common.DatabaseMigrationStateSchedule;
|
||||||
|
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
|
||||||
|
import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection;
|
||||||
import google.registry.model.domain.DomainBase;
|
import google.registry.model.domain.DomainBase;
|
||||||
import google.registry.model.domain.DomainHistory;
|
import google.registry.model.domain.DomainHistory;
|
||||||
import google.registry.model.replay.SqlEntity;
|
import google.registry.model.replay.SqlEntity;
|
||||||
|
@ -35,6 +38,7 @@ import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
|
||||||
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
|
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
|
||||||
import google.registry.persistence.transaction.TransactionManagerFactory;
|
import google.registry.persistence.transaction.TransactionManagerFactory;
|
||||||
import google.registry.util.RequestStatusChecker;
|
import google.registry.util.RequestStatusChecker;
|
||||||
|
import google.registry.util.SystemClock;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import org.apache.beam.sdk.Pipeline;
|
import org.apache.beam.sdk.Pipeline;
|
||||||
|
@ -76,21 +80,16 @@ public class ValidateSqlPipeline {
|
||||||
java.time.Duration.ofSeconds(30);
|
java.time.Duration.ofSeconds(30);
|
||||||
|
|
||||||
private final ValidateSqlPipelineOptions options;
|
private final ValidateSqlPipelineOptions options;
|
||||||
private final DatastoreSnapshotInfo mostRecentExport;
|
private final LatestDatastoreSnapshotFinder datastoreSnapshotFinder;
|
||||||
|
|
||||||
public ValidateSqlPipeline(
|
public ValidateSqlPipeline(
|
||||||
ValidateSqlPipelineOptions options, DatastoreSnapshotInfo mostRecentExport) {
|
ValidateSqlPipelineOptions options, LatestDatastoreSnapshotFinder datastoreSnapshotFinder) {
|
||||||
this.options = options;
|
this.options = options;
|
||||||
this.mostRecentExport = mostRecentExport;
|
this.datastoreSnapshotFinder = datastoreSnapshotFinder;
|
||||||
}
|
|
||||||
|
|
||||||
void run() {
|
|
||||||
run(Pipeline.create(options));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void run(Pipeline pipeline) {
|
void run(Pipeline pipeline) {
|
||||||
// TODO(weiminyu): ensure migration stage is DATASTORE_PRIMARY or DATASTORE_PRIMARY_READ_ONLY
|
|
||||||
Optional<Lock> lock = acquireCommitLogReplayLock();
|
Optional<Lock> lock = acquireCommitLogReplayLock();
|
||||||
if (lock.isPresent()) {
|
if (lock.isPresent()) {
|
||||||
logger.atInfo().log("Acquired CommitLog Replay lock.");
|
logger.atInfo().log("Acquired CommitLog Replay lock.");
|
||||||
|
@ -101,6 +100,8 @@ public class ValidateSqlPipeline {
|
||||||
try {
|
try {
|
||||||
DateTime latestCommitLogTime =
|
DateTime latestCommitLogTime =
|
||||||
TransactionManagerFactory.jpaTm().transact(() -> SqlReplayCheckpoint.get());
|
TransactionManagerFactory.jpaTm().transact(() -> SqlReplayCheckpoint.get());
|
||||||
|
DatastoreSnapshotInfo mostRecentExport =
|
||||||
|
datastoreSnapshotFinder.getSnapshotInfo(latestCommitLogTime.toInstant());
|
||||||
Preconditions.checkState(
|
Preconditions.checkState(
|
||||||
latestCommitLogTime.isAfter(mostRecentExport.exportInterval().getEnd()),
|
latestCommitLogTime.isAfter(mostRecentExport.exportInterval().getEnd()),
|
||||||
"Cannot recreate Datastore snapshot since target time is in the middle of an export.");
|
"Cannot recreate Datastore snapshot since target time is in the middle of an export.");
|
||||||
|
@ -109,7 +110,16 @@ public class ValidateSqlPipeline {
|
||||||
lock.ifPresent(Lock::releaseSql);
|
lock.ifPresent(Lock::releaseSql);
|
||||||
lock = Optional.empty();
|
lock = Optional.empty();
|
||||||
|
|
||||||
setupPipeline(pipeline, Optional.of(databaseSnapshot.getSnapshotId()), latestCommitLogTime);
|
logger.atInfo().log(
|
||||||
|
"Starting comparison with export at %s and latestCommitLogTime at %s",
|
||||||
|
mostRecentExport.exportDir(), latestCommitLogTime);
|
||||||
|
|
||||||
|
setupPipeline(
|
||||||
|
pipeline,
|
||||||
|
Optional.of(databaseSnapshot.getSnapshotId()),
|
||||||
|
mostRecentExport,
|
||||||
|
latestCommitLogTime,
|
||||||
|
Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse));
|
||||||
State state = pipeline.run().waitUntilFinish();
|
State state = pipeline.run().waitUntilFinish();
|
||||||
if (!State.DONE.equals(state)) {
|
if (!State.DONE.equals(state)) {
|
||||||
throw new IllegalStateException("Unexpected pipeline state: " + state);
|
throw new IllegalStateException("Unexpected pipeline state: " + state);
|
||||||
|
@ -120,15 +130,16 @@ public class ValidateSqlPipeline {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void setupPipeline(
|
static void setupPipeline(
|
||||||
Pipeline pipeline, Optional<String> sqlSnapshotId, DateTime latestCommitLogTime) {
|
Pipeline pipeline,
|
||||||
|
Optional<String> sqlSnapshotId,
|
||||||
|
DatastoreSnapshotInfo mostRecentExport,
|
||||||
|
DateTime latestCommitLogTime,
|
||||||
|
Optional<DateTime> compareStartTime) {
|
||||||
pipeline
|
pipeline
|
||||||
.getCoderRegistry()
|
.getCoderRegistry()
|
||||||
.registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class));
|
.registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class));
|
||||||
|
|
||||||
Optional<DateTime> compareStartTime =
|
|
||||||
Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse);
|
|
||||||
|
|
||||||
PCollectionTuple datastoreSnapshot =
|
PCollectionTuple datastoreSnapshot =
|
||||||
DatastoreSnapshots.loadDatastoreSnapshotByKind(
|
DatastoreSnapshots.loadDatastoreSnapshotByKind(
|
||||||
pipeline,
|
pipeline,
|
||||||
|
@ -216,6 +227,10 @@ public class ValidateSqlPipeline {
|
||||||
return "ValidateSqlPipeline";
|
return "ValidateSqlPipeline";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LatestDatastoreSnapshotFinder datastoreSnapshotFinder =
|
||||||
|
DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create()
|
||||||
|
.datastoreSnapshotInfoFinder();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isRunning(String requestLogId) {
|
public boolean isRunning(String requestLogId) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -234,11 +249,16 @@ public class ValidateSqlPipeline {
|
||||||
// Reuse Dataflow worker initialization code to set up JPA in the pipeline harness.
|
// Reuse Dataflow worker initialization code to set up JPA in the pipeline harness.
|
||||||
new RegistryPipelineWorkerInitializer().beforeProcessing(options);
|
new RegistryPipelineWorkerInitializer().beforeProcessing(options);
|
||||||
|
|
||||||
DatastoreSnapshotInfo mostRecentExport =
|
MigrationState state =
|
||||||
DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create()
|
DatabaseMigrationStateSchedule.getValueAtTime(new SystemClock().nowUtc());
|
||||||
.datastoreSnapshotInfoFinder()
|
if (!state.getReplayDirection().equals(ReplayDirection.DATASTORE_TO_SQL)) {
|
||||||
.getSnapshotInfo();
|
throw new IllegalStateException("This pipeline is not designed for migration phase " + state);
|
||||||
|
}
|
||||||
|
|
||||||
new ValidateSqlPipeline(options, mostRecentExport).run(Pipeline.create(options));
|
LatestDatastoreSnapshotFinder datastoreSnapshotFinder =
|
||||||
|
DaggerLatestDatastoreSnapshotFinder_LatestDatastoreSnapshotFinderFinderComponent.create()
|
||||||
|
.datastoreSnapshotInfoFinder();
|
||||||
|
|
||||||
|
new ValidateSqlPipeline(options, datastoreSnapshotFinder).run(Pipeline.create(options));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue