Track and replay Transaction table gaps (#1557)

* Track and replay Transaction table gaps

Id gaps in the Transaction table can be the result of a transactions committed
out of order.  To deal with this, keep track of gaps for up to five minutes
and check to see if they've been back-filled prior to applying the next batch
of transactions during reply.

* Changes for review

* Calculate gap expiration time before gap queries

* Reformat.
This commit is contained in:
Michael Muller 2022-03-16 11:08:45 -04:00 committed by GitHub
parent a3ca052115
commit b248071eb3
6 changed files with 278 additions and 0 deletions

View file

@ -40,6 +40,7 @@ import google.registry.model.rde.RdeRevision;
import google.registry.model.registrar.Registrar; import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.RegistrarContact; import google.registry.model.registrar.RegistrarContact;
import google.registry.model.replay.LastSqlTransaction; import google.registry.model.replay.LastSqlTransaction;
import google.registry.model.replay.ReplayGap;
import google.registry.model.reporting.HistoryEntry; import google.registry.model.reporting.HistoryEntry;
import google.registry.model.server.Lock; import google.registry.model.server.Lock;
import google.registry.model.server.ServerSecret; import google.registry.model.server.ServerSecret;
@ -86,6 +87,7 @@ public final class EntityClasses {
Registrar.class, Registrar.class,
RegistrarContact.class, RegistrarContact.class,
Registry.class, Registry.class,
ReplayGap.class,
ServerSecret.class); ServerSecret.class);
private EntityClasses() {} private EntityClasses() {}

View file

@ -0,0 +1,60 @@
// Copyright 2022 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.model.replay;
import static google.registry.model.annotations.NotBackedUp.Reason.TRANSIENT;
import com.googlecode.objectify.annotation.Entity;
import com.googlecode.objectify.annotation.Id;
import google.registry.model.ImmutableObject;
import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.model.annotations.NotBackedUp;
import org.joda.time.DateTime;
/**
* Tracks gaps in transaction ids when replicating from SQL to datastore.
*
* <p>SQL -&gt; DS replication uses a Transaction table indexed by a SEQUENCE column, which normally
* increments monotonically for each committed transaction. Gaps in this sequence can occur when a
* transaction is rolled back or when a transaction has been initiated but not committed to the
* table at the time of a query. To protect us from the latter scenario, we need to keep track of
* these gaps and replay any of them that have been filled in since we processed their batch.
*/
@DeleteAfterMigration
@NotBackedUp(reason = TRANSIENT)
@Entity
public class ReplayGap extends ImmutableObject implements DatastoreOnlyEntity {
@Id long transactionId;
// We can't use a CreateAutoTimestamp here because this ends up getting persisted in an ofy
// transaction that happens in JPA mode, so we don't end up getting an active transaction manager
// when the timestamp needs to be set.
DateTime timestamp;
ReplayGap() {}
ReplayGap(DateTime timestamp, long transactionId) {
this.timestamp = timestamp;
this.transactionId = transactionId;
}
long getTransactionId() {
return transactionId;
}
DateTime getTimestamp() {
return timestamp;
}
}

View file

