diff --git a/core/src/main/java/google/registry/model/EntityClasses.java b/core/src/main/java/google/registry/model/EntityClasses.java
index ba5fd409a..2b9db8e72 100644
--- a/core/src/main/java/google/registry/model/EntityClasses.java
+++ b/core/src/main/java/google/registry/model/EntityClasses.java
@@ -40,6 +40,7 @@ import google.registry.model.rde.RdeRevision;
import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.RegistrarContact;
import google.registry.model.replay.LastSqlTransaction;
+import google.registry.model.replay.ReplayGap;
import google.registry.model.reporting.HistoryEntry;
import google.registry.model.server.Lock;
import google.registry.model.server.ServerSecret;
@@ -86,6 +87,7 @@ public final class EntityClasses {
Registrar.class,
RegistrarContact.class,
Registry.class,
+ ReplayGap.class,
ServerSecret.class);
private EntityClasses() {}
diff --git a/core/src/main/java/google/registry/model/replay/ReplayGap.java b/core/src/main/java/google/registry/model/replay/ReplayGap.java
new file mode 100644
index 000000000..921c9ea35
--- /dev/null
+++ b/core/src/main/java/google/registry/model/replay/ReplayGap.java
@@ -0,0 +1,60 @@
+// Copyright 2022 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.model.replay;
+
+import static google.registry.model.annotations.NotBackedUp.Reason.TRANSIENT;
+
+import com.googlecode.objectify.annotation.Entity;
+import com.googlecode.objectify.annotation.Id;
+import google.registry.model.ImmutableObject;
+import google.registry.model.annotations.DeleteAfterMigration;
+import google.registry.model.annotations.NotBackedUp;
+import org.joda.time.DateTime;
+
+/**
+ * Tracks gaps in transaction ids when replicating from SQL to datastore.
+ *
+ * <p>SQL -> DS replication uses a Transaction table indexed by a SEQUENCE column, which normally
+ * increments monotonically for each committed transaction. Gaps in this sequence can occur when a
+ * transaction is rolled back or when a transaction has been initiated but not committed to the
+ * table at the time of a query. To protect us from the latter scenario, we need to keep track of
+ * these gaps and replay any of them that have been filled in since we processed their batch.
+ */
+@DeleteAfterMigration
+@NotBackedUp(reason = TRANSIENT)
+@Entity
+public class ReplayGap extends ImmutableObject implements DatastoreOnlyEntity {
+ // Id of the transaction that was missing from the SQL Transaction table; also serves as the
+ // Datastore id of this entity.
+ @Id long transactionId;
+
+ // Time at which the gap was recorded. The replication action compares it against the current
+ // transaction time to decide when the gap record has expired.
+ //
+ // We can't use a CreateAutoTimestamp here because this ends up getting persisted in an ofy
+ // transaction that happens in JPA mode, so we don't end up getting an active transaction manager
+ // when the timestamp needs to be set.
+ DateTime timestamp;
+
+ // No-arg constructor; presumably needed by Objectify to materialize entities -- TODO confirm.
+ ReplayGap() {}
+
+ /** Creates a gap record for {@code transactionId}, recorded at {@code timestamp}. */
+ ReplayGap(DateTime timestamp, long transactionId) {
+ this.timestamp = timestamp;
+ this.transactionId = transactionId;
+ }
+
+ /** Returns the id of the missing transaction. */
+ long getTransactionId() {
+ return transactionId;
+ }
+
+ /** Returns the time at which this gap was recorded. */
+ DateTime getTimestamp() {
+ return timestamp;
+ }
+}
diff --git a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java
index 3b6b7fdc7..f3836560e 100644
--- a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java
+++ b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java
@@ -14,6 +14,8 @@
package google.registry.model.replay;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
import static google.registry.model.ofy.ObjectifyService.auditedOfy;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
@@ -70,6 +72,12 @@ public class ReplicateToDatastoreAction implements Runnable {
*/
public static final int BATCH_SIZE = 200;
+ /**
+ * The longest time that we'll keep trying to resolve a gap in the Transaction table in
+ * milliseconds, after which, the gap record will be deleted.
+ *
+ * <p>The value 300000 ms corresponds to five minutes.
+ */
+ public static final long MAX_GAP_RETENTION_MILLIS = 300000;
+
public static final Duration REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH = standardHours(1);
private final Clock clock;
@@ -89,6 +97,12 @@ public class ReplicateToDatastoreAction implements Runnable {
return getTransactionBatchAtSnapshot(Optional.empty());
}
+ /**
+ * Get the next batch of transactions, optionally from a specific SQL database snapshot.
+ *
+ * <p>Note that this method may also apply transactions from previous batches that had not yet
+ * been committed at the time the previous batch was retrieved.
+ */
static List getTransactionBatchAtSnapshot(Optional snapshotId) {
// Get the next batch of transactions that we haven't replicated.
LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(LastSqlTransaction::load);
@@ -97,6 +111,11 @@ public class ReplicateToDatastoreAction implements Runnable {
.transactWithoutBackup(
() -> {
snapshotId.ifPresent(jpaTm()::setDatabaseSnapshot);
+
+ // Fill in any gaps in the transaction log that have since become available before
+ // processing the next batch.
+ applyMissingTransactions();
+
return jpaTm()
.query(
"SELECT txn FROM TransactionEntity txn WHERE id >" + " :lastId ORDER BY id",
@@ -110,6 +129,68 @@ public class ReplicateToDatastoreAction implements Runnable {
}
}
+  /**
+   * Iterate over the recent gaps in the Transaction table and apply any that have been filled in.
+   *
+   * <p>Must be called from within a JPA transaction.
+   *
+   * <p>Gap rewriting is a complicated matter, and the algorithm is the product of some very deep
+   * consideration by mmuller and weiminyu. Basically, the constraints are:
+   *
+   * <ol>
+   *   <li>Replay has to work against a database snapshot (gap replay would break this, so we don't
+   *       call this method when replaying against a snapshot)
+   * </ol>
+   *
+   * <p>For every recorded {@link ReplayGap} whose transaction now exists in SQL, the transaction
+   * is written to Datastore and the matching gap record is deleted in the same ofy transaction.
+   * Afterwards, every gap older than {@link #MAX_GAP_RETENTION_MILLIS} (measured against the JPA
+   * transaction time) is deleted in a separate ofy transaction.
+   *
+   * <p>NOTE(review): the call site in {@code getTransactionBatchAtSnapshot} invokes this method
+   * unconditionally, after any snapshot id has been applied; confirm that this matches the
+   * snapshot constraint stated above.
+   *
+   * @throws RuntimeException if a stored transaction cannot be deserialized
+   * @throws IllegalStateException if a replayed transaction does not match exactly one gap
+   */
+  private static void applyMissingTransactions() {
+    long now = jpaTm().getTransactionTime().getMillis();
+    ImmutableList<ReplayGap> gaps = ofyTm().loadAllOf(ReplayGap.class);
+    jpaTm()
+        .query("SELECT txn from TransactionEntity txn WHERE id IN :gapIds", TransactionEntity.class)
+        .setParameter(
+            "gapIds", gaps.stream().map(ReplayGap::getTransactionId).collect(toImmutableList()))
+        .getResultStream()
+        .forEach(
+            txn -> {
+              // Transcribe the transaction and delete the gap record in the same ofy transaction.
+              ofyTm()
+                  .transact(
+                      () -> {
+                        // Write the transaction to datastore.
+                        try {
+                          Transaction.deserialize(txn.getContents()).writeToDatastore();
+                        } catch (IOException e) {
+                          throw new RuntimeException("Error during transaction deserialization", e);
+                        }
+
+                        // Find the gap record that corresponds to this transaction.
+                        ImmutableList<ReplayGap> filledGaps =
+                            gaps.stream()
+                                .filter(gap -> gap.getTransactionId() == txn.getId())
+                                .collect(toImmutableList());
+                        checkState(
+                            filledGaps.size() == 1,
+                            "Bad list of gaps for discovered id: %s",
+                            filledGaps);
+
+                        // Delete the gap that was just filled (not merely the first gap loaded),
+                        // so that this transaction is not replayed again on the next run.
+                        auditedOfy()
+                            .deleteIgnoringReadOnlyWithoutBackup()
+                            .entity(filledGaps.get(0));
+                      });
+              logger.atInfo().log("Applied missing transaction %s", txn.getId());
+            });
+
+    // Clean up any gaps that have expired. This iterates over the list loaded above, so it may
+    // also issue deletes for gaps that were already removed while being filled.
+    ofyTm()
+        .transact(
+            () ->
+                gaps.forEach(
+                    gap -> {
+                      if (now - gap.getTimestamp().getMillis() > MAX_GAP_RETENTION_MILLIS) {
+                        auditedOfy().deleteIgnoringReadOnlyWithoutBackup().entity(gap);
+                        logger.atInfo().log("Removed expired gap %s", gap);
+                      }
+                    }));
+  }
+
/**
* Apply a transaction to Datastore, returns true if there was a fatal error and the batch should
* be aborted.
@@ -133,6 +214,9 @@ public class ReplicateToDatastoreAction implements Runnable {
while (nextTxnId < txnEntity.getId()) {
logger.atWarning().log(
"Ignoring transaction %s, which does not exist.", nextTxnId);
+ auditedOfy()
+ .saveIgnoringReadOnlyWithoutBackup()
+ .entity(new ReplayGap(ofyTm().getTransactionTime(), nextTxnId));
++nextTxnId;
}
diff --git a/core/src/test/java/google/registry/model/common/ClassPathManagerTest.java b/core/src/test/java/google/registry/model/common/ClassPathManagerTest.java
index 458f039ea..294ce7a3e 100644
--- a/core/src/test/java/google/registry/model/common/ClassPathManagerTest.java
+++ b/core/src/test/java/google/registry/model/common/ClassPathManagerTest.java
@@ -41,6 +41,7 @@ import google.registry.model.rde.RdeRevision;
import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.RegistrarContact;
import google.registry.model.replay.LastSqlTransaction;
+import google.registry.model.replay.ReplayGap;
import google.registry.model.reporting.HistoryEntry;
import google.registry.model.server.Lock;
import google.registry.model.server.ServerSecret;
@@ -74,6 +75,7 @@ public class ClassPathManagerTest {
assertThat(ClassPathManager.getClass("HostResource")).isEqualTo(HostResource.class);
assertThat(ClassPathManager.getClass("Recurring")).isEqualTo(Recurring.class);
assertThat(ClassPathManager.getClass("Registrar")).isEqualTo(Registrar.class);
+ assertThat(ClassPathManager.getClass("ReplayGap")).isEqualTo(ReplayGap.class);
assertThat(ClassPathManager.getClass("ContactResource")).isEqualTo(ContactResource.class);
assertThat(ClassPathManager.getClass("Cancellation")).isEqualTo(Cancellation.class);
assertThat(ClassPathManager.getClass("RegistrarContact")).isEqualTo(RegistrarContact.class);
@@ -141,6 +143,7 @@ public class ClassPathManagerTest {
assertThat(ClassPathManager.getClassName(HostResource.class)).isEqualTo("HostResource");
assertThat(ClassPathManager.getClassName(Recurring.class)).isEqualTo("Recurring");
assertThat(ClassPathManager.getClassName(Registrar.class)).isEqualTo("Registrar");
+ assertThat(ClassPathManager.getClassName(ReplayGap.class)).isEqualTo("ReplayGap");
assertThat(ClassPathManager.getClassName(ContactResource.class)).isEqualTo("ContactResource");
assertThat(ClassPathManager.getClassName(Cancellation.class)).isEqualTo("Cancellation");
assertThat(ClassPathManager.getClassName(RegistrarContact.class)).isEqualTo("RegistrarContact");
diff --git a/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java b/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java
index c7c0ebef0..9da37e895 100644
--- a/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java
+++ b/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java
@@ -27,11 +27,13 @@ import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.flogger.FluentLogger;
import com.google.common.testing.TestLogHandler;
import com.google.common.truth.Truth8;
+import com.googlecode.objectify.Key;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import google.registry.model.domain.token.AllocationToken;
@@ -39,6 +41,8 @@ import google.registry.model.domain.token.AllocationToken.TokenType;
import google.registry.model.ofy.CommitLogBucket;
import google.registry.model.ofy.Ofy;
import google.registry.model.server.Lock;
+import google.registry.persistence.VKey;
+import google.registry.persistence.transaction.JpaTransactionManagerImpl;
import google.registry.persistence.transaction.TransactionEntity;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.DatabaseHelper;
@@ -48,9 +52,13 @@ import google.registry.testing.InjectExtension;
import google.registry.testing.ReplayExtension;
import google.registry.testing.TestObject;
import google.registry.util.RequestStatusChecker;
+import java.lang.reflect.Proxy;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.EntityTransaction;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.jupiter.api.AfterEach;
@@ -241,6 +249,64 @@ public class ReplicateToDatastoreActionTest {
assertThat(ofyTm().transact(() -> LastSqlTransaction.load()).getTransactionId()).isEqualTo(4);
}
+  /**
+   * A transaction whose commit is deferred leaves a gap that is recorded on the first replay and
+   * filled in (and the gap record removed) on a later replay once the commit has happened.
+   */
+  @Test
+  void testTransactionGapReplay() {
+    TestObject bar = TestObject.create("bar");
+    TestObject baz = TestObject.create("baz");
+    DeferredCommit pendingCommit = new DeferredCommit(fakeClock);
+    VKey<ReplayGap> gapKey = VKey.createOfy(ReplayGap.class, Key.create(ReplayGap.class, 2));
+
+    // Three SQL transactions: "foo" committed, "bar" left uncommitted, "baz" committed.
+    insertInDb(TestObject.create("foo"));
+    pendingCommit.transact(() -> jpaTm().insert(bar));
+    insertInDb(baz);
+
+    // First replay: "bar" is not visible yet, so only "baz" arrives and a gap is recorded.
+    action.run();
+    Truth8.assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(bar.key()))).isEmpty();
+    assertThat(ofyTm().transact(() -> ofyTm().loadByKey(baz.key()))).isEqualTo(baz);
+    Truth8.assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(gapKey))).isPresent();
+
+    // Second replay, after the deferred commit lands: the gap is replayed.
+    pendingCommit.commit();
+    resetAction();
+    action.run();
+
+    assertThat(ofyTm().transact(() -> ofyTm().loadByKey(bar.key()))).isEqualTo(bar);
+    // Verify that the gap record has been cleaned up.
+    Truth8.assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(gapKey))).isEmpty();
+  }
+
+  /**
+   * A gap left behind by a rolled-back transaction is recorded, then dropped once it is older
+   * than the maximum gap retention time.
+   */
+  @Test
+  void testGapRecordExpiration() {
+    VKey<ReplayGap> gapKey = VKey.createOfy(ReplayGap.class, Key.create(ReplayGap.class, 2));
+
+    insertInDb(TestObject.create("foo"));
+
+    // Fail a transaction, create a gap.
+    try {
+      jpaTm()
+          .transact(
+              () -> {
+                jpaTm().insert(TestObject.create("other"));
+                // Explicitly save the transaction entity to force the id update.
+                jpaTm().insert(new TransactionEntity(new byte[] {1, 2, 3}));
+                throw new RuntimeException("fail!!!");
+              });
+    } catch (Exception expected) {
+      logger.atInfo().log("Got expected exception.");
+    }
+
+    insertInDb(TestObject.create("bar"));
+
+    action.run();
+
+    // Verify that the gap record has been created
+    Truth8.assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(gapKey))).isPresent();
+
+    // Move just past the retention window and replay again.
+    Duration pastRetention = Duration.millis(ReplicateToDatastoreAction.MAX_GAP_RETENTION_MILLIS + 1);
+    fakeClock.advanceBy(pastRetention);
+    resetAction();
+    action.run();
+
+    // Verify that the gap record has been destroyed.
+    Truth8.assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(gapKey))).isEmpty();
+  }
+
@Test
void testBeforeDatastoreSaveCallback() {
assumeTrue(ReplayExtension.replayTestsEnabled());
@@ -327,4 +393,63 @@ public class ReplicateToDatastoreActionTest {
when(requestStatusChecker.getLogId()).thenReturn("logId");
action = new ReplicateToDatastoreAction(fakeClock, requestStatusChecker, response);
}
+
+  /**
+   * Deep fake of EntityManagerFactory -> EntityManager -> EntityTransaction that allows us to defer
+   * the actual commit until after the other transactions are replayed.
+   *
+   * <p>{@link #transact} runs the work through a {@link JpaTransactionManagerImpl} built on a
+   * proxied factory whose transactions ignore {@code commit}; {@link #commit} later commits the
+   * real transaction that was captured along the way.
+   */
+  static class DeferredCommit {
+
+    private final FakeClock clock;
+
+    // The real transaction most recently obtained through the proxied EntityManager; null until
+    // the proxy's getTransaction has been invoked.
+    private EntityTransaction realTransaction;
+
+    DeferredCommit(FakeClock clock) {
+      this.clock = clock;
+    }
+
+    /**
+     * Returns a dynamic proxy for {@code iface} that answers every method named {@code method}
+     * (matched by name only, so all overloads) with {@code supplier.get()} and forwards all other
+     * calls to {@code delegate}.
+     */
+    private static <T> T makeProxy(
+        Class<T> iface, T delegate, String method, Supplier<?> supplier) {
+      return iface.cast(
+          Proxy.newProxyInstance(
+              delegate.getClass().getClassLoader(),
+              new Class<?>[] {iface},
+              (proxy, meth, args) -> {
+                if (meth.getName().equals(method)) {
+                  return supplier.get();
+                }
+                try {
+                  return meth.invoke(delegate, args);
+                } catch (java.lang.reflect.InvocationTargetException e) {
+                  // Rethrow the delegate's own exception so callers see the same exception type
+                  // they would get from the real object rather than a reflection wrapper.
+                  throw e.getCause();
+                }
+              }));
+    }
+
+    /**
+     * Wraps {@code orgEntityManager} so that each {@code getTransaction} call records the real
+     * transaction and hands back a proxy of it whose {@code commit} does nothing.
+     */
+    EntityManager createEntityManagerProxy(EntityManager orgEntityManager) {
+      return makeProxy(
+          EntityManager.class,
+          orgEntityManager,
+          "getTransaction",
+          () -> {
+            realTransaction = orgEntityManager.getTransaction();
+            return makeProxy(EntityTransaction.class, realTransaction, "commit", () -> null);
+          });
+    }
+
+    /**
+     * Commits the real transaction captured during {@link #transact}.
+     *
+     * @throws IllegalStateException if no transaction has been captured yet
+     */
+    void commit() {
+      if (realTransaction == null) {
+        throw new IllegalStateException("No deferred transaction to commit; call transact() first");
+      }
+      realTransaction.commit();
+    }
+
+    /** Runs {@code runnable} in a JPA transaction whose commit is held back until {@link #commit}. */
+    void transact(Runnable runnable) {
+      EntityManagerFactory orgEmf =
+          jpaTm().transact(() -> jpaTm().getEntityManager().getEntityManagerFactory());
+      EntityManagerFactory emfProxy =
+          makeProxy(
+              EntityManagerFactory.class,
+              orgEmf,
+              "createEntityManager",
+              () -> createEntityManagerProxy(orgEmf.createEntityManager()));
+
+      new JpaTransactionManagerImpl(emfProxy, clock).transact(runnable);
+    }
+  }
}
diff --git a/core/src/test/resources/google/registry/model/schema.txt b/core/src/test/resources/google/registry/model/schema.txt
index 69e825846..20374c039 100644
--- a/core/src/test/resources/google/registry/model/schema.txt
+++ b/core/src/test/resources/google/registry/model/schema.txt
@@ -641,6 +641,10 @@ class google.registry.model.replay.LastSqlTransaction {
@Id long id;
long transactionId;
}
+class google.registry.model.replay.ReplayGap {
+ @Id long transactionId;
+ org.joda.time.DateTime timestamp;
+}
class google.registry.model.reporting.DomainTransactionRecord {
google.registry.model.reporting.DomainTransactionRecord$TransactionReportField reportField;
java.lang.Integer reportAmount;