Improve Transaction gap processing (#1546)

Skip multiple gaps in one pass and write the correct transaction id to
datastore.
This commit is contained in:
Michael Muller 2022-03-07 13:26:18 -05:00 committed by GitHub
parent f382f63d00
commit 8395d17ba2
2 changed files with 24 additions and 16 deletions

View file

@ -127,12 +127,16 @@ public class ReplicateToDatastoreAction implements Runnable {
// Reload the last transaction id, which could possibly have changed.
LastSqlTransaction lastSqlTxn = LastSqlTransaction.load();
long nextTxnId = lastSqlTxn.getTransactionId() + 1;
if (nextTxnId < txnEntity.getId()) {
// Missing transaction id. This can happen normally. If a transaction gets
// rolled back, the sequence counter doesn't.
// Skip missing transactions. Missed transactions can happen normally. If a
// transaction gets rolled back, the sequence counter doesn't.
while (nextTxnId < txnEntity.getId()) {
logger.atWarning().log(
"Ignoring transaction %s, which does not exist.", nextTxnId);
} else if (nextTxnId > txnEntity.getId()) {
++nextTxnId;
}
if (nextTxnId > txnEntity.getId()) {
// We've already replayed this transaction. This shouldn't happen, as GAE cron
// is supposed to avoid overruns and this action shouldn't be executed from any
// other context, but it's not harmful as we can just ignore the transaction. Log

View file

@ -208,18 +208,20 @@ public class ReplicateToDatastoreActionTest {
TestObject foo = TestObject.create("foo");
insertInDb(foo);
// Fail during the transaction to delete it.
try {
jpaTm()
.transact(
() -> {
jpaTm().delete(foo.key());
// Explicitly save the transaction entity to force the id update.
jpaTm().insert(new TransactionEntity(new byte[] {1, 2, 3}));
throw new RuntimeException("fail!!!");
});
} catch (Exception e) {
logger.atInfo().log("Got expected exception.");
// Fail two transactions.
for (int i = 0; i < 2; ++i) {
try {
jpaTm()
.transact(
() -> {
jpaTm().delete(foo.key());
// Explicitly save the transaction entity to force the id update.
jpaTm().insert(new TransactionEntity(new byte[] {1, 2, 3}));
throw new RuntimeException("fail!!!");
});
} catch (Exception e) {
logger.atInfo().log("Got expected exception.");
}
}
TestObject bar = TestObject.create("bar");
@ -230,11 +232,13 @@ public class ReplicateToDatastoreActionTest {
assertThat(txns).hasSize(2);
for (TransactionEntity txn : txns) {
assertThat(txn.getId()).isNotEqualTo(2);
assertThat(txn.getId()).isNotEqualTo(3);
applyTransaction(txn);
}
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(foo.key()))).isEqualTo(foo);
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(bar.key()))).isEqualTo(bar);
assertThat(ofyTm().transact(() -> LastSqlTransaction.load()).getTransactionId()).isEqualTo(4);
}
@Test