@ -14,6 +14,8 @@
package google.registry.model.replay; package google.registry.model.replay;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static google.registry.model.ofy.ObjectifyService.auditedOfy; import static google.registry.model.ofy.ObjectifyService.auditedOfy;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
@ -70,6 +72,12 @@ public class ReplicateToDatastoreAction implements Runnable {
*/ */
public static final int BATCH_SIZE = 200; public static final int BATCH_SIZE = 200;
/**
* The longest time that we'll keep trying to resolve a gap in the Transaction table in
* milliseconds, after which, the gap record will be deleted.
*/
public static final long MAX_GAP_RETENTION_MILLIS = 300000;
public static final Duration REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH = standardHours(1); public static final Duration REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH = standardHours(1);
private final Clock clock; private final Clock clock;
@ -89,6 +97,12 @@ public class ReplicateToDatastoreAction implements Runnable {
return getTransactionBatchAtSnapshot(Optional.empty()); return getTransactionBatchAtSnapshot(Optional.empty());
} }
/**
* Get the next batch of transactions, optionally from a specific SQL database snapshot.
*
* <p>Note that this method may also apply transactions from previous batches that had not yet
* been committed at the time the previous batch was retrieved.
*/
static List<TransactionEntity> getTransactionBatchAtSnapshot(Optional<String> snapshotId) { static List<TransactionEntity> getTransactionBatchAtSnapshot(Optional<String> snapshotId) {
// Get the next batch of transactions that we haven't replicated. // Get the next batch of transactions that we haven't replicated.
LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(LastSqlTransaction::load); LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(LastSqlTransaction::load);
@ -97,6 +111,11 @@ public class ReplicateToDatastoreAction implements Runnable {
.transactWithoutBackup( .transactWithoutBackup(
() -> { () -> {
snapshotId.ifPresent(jpaTm()::setDatabaseSnapshot); snapshotId.ifPresent(jpaTm()::setDatabaseSnapshot);
// Fill in any gaps in the transaction log that have since become available before
// processing the next batch.
applyMissingTransactions();
return jpaTm() return jpaTm()
.query( .query(
"SELECT txn FROM TransactionEntity txn WHERE id >" + " :lastId ORDER BY id", "SELECT txn FROM TransactionEntity txn WHERE id >" + " :lastId ORDER BY id",
@ -110,6 +129,68 @@ public class ReplicateToDatastoreAction implements Runnable {
} }
} }
/**
* Iterate over the recent gaps in the Transaction table and apply any that have been filled in.
*
* <p>Must be called from within a JPA transaction.
*
* <p>Gap rewriting is a complicated matter, and the algorithm is the product of some very deep
* consideration by mmuller and weiminyu. Basically, the constraints are:
*
* <ol>
* <li>Replay has to work against a database snapshot (gap replay would break this, so we don't
* call this method when replaying against a snapshot)
* </ol>
*/
private static void applyMissingTransactions() {
long now = jpaTm().getTransactionTime().getMillis();
ImmutableList<ReplayGap> gaps = ofyTm().loadAllOf(ReplayGap.class);
jpaTm()
.query("SELECT txn from TransactionEntity txn WHERE id IN :gapIds", TransactionEntity.class)
.setParameter(
"gapIds", gaps.stream().map(gap -> gap.getTransactionId()).collect(toImmutableList()))
.getResultStream()
.forEach(
txn -> {
// Transcribe the transaction and delete the gap record in the same ofy transaction.
ofyTm()
.transact(
() -> {
// Write the transaction to datastore.
try {
Transaction.deserialize(txn.getContents()).writeToDatastore();
} catch (IOException e) {
throw new RuntimeException("Error during transaction deserialization", e);
}
// Find and delete the gap record.
ImmutableList<ReplayGap> filledGaps =
gaps.stream()
.filter(gap -> gap.getTransactionId() == txn.getId())
.collect(toImmutableList());
checkState(
filledGaps.size() == 1,
"Bad list of gaps for discovered id: %s",
filledGaps);
auditedOfy().deleteIgnoringReadOnlyWithoutBackup().entity(gaps.get(0));
});
logger.atInfo().log("Applied missing transaction %s", txn.getId());
});
// Clean up any gaps that have expired.
ofyTm()
.transact(
() ->
gaps.stream()
.forEach(
gap -> {
if (now - gap.getTimestamp().getMillis() > MAX_GAP_RETENTION_MILLIS) {
auditedOfy().deleteIgnoringReadOnlyWithoutBackup().entity(gap);
logger.atInfo().log("Removed expired gap %s", gap);
}
}));
}
/** /**
* Apply a transaction to Datastore, returns true if there was a fatal error and the batch should * Apply a transaction to Datastore, returns true if there was a fatal error and the batch should
* be aborted. * be aborted.
@ -133,6 +214,9 @@ public class ReplicateToDatastoreAction implements Runnable {
while (nextTxnId < txnEntity.getId()) { while (nextTxnId < txnEntity.getId()) {
logger.atWarning().log( logger.atWarning().log(
"Ignoring transaction %s, which does not exist.", nextTxnId); "Ignoring transaction %s, which does not exist.", nextTxnId);
auditedOfy()
.saveIgnoringReadOnlyWithoutBackup()
.entity(new ReplayGap(ofyTm().getTransactionTime(), nextTxnId));
++nextTxnId; ++nextTxnId;
} }

View file

@ -41,6 +41,7 @@ import google.registry.model.rde.RdeRevision;
import google.registry.model.registrar.Registrar; import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.RegistrarContact; import google.registry.model.registrar.RegistrarContact;
import google.registry.model.replay.LastSqlTransaction; import google.registry.model.replay.LastSqlTransaction;
import google.registry.model.replay.ReplayGap;
import google.registry.model.reporting.HistoryEntry; import google.registry.model.reporting.HistoryEntry;
import google.registry.model.server.Lock; import google.registry.model.server.Lock;
import google.registry.model.server.ServerSecret; import google.registry.model.server.ServerSecret;
@ -74,6 +75,7 @@ public class ClassPathManagerTest {
assertThat(ClassPathManager.getClass("HostResource")).isEqualTo(HostResource.class); assertThat(ClassPathManager.getClass("HostResource")).isEqualTo(HostResource.class);
assertThat(ClassPathManager.getClass("Recurring")).isEqualTo(Recurring.class); assertThat(ClassPathManager.getClass("Recurring")).isEqualTo(Recurring.class);
assertThat(ClassPathManager.getClass("Registrar")).isEqualTo(Registrar.class); assertThat(ClassPathManager.getClass("Registrar")).isEqualTo(Registrar.class);
assertThat(ClassPathManager.getClass("ReplayGap")).isEqualTo(ReplayGap.class);
assertThat(ClassPathManager.getClass("ContactResource")).isEqualTo(ContactResource.class); assertThat(ClassPathManager.getClass("ContactResource")).isEqualTo(ContactResource.class);
assertThat(ClassPathManager.getClass("Cancellation")).isEqualTo(Cancellation.class); assertThat(ClassPathManager.getClass("Cancellation")).isEqualTo(Cancellation.class);
assertThat(ClassPathManager.getClass("RegistrarContact")).isEqualTo(RegistrarContact.class); assertThat(ClassPathManager.getClass("RegistrarContact")).isEqualTo(RegistrarContact.class);
@ -141,6 +143,7 @@ public class ClassPathManagerTest {
assertThat(ClassPathManager.getClassName(HostResource.class)).isEqualTo("HostResource"); assertThat(ClassPathManager.getClassName(HostResource.class)).isEqualTo("HostResource");
assertThat(ClassPathManager.getClassName(Recurring.class)).isEqualTo("Recurring"); assertThat(ClassPathManager.getClassName(Recurring.class)).isEqualTo("Recurring");
assertThat(ClassPathManager.getClassName(Registrar.class)).isEqualTo("Registrar"); assertThat(ClassPathManager.getClassName(Registrar.class)).isEqualTo("Registrar");
assertThat(ClassPathManager.getClassName(ReplayGap.class)).isEqualTo("ReplayGap");
assertThat(ClassPathManager.getClassName(ContactResource.class)).isEqualTo("ContactResource"); assertThat(ClassPathManager.getClassName(ContactResource.class)).isEqualTo("ContactResource");
assertThat(ClassPathManager.getClassName(Cancellation.class)).isEqualTo("Cancellation"); assertThat(ClassPathManager.getClassName(Cancellation.class)).isEqualTo("Cancellation");
assertThat(ClassPathManager.getClassName(RegistrarContact.class)).isEqualTo("RegistrarContact"); assertThat(ClassPathManager.getClassName(RegistrarContact.class)).isEqualTo("RegistrarContact");

View file

@ -27,11 +27,13 @@ import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedMap;
import com.google.common.flogger.FluentLogger; import com.google.common.flogger.FluentLogger;
import com.google.common.testing.TestLogHandler; import com.google.common.testing.TestLogHandler;
import com.google.common.truth.Truth8; import com.google.common.truth.Truth8;
import com.googlecode.objectify.Key;
import google.registry.model.common.DatabaseMigrationStateSchedule; import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import google.registry.model.domain.token.AllocationToken; import google.registry.model.domain.token.AllocationToken;
@ -39,6 +41,8 @@ import google.registry.model.domain.token.AllocationToken.TokenType;
import google.registry.model.ofy.CommitLogBucket; import google.registry.model.ofy.CommitLogBucket;
import google.registry.model.ofy.Ofy; import google.registry.model.ofy.Ofy;
import google.registry.model.server.Lock; import google.registry.model.server.Lock;
import google.registry.persistence.VKey;
import google.registry.persistence.transaction.JpaTransactionManagerImpl;
import google.registry.persistence.transaction.TransactionEntity; import google.registry.persistence.transaction.TransactionEntity;
import google.registry.testing.AppEngineExtension; import google.registry.testing.AppEngineExtension;
import google.registry.testing.DatabaseHelper; import google.registry.testing.DatabaseHelper;
@ -48,9 +52,13 @@ import google.registry.testing.InjectExtension;
import google.registry.testing.ReplayExtension; import google.registry.testing.ReplayExtension;
import google.registry.testing.TestObject; import google.registry.testing.TestObject;
import google.registry.util.RequestStatusChecker; import google.registry.util.RequestStatusChecker;
import java.lang.reflect.Proxy;
import java.util.List; import java.util.List;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.EntityTransaction;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -241,6 +249,64 @@ public class ReplicateToDatastoreActionTest {
assertThat(ofyTm().transact(() -> LastSqlTransaction.load()).getTransactionId()).isEqualTo(4); assertThat(ofyTm().transact(() -> LastSqlTransaction.load()).getTransactionId()).isEqualTo(4);
} }
@Test
void testTransactionGapReplay() {
insertInDb(TestObject.create("foo"));
DeferredCommit deferred = new DeferredCommit(fakeClock);
TestObject bar = TestObject.create("bar");
deferred.transact(() -> jpaTm().insert(bar));
TestObject baz = TestObject.create("baz");
insertInDb(baz);
action.run();
assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(bar.key())).isPresent()).isFalse();
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(baz.key()))).isEqualTo(baz);
VKey<ReplayGap> gapKey = VKey.createOfy(ReplayGap.class, Key.create(ReplayGap.class, 2));
Truth8.assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(gapKey))).isPresent();
deferred.commit();
resetAction();
action.run();
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(bar.key()))).isEqualTo(bar);
// Verify that the gap record has been cleaned up.
assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(gapKey).isPresent())).isFalse();
}
@Test
void testGapRecordExpiration() {
insertInDb(TestObject.create("foo"));
// Fail a transaction, create a gap.
try {
jpaTm()
.transact(
() -> {
jpaTm().insert(TestObject.create("other"));
// Explicitly save the transaction entity to force the id update.
jpaTm().insert(new TransactionEntity(new byte[] {1, 2, 3}));
throw new RuntimeException("fail!!!");
});
} catch (Exception e) {
logger.atInfo().log("Got expected exception.");
}
insertInDb(TestObject.create("bar"));
action.run();
// Verify that the gap record has been created
VKey<ReplayGap> gapKey = VKey.createOfy(ReplayGap.class, Key.create(ReplayGap.class, 2));
Truth8.assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(gapKey))).isPresent();
fakeClock.advanceBy(Duration.millis(ReplicateToDatastoreAction.MAX_GAP_RETENTION_MILLIS + 1));
resetAction();
action.run();
// Verify that the gap record has been destroyed.
assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(gapKey)).isPresent()).isFalse();
}
@Test @Test
void testBeforeDatastoreSaveCallback() { void testBeforeDatastoreSaveCallback() {
assumeTrue(ReplayExtension.replayTestsEnabled()); assumeTrue(ReplayExtension.replayTestsEnabled());
@ -327,4 +393,63 @@ public class ReplicateToDatastoreActionTest {
when(requestStatusChecker.getLogId()).thenReturn("logId"); when(requestStatusChecker.getLogId()).thenReturn("logId");
action = new ReplicateToDatastoreAction(fakeClock, requestStatusChecker, response); action = new ReplicateToDatastoreAction(fakeClock, requestStatusChecker, response);
} }
/**
* Deep fake of EntityManagerFactory -> EntityManager -> EntityTransaction that allows us to defer
* the actual commit until after the other transactions are replayed.
*/
static class DeferredCommit {
FakeClock clock;
EntityTransaction realTransaction;
DeferredCommit(FakeClock clock) {
this.clock = clock;
}
private static <T> T makeProxy(
Class<T> iface, T delegate, String method, Supplier<?> supplier) {
return (T)
Proxy.newProxyInstance(
delegate.getClass().getClassLoader(),
new Class[] {iface},
(proxy, meth, args) -> {
if (meth.getName().equals(method)) {
return supplier.get();
} else {
return meth.invoke(delegate, args);
}
});
}
EntityManager createEntityManagerProxy(EntityManager orgEntityManager) {
return makeProxy(
EntityManager.class,
orgEntityManager,
"getTransaction",
() ->
makeProxy(
EntityTransaction.class,
realTransaction = orgEntityManager.getTransaction(),
"commit",
() -> null));
}
void commit() {
realTransaction.commit();
}
void transact(Runnable runnable) {
EntityManagerFactory orgEmf =
jpaTm().transact(() -> jpaTm().getEntityManager().getEntityManagerFactory());
EntityManagerFactory emfProxy =
makeProxy(
EntityManagerFactory.class,
orgEmf,
"createEntityManager",
() -> createEntityManagerProxy(orgEmf.createEntityManager()));
new JpaTransactionManagerImpl(emfProxy, clock).transact(runnable);
}
}
} }

View file

@ -641,6 +641,10 @@ class google.registry.model.replay.LastSqlTransaction {
@Id long id; @Id long id;
long transactionId; long transactionId;
} }
class google.registry.model.replay.ReplayGap {
@Id long transactionId;
org.joda.time.DateTime timestamp;
}
class google.registry.model.reporting.DomainTransactionRecord { class google.registry.model.reporting.DomainTransactionRecord {
google.registry.model.reporting.DomainTransactionRecord$TransactionReportField reportField; google.registry.model.reporting.DomainTransactionRecord$TransactionReportField reportField;
java.lang.Integer reportAmount; java.lang.Integer reportAmount;