diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java
index 636c1b243..d69635942 100644
--- a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java
+++ b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlPipeline.java
@@ -19,6 +19,8 @@ import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.flogger.FluentLogger;
import google.registry.beam.common.DatabaseSnapshot;
import google.registry.beam.common.RegistryPipelineWorkerInitializer;
import google.registry.beam.comparedb.LatestDatastoreSnapshotFinder.DatastoreSnapshotInfo;
@@ -27,9 +29,11 @@ import google.registry.model.domain.DomainBase;
import google.registry.model.domain.DomainHistory;
import google.registry.model.replay.SqlEntity;
import google.registry.model.replay.SqlReplayCheckpoint;
+import google.registry.model.server.Lock;
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import google.registry.persistence.transaction.TransactionManagerFactory;
+import google.registry.util.RequestStatusChecker;
import java.io.Serializable;
import java.util.Optional;
import org.apache.beam.sdk.Pipeline;
@@ -44,15 +48,30 @@ import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.DateTime;
+import org.joda.time.Duration;
/**
* Validates the asynchronous data replication process from Datastore (primary storage) to Cloud SQL
* (secondary storage).
*/
public class ValidateSqlPipeline {
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
/** Specifies the extra CommitLogs to load before the start of a Database export. */
- private static final int COMMIT_LOG_MARGIN_MINUTES = 10;
+ private static final Duration COMMITLOG_START_TIME_MARGIN = Duration.standardMinutes(10);
+
+ /**
+ * Name of the lock used by the commitlog replay process.
+ *
+ *
See {@link google.registry.backup.ReplayCommitLogsToSqlAction} for more information.
+ */
+ private static final String COMMITLOG_REPLAY_LOCK_NAME = "ReplayCommitLogsToSqlAction";
+
+ private static final Duration REPLAY_LOCK_LEASE_LENGTH = Duration.standardHours(1);
+ private static final java.time.Duration REPLAY_LOCK_ACQUIRE_TIMEOUT =
+ java.time.Duration.ofMinutes(6);
+ private static final java.time.Duration REPLAY_LOCK_ACQUIRE_DELAY =
+ java.time.Duration.ofSeconds(30);
private final ValidateSqlPipelineOptions options;
private final DatastoreSnapshotInfo mostRecentExport;
@@ -69,18 +88,33 @@ public class ValidateSqlPipeline {
@VisibleForTesting
void run(Pipeline pipeline) {
- // TODO(weiminyu): Acquire the commit log replay lock when the lock release bug is fixed.
- DateTime latestCommitLogTime =
- TransactionManagerFactory.jpaTm().transact(() -> SqlReplayCheckpoint.get());
- Preconditions.checkState(
- latestCommitLogTime.isAfter(mostRecentExport.exportInterval().getEnd()),
- "Cannot recreate Datastore snapshot since target time is in the middle of an export.");
- try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {
- setupPipeline(pipeline, Optional.of(databaseSnapshot.getSnapshotId()), latestCommitLogTime);
- State state = pipeline.run().waitUntilFinish();
- if (!State.DONE.equals(state)) {
- throw new IllegalStateException("Unexpected pipeline state: " + state);
+ // TODO(weiminyu): ensure migration stage is DATASTORE_PRIMARY or DATASTORE_PRIMARY_READ_ONLY
+ Optional lock = acquireCommitLogReplayLock();
+ if (lock.isPresent()) {
+ logger.atInfo().log("Acquired CommitLog Replay lock.");
+ } else {
+ throw new RuntimeException("Failed to acquire CommitLog Replay lock.");
+ }
+
+ try {
+ DateTime latestCommitLogTime =
+ TransactionManagerFactory.jpaTm().transact(() -> SqlReplayCheckpoint.get());
+ Preconditions.checkState(
+ latestCommitLogTime.isAfter(mostRecentExport.exportInterval().getEnd()),
+ "Cannot recreate Datastore snapshot since target time is in the middle of an export.");
+ try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {
+ // Eagerly release the commitlog replay lock so that replay can resume.
+ lock.ifPresent(Lock::releaseSql);
+ lock = Optional.empty();
+
+ setupPipeline(pipeline, Optional.of(databaseSnapshot.getSnapshotId()), latestCommitLogTime);
+ State state = pipeline.run().waitUntilFinish();
+ if (!State.DONE.equals(state)) {
+ throw new IllegalStateException("Unexpected pipeline state: " + state);
+ }
}
+ } finally {
+ lock.ifPresent(Lock::releaseSql);
}
}
@@ -95,7 +129,7 @@ public class ValidateSqlPipeline {
pipeline,
mostRecentExport.exportDir(),
mostRecentExport.commitLogDir(),
- mostRecentExport.exportInterval().getStart().minusMinutes(COMMIT_LOG_MARGIN_MINUTES),
+ mostRecentExport.exportInterval().getStart().minus(COMMITLOG_START_TIME_MARGIN),
// Increase by 1ms since we want to include commitLogs latestCommitLogTime but
// this parameter is exclusive.
latestCommitLogTime.plusMillis(1),
@@ -138,6 +172,51 @@ public class ValidateSqlPipeline {
return sqlEntity.getPrimaryKeyString();
}
+ private static Optional acquireCommitLogReplayLock() {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ while (stopwatch.elapsed().minus(REPLAY_LOCK_ACQUIRE_TIMEOUT).isNegative()) {
+ Optional lock = tryAcquireCommitLogReplayLock();
+ if (lock.isPresent()) {
+ return lock;
+ }
+ logger.atInfo().log("Failed to acquired CommitLog Replay lock. Will retry...");
+ try {
+ Thread.sleep(REPLAY_LOCK_ACQUIRE_DELAY.toMillis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted.");
+ }
+ }
+ return Optional.empty();
+ }
+
+ private static Optional tryAcquireCommitLogReplayLock() {
+ return Lock.acquireSql(
+ COMMITLOG_REPLAY_LOCK_NAME,
+ null,
+ REPLAY_LOCK_LEASE_LENGTH,
+ getLockingRequestStatusChecker(),
+ false);
+ }
+
+ /**
+ * Returns a fake implementation of {@link RequestStatusChecker} that is required for lock
+ * acquisition. The default implementation is AppEngine-specific and is unusable on GCE.
+ */
+ private static RequestStatusChecker getLockingRequestStatusChecker() {
+ return new RequestStatusChecker() {
+ @Override
+ public String getLogId() {
+ return "ValidateSqlPipeline";
+ }
+
+ @Override
+ public boolean isRunning(String requestLogId) {
+ return true;
+ }
+ };
+ }
+
public static void main(String[] args) {
ValidateSqlPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ValidateSqlPipelineOptions.class);
diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java
index 2c7ea5619..e7c5e8750 100644
--- a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java
+++ b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java
@@ -19,9 +19,14 @@ import static google.registry.persistence.transaction.TransactionManagerFactory.
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
+import google.registry.beam.initsql.Transforms;
+import google.registry.config.RegistryEnvironment;
+import google.registry.model.BackupGroupRoot;
import google.registry.model.EppResource;
import google.registry.model.ImmutableObject;
+import google.registry.model.billing.BillingEvent;
import google.registry.model.contact.ContactBase;
import google.registry.model.contact.ContactHistory;
import google.registry.model.domain.DomainContent;
@@ -29,6 +34,7 @@ import google.registry.model.domain.DomainHistory;
import google.registry.model.eppcommon.AuthInfo;
import google.registry.model.host.HostHistory;
import google.registry.model.poll.PollMessage;
+import google.registry.model.registrar.Registrar;
import google.registry.model.replay.SqlEntity;
import google.registry.model.reporting.HistoryEntry;
import java.lang.reflect.Field;
@@ -51,6 +57,10 @@ final class ValidateSqlUtils {
private ValidateSqlUtils() {}
+ private static final ImmutableSet PROBER_CELLS = ImmutableSet.of("IQ", "LG", "TL");
+ private static final ImmutableSet PROBER_TYPES =
+ ImmutableSet.of("ANYT", "ANYTES", "CANARY");
+
/**
* Query template for finding the median value of the {@code history_revision_id} column in one of
* the History tables.
@@ -153,6 +163,9 @@ final class ValidateSqlUtils {
totalCounters.get(counterKey).inc();
if (entities.size() == 1) {
+ if (isSpecialCaseProberEntity(entities.get(0))) {
+ return;
+ }
missingCounters.get(counterKey).inc();
// Temporary debugging help. See logDiff() above.
if (!logPrinted) {
@@ -203,6 +216,15 @@ final class ValidateSqlUtils {
*/
static SqlEntity normalizeEppResource(SqlEntity eppResource) {
try {
+ if (isSpecialCaseProberEntity(eppResource)) {
+ // Clearing some timestamps. See isSpecialCaseProberEntity() for reasons.
+ Field lastUpdateTime = BackupGroupRoot.class.getDeclaredField("updateTimestamp");
+ lastUpdateTime.setAccessible(true);
+ lastUpdateTime.set(eppResource, null);
+ Field deletionTime = EppResource.class.getDeclaredField("deletionTime");
+ deletionTime.setAccessible(true);
+ deletionTime.set(eppResource, null);
+ }
Field authField =
eppResource instanceof DomainContent
? DomainContent.class.getDeclaredField("authInfo")
@@ -246,6 +268,7 @@ final class ValidateSqlUtils {
Field domainContent = DomainHistory.class.getDeclaredField("domainContent");
domainContent.setAccessible(true);
domainContent.set(historyEntry, null);
+ // Convert empty domainTransactionRecords to null for comparison.
Field domainTransactionRecords =
HistoryEntry.class.getDeclaredField("domainTransactionRecords");
domainTransactionRecords.setAccessible(true);
@@ -253,6 +276,16 @@ final class ValidateSqlUtils {
if (domainTransactionRecordsValue != null && domainTransactionRecordsValue.isEmpty()) {
domainTransactionRecords.set(historyEntry, null);
}
+ // DomainHistory in Datastore does not have the following properties either:
+ Field nsHosts = DomainHistory.class.getDeclaredField("nsHosts");
+ nsHosts.setAccessible(true);
+ nsHosts.set(historyEntry, null);
+ Field dsDataHistories = DomainHistory.class.getDeclaredField("dsDataHistories");
+ dsDataHistories.setAccessible(true);
+ dsDataHistories.set(historyEntry, null);
+ Field gracePeriodHistories = DomainHistory.class.getDeclaredField("gracePeriodHistories");
+ gracePeriodHistories.setAccessible(true);
+ gracePeriodHistories.set(historyEntry, null);
} else if (historyEntry instanceof ContactHistory) {
Field contactBase = ContactHistory.class.getDeclaredField("contactBase");
contactBase.setAccessible(true);
@@ -267,4 +300,86 @@ final class ValidateSqlUtils {
throw new RuntimeException(e);
}
}
+
+ /**
+ * Returns {@code true} if {@code entity} is created by the prober and needs special treatment.
+ *
+ * {@link EppResource} entities created by the prober are deleted by a cron job that bypasses
+ * the CommitLog mechanism. As a result, their deletions are not propagated to SQL, creating two
+ * types of mismatches: an entity exists in both databases but differs in lastUpdateTime and
+ * deletionTime; an entity only exists in the SQL database.
+ *
+ *
In production, there are few placeholder {@link Registrar registrars} that do not exist in
+ * Datastore. They were manually created to in SQL to solve a one-time problem (see b/187946868
+ * for details). They can be ignored in the database comparison.
+ */
+ static boolean isSpecialCaseProberEntity(Object entity) {
+ if (entity instanceof EppResource) {
+ EppResource host = (EppResource) entity;
+ if (host.getPersistedCurrentSponsorRegistrarId().startsWith("prober-")) {
+ return true;
+ }
+ }
+ if (entity instanceof HistoryEntry) {
+ HistoryEntry historyEntry = (HistoryEntry) entity;
+ if (historyEntry.getRegistrarId().startsWith("prober-")) {
+ // Not all prober entities have "prober-" as registrar prefix.
+ return true;
+ }
+ if (Objects.equals(historyEntry.getReason(), "Deletion of prober data")) {
+ // Soft-delete event in Datastore that is not propagated to SQL.
+ return true;
+ }
+ }
+ if (entity instanceof DomainHistory) {
+ DomainHistory domainHistory = (DomainHistory) entity;
+ if (domainHistory.getDomainContent().isPresent()
+ && domainHistory.getDomainContent().get().getDomainName().startsWith("prober-")) {
+ // Asynchronously replicated event in SQL.
+ return true;
+ }
+ if (domainHistory.getDomainRepoId() != null) {
+ // Some synthetic events only have domainRepoId.
+ String repoId = domainHistory.getDomainRepoId();
+ if (Transforms.IGNORED_DOMAINS.contains(repoId)) {
+ return true;
+ }
+ String suffix = repoId.substring(repoId.indexOf('-') + 1);
+ String cell = suffix.substring(0, 2);
+ suffix = suffix.substring(2);
+ if (PROBER_CELLS.contains(cell) && PROBER_TYPES.contains(suffix)) {
+ return true;
+ }
+ }
+ }
+ if (entity instanceof ContactHistory) {
+ if (Transforms.IGNORED_CONTACTS.contains(((ContactHistory) entity).getContactRepoId())) {
+ return true;
+ }
+ }
+ if (entity instanceof HostHistory) {
+ if (Transforms.IGNORED_HOSTS.contains(((HostHistory) entity).getHostRepoId())) {
+ return true;
+ }
+ }
+ if (entity instanceof BillingEvent) {
+ BillingEvent event = (BillingEvent) entity;
+ if (event.getRegistrarId().startsWith("prober-")) {
+ return true;
+ }
+ }
+ if (entity instanceof PollMessage) {
+ if (((PollMessage) entity).getRegistrarId().startsWith("prober-")) {
+ return true;
+ }
+ }
+ if (RegistryEnvironment.get().equals(RegistryEnvironment.PRODUCTION)
+ && entity instanceof Registrar) {
+ Registrar registrar = (Registrar) entity;
+ if (registrar.getRegistrarId().startsWith("prober-wj-")) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/core/src/main/java/google/registry/beam/initsql/Transforms.java b/core/src/main/java/google/registry/beam/initsql/Transforms.java
index 821dacf26..1216152e2 100644
--- a/core/src/main/java/google/registry/beam/initsql/Transforms.java
+++ b/core/src/main/java/google/registry/beam/initsql/Transforms.java
@@ -261,16 +261,19 @@ public final class Transforms {
.iterator()));
}
- // Production data repair configs go below. See b/185954992.
+ // Production data repair configs go below. See b/185954992. Note that the CommitLog replay
+ // process does not filter out the ignored entities listed below, a mistake that we do not fix
+ // for operational convenience. Instead, the Database comparison tool will filter them out. See
+ // ValidateSqlUtils.java for more information.
// Prober domains in bad state, without associated contacts, hosts, billings, and non-synthesized
// history. They can be safely ignored.
- private static final ImmutableSet IGNORED_DOMAINS =
+ public static final ImmutableSet IGNORED_DOMAINS =
ImmutableSet.of("6AF6D2-IQCANT", "2-IQANYT");
// Prober hosts referencing phantom registrars. They and their associated history entries can be
// safely ignored.
- private static final ImmutableSet IGNORED_HOSTS =
+ public static final ImmutableSet IGNORED_HOSTS =
ImmutableSet.of(
"4E21_WJ0TEST-GOOGLE",
"4E21_WJ1TEST-GOOGLE",
@@ -279,7 +282,7 @@ public final class Transforms {
// Prober contacts referencing phantom registrars. They and their associated history entries can
// be safely ignored.
- private static final ImmutableSet IGNORED_CONTACTS =
+ public static final ImmutableSet IGNORED_CONTACTS =
ImmutableSet.of(
"1_WJ0TEST-GOOGLE", "1_WJ1TEST-GOOGLE", "1_WJ2TEST-GOOGLE", "1_WJ3TEST-GOOGLE");
@@ -300,6 +303,13 @@ public final class Transforms {
return !IGNORED_HOSTS.contains(roid);
}
if (entity.getKind().equals("HistoryEntry")) {
+ // DOMAIN_APPLICATION_CREATE is deprecated type and should not be migrated.
+ // The Enum name DOMAIN_APPLICATION_CREATE no longer exists in Java and cannot
+ // be deserialized.
+ if (Objects.equals(entity.getProperty("type"), "DOMAIN_APPLICATION_CREATE")) {
+ return false;
+ }
+
// Remove production bad data: Histories of ignored EPP resources:
com.google.appengine.api.datastore.Key parentKey = entity.getKey().getParent();
if (parentKey.getKind().equals("ContactResource")) {
@@ -315,14 +325,6 @@ public final class Transforms {
return !IGNORED_DOMAINS.contains(domainRoid);
}
}
- // End of production-specific checks.
-
- if (entity.getKind().equals("HistoryEntry")) {
- // DOMAIN_APPLICATION_CREATE is deprecated type and should not be migrated.
- // The Enum name DOMAIN_APPLICATION_CREATE no longer exists in Java and cannot
- // be deserialized.
- return !Objects.equals(entity.getProperty("type"), "DOMAIN_APPLICATION_CREATE");
- }
return true;
}
diff --git a/core/src/main/java/google/registry/model/bulkquery/DomainHistoryHost.java b/core/src/main/java/google/registry/model/bulkquery/DomainHistoryHost.java
index 9cd2ed767..796d104a7 100644
--- a/core/src/main/java/google/registry/model/bulkquery/DomainHistoryHost.java
+++ b/core/src/main/java/google/registry/model/bulkquery/DomainHistoryHost.java
@@ -14,6 +14,7 @@
package google.registry.model.bulkquery;
+import com.google.common.base.Objects;
import google.registry.model.domain.DomainHistory.DomainHistoryId;
import google.registry.model.host.HostResource;
import google.registry.model.replay.SqlOnlyEntity;
@@ -47,4 +48,23 @@ public class DomainHistoryHost implements Serializable, SqlOnlyEntity {
public VKey getHostVKey() {
return VKey.create(HostResource.class, hostRepoId);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DomainHistoryHost)) {
+ return false;
+ }
+ DomainHistoryHost that = (DomainHistoryHost) o;
+ return Objects.equal(domainHistoryHistoryRevisionId, that.domainHistoryHistoryRevisionId)
+ && Objects.equal(domainHistoryDomainRepoId, that.domainHistoryDomainRepoId)
+ && Objects.equal(hostRepoId, that.hostRepoId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(domainHistoryHistoryRevisionId, domainHistoryDomainRepoId, hostRepoId);
+ }
}
diff --git a/core/src/main/java/google/registry/model/bulkquery/DomainHost.java b/core/src/main/java/google/registry/model/bulkquery/DomainHost.java
index 3d6761f78..8055b5f4c 100644
--- a/core/src/main/java/google/registry/model/bulkquery/DomainHost.java
+++ b/core/src/main/java/google/registry/model/bulkquery/DomainHost.java
@@ -14,6 +14,7 @@
package google.registry.model.bulkquery;
+import com.google.common.base.Objects;
import google.registry.model.host.HostResource;
import google.registry.model.replay.SqlOnlyEntity;
import google.registry.persistence.VKey;
@@ -43,4 +44,22 @@ public class DomainHost implements Serializable, SqlOnlyEntity {
public VKey getHostVKey() {
return VKey.create(HostResource.class, hostRepoId);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DomainHost)) {
+ return false;
+ }
+ DomainHost that = (DomainHost) o;
+ return Objects.equal(domainRepoId, that.domainRepoId)
+ && Objects.equal(hostRepoId, that.hostRepoId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(domainRepoId, hostRepoId);
+ }
}
diff --git a/core/src/main/java/google/registry/model/poll/PollMessage.java b/core/src/main/java/google/registry/model/poll/PollMessage.java
index ffa08cdb0..7491c4551 100644
--- a/core/src/main/java/google/registry/model/poll/PollMessage.java
+++ b/core/src/main/java/google/registry/model/poll/PollMessage.java
@@ -395,9 +395,11 @@ public abstract class PollMessage extends ImmutableObject
super.onLoad();
// Take the Objectify-specific fields and map them to the SQL-specific fields, if applicable
if (!isNullOrEmpty(contactPendingActionNotificationResponses)) {
+ contactId = contactPendingActionNotificationResponses.get(0).getId().value;
pendingActionNotificationResponse = contactPendingActionNotificationResponses.get(0);
}
if (!isNullOrEmpty(hostPendingActionNotificationResponses)) {
+ hostId = hostPendingActionNotificationResponses.get(0).nameOrId.value;
pendingActionNotificationResponse = hostPendingActionNotificationResponses.get(0);
}
if (!isNullOrEmpty(contactTransferResponses)) {