mirror of
https://github.com/google/nomulus.git
synced 2025-07-25 20:18:34 +02:00
Fix issues with saving and deleting gap records (#1561)
* Fix issues with saving and deleting gap records. Datastore limits us to mutating up to 25 records per transaction. We sometimes exceed that when deleting expired gap records. In addition, it is theoretically possible for us to accumulate enough continuous gap records to exceed this count while replaying the original transaction. Deal with deletion by breaking up the gap records to be deleted into a batch size that is small enough to be deleted transactionally (in practice, we don't much care about the transactionality but it doesn't seem like we can delete batches without it). Deal with the possibility of too many additions by always breaking out gap record storage and last transaction number updates into their own transaction(s) (separate from the replay of the original SQL transaction).
This commit is contained in:
parent
784d193e58
commit
2a2652c7f5
2 changed files with 150 additions and 57 deletions
|
@ -42,6 +42,7 @@ import google.registry.request.auth.Auth;
|
|||
import google.registry.util.Clock;
|
||||
import google.registry.util.RequestStatusChecker;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import javax.inject.Inject;
|
||||
|
@ -78,6 +79,14 @@ public class ReplicateToDatastoreAction implements Runnable {
|
|||
*/
|
||||
public static final long MAX_GAP_RETENTION_MILLIS = 300000;
|
||||
|
||||
/**
|
||||
* The maximum number of entities to be mutated per transaction. For our purposes, the entities
|
||||
* that we're keeping track of are each in their own entity group. Per datastore documentation, we
|
||||
* should be allowed to update up to 25 of them. In practice, we get an error if we go beyond 24
|
||||
* (possibly due to something in our own infrastructure).
|
||||
*/
|
||||
private static final int MAX_ENTITIES_PER_TXN = 24;
|
||||
|
||||
public static final Duration REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH = standardHours(1);
|
||||
|
||||
private final Clock clock;
|
||||
|
@ -177,79 +186,128 @@ public class ReplicateToDatastoreAction implements Runnable {
|
|||
logger.atInfo().log("Applied missing transaction %s", txn.getId());
|
||||
});
|
||||
|
||||
// Clean up any gaps that have expired.
|
||||
// Clean up any gaps that have expired (in batches because they're each in their own entity
|
||||
// group).
|
||||
ArrayList<ReplayGap> gapBatch = new ArrayList<>();
|
||||
gaps.stream()
|
||||
.forEach(
|
||||
gap -> {
|
||||
if (now - gap.getTimestamp().getMillis() > MAX_GAP_RETENTION_MILLIS) {
|
||||
gapBatch.add(gap);
|
||||
}
|
||||
if (gapBatch.size() == MAX_ENTITIES_PER_TXN) {
|
||||
deleteReplayGaps(gapBatch);
|
||||
gapBatch.clear();
|
||||
}
|
||||
});
|
||||
if (!gapBatch.isEmpty()) {
|
||||
deleteReplayGaps(gapBatch);
|
||||
}
|
||||
}
|
||||
|
||||
private static void deleteReplayGaps(ArrayList<ReplayGap> gapsToDelete) {
|
||||
logger.atInfo().log(
|
||||
"deleting gap records for %s",
|
||||
gapsToDelete.stream().map(g -> g.getTransactionId()).collect(toImmutableList()));
|
||||
ofyTm()
|
||||
.transact(
|
||||
() ->
|
||||
gaps.stream()
|
||||
.forEach(
|
||||
gap -> {
|
||||
if (now - gap.getTimestamp().getMillis() > MAX_GAP_RETENTION_MILLIS) {
|
||||
auditedOfy().deleteIgnoringReadOnlyWithoutBackup().entity(gap);
|
||||
logger.atInfo().log("Removed expired gap %s", gap);
|
||||
}
|
||||
}));
|
||||
.transact(() -> auditedOfy().deleteIgnoringReadOnlyWithoutBackup().entities(gapsToDelete));
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply a transaction to Datastore, returns true if there was a fatal error and the batch should
|
||||
* be aborted.
|
||||
* Apply a transaction to Datastore.
|
||||
*
|
||||
* <p>Throws an exception if a fatal error occurred and the batch should be aborted
|
||||
* <p>Throws an exception if a fatal error occurred and the batch should be aborted.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static void applyTransaction(TransactionEntity txnEntity) {
|
||||
logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore.");
|
||||
boolean done = false;
|
||||
try (UpdateAutoTimestamp.DisableAutoUpdateResource disabler =
|
||||
UpdateAutoTimestamp.disableAutoUpdate()) {
|
||||
ofyTm()
|
||||
.transact(
|
||||
() -> {
|
||||
// Reload the last transaction id, which could possibly have changed.
|
||||
LastSqlTransaction lastSqlTxn = LastSqlTransaction.load();
|
||||
long nextTxnId = lastSqlTxn.getTransactionId() + 1;
|
||||
|
||||
// Skip missing transactions. Missed transactions can happen normally. If a
|
||||
// transaction gets rolled back, the sequence counter doesn't.
|
||||
while (nextTxnId < txnEntity.getId()) {
|
||||
logger.atWarning().log(
|
||||
"Ignoring transaction %s, which does not exist.", nextTxnId);
|
||||
auditedOfy()
|
||||
.saveIgnoringReadOnlyWithoutBackup()
|
||||
.entity(new ReplayGap(ofyTm().getTransactionTime(), nextTxnId));
|
||||
++nextTxnId;
|
||||
}
|
||||
// We put this in a do/while loop because we can potentially clean out some range of gaps
|
||||
// first and we want to do those in their own transaction so as not to run up the entity group
|
||||
// count. (This is a highly pathological case: consecutive gaps are rare, but the fact that
|
||||
// they can occur potentially increases the entity group count to beyond what we can
|
||||
// accommodate in a single transaction.)
|
||||
do {
|
||||
done =
|
||||
ofyTm()
|
||||
.transact(
|
||||
() -> {
|
||||
// Reload the last transaction id, which could possibly have changed.
|
||||
LastSqlTransaction lastSqlTxn = LastSqlTransaction.load();
|
||||
long nextTxnId = lastSqlTxn.getTransactionId() + 1;
|
||||
|
||||
if (nextTxnId > txnEntity.getId()) {
|
||||
// We've already replayed this transaction. This shouldn't happen, as GAE cron
|
||||
// is supposed to avoid overruns and this action shouldn't be executed from any
|
||||
// other context, but it's not harmful as we can just ignore the transaction. Log
|
||||
// it so that we know about it and move on.
|
||||
logger.atWarning().log(
|
||||
"Ignoring transaction %s, which appears to have already been applied.",
|
||||
txnEntity.getId());
|
||||
return;
|
||||
}
|
||||
// Skip missing transactions. Missed transactions can happen normally. If a
|
||||
// transaction gets rolled back, the sequence counter doesn't.
|
||||
int gapCount = 0;
|
||||
while (nextTxnId < txnEntity.getId()) {
|
||||
logger.atWarning().log(
|
||||
"Ignoring transaction %s, which does not exist.", nextTxnId);
|
||||
auditedOfy()
|
||||
.saveIgnoringReadOnlyWithoutBackup()
|
||||
.entity(new ReplayGap(ofyTm().getTransactionTime(), nextTxnId));
|
||||
++nextTxnId;
|
||||
|
||||
logger.atInfo().log(
|
||||
"Applying transaction %s to Cloud Datastore.", txnEntity.getId());
|
||||
// Don't exceed the entity group count trying to clean these up (we stop at
|
||||
// max
|
||||
// - 1 because we also want to save the lastSqlTransaction).
|
||||
if (++gapCount == MAX_ENTITIES_PER_TXN - 1) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// At this point, we know txnEntity is the correct next transaction, so write it
|
||||
// to Datastore.
|
||||
try {
|
||||
Transaction.deserialize(txnEntity.getContents()).writeToDatastore();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Error during transaction deserialization", e);
|
||||
}
|
||||
// Don't write gap records in the same transaction as the SQL transaction that
|
||||
// we're replaying. Return false to force us to go through and repeat in a
|
||||
// new
|
||||
// transaction.
|
||||
if (gapCount > 0) {
|
||||
// We haven't replayed the transaction, but we've determined that we can
|
||||
// ignore everything before it so update lastSqlTransaction accordingly.
|
||||
auditedOfy()
|
||||
.saveIgnoringReadOnlyWithoutBackup()
|
||||
.entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId - 1));
|
||||
return false;
|
||||
}
|
||||
|
||||
// Write the updated last transaction id to Datastore as part of this Datastore
|
||||
// transaction.
|
||||
auditedOfy()
|
||||
.saveIgnoringReadOnlyWithoutBackup()
|
||||
.entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId));
|
||||
logger.atInfo().log(
|
||||
"Finished applying single transaction Cloud SQL -> Cloud Datastore.");
|
||||
});
|
||||
if (nextTxnId > txnEntity.getId()) {
|
||||
// We've already replayed this transaction. This shouldn't happen, as GAE
|
||||
// cron
|
||||
// is supposed to avoid overruns and this action shouldn't be executed from
|
||||
// any
|
||||
// other context, but it's not harmful as we can just ignore the
|
||||
// transaction.
|
||||
// Log it so that we know about it and move on.
|
||||
logger.atWarning().log(
|
||||
"Ignoring transaction %s, which appears to have already been applied.",
|
||||
txnEntity.getId());
|
||||
return true;
|
||||
}
|
||||
|
||||
logger.atInfo().log(
|
||||
"Applying transaction %s to Cloud Datastore.", txnEntity.getId());
|
||||
|
||||
// At this point, we know txnEntity is the correct next transaction, so write
|
||||
// it
|
||||
// to Datastore.
|
||||
try {
|
||||
Transaction.deserialize(txnEntity.getContents()).writeToDatastore();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Error during transaction deserialization", e);
|
||||
}
|
||||
|
||||
// Write the updated last transaction id to Datastore as part of this
|
||||
// Datastore
|
||||
// transaction.
|
||||
auditedOfy()
|
||||
.saveIgnoringReadOnlyWithoutBackup()
|
||||
.entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId));
|
||||
logger.atInfo().log(
|
||||
"Finished applying single transaction Cloud SQL -> Cloud Datastore.");
|
||||
return true;
|
||||
});
|
||||
} while (!done);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -273,6 +273,41 @@ public class ReplicateToDatastoreActionTest {
|
|||
assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(gapKey).isPresent())).isFalse();
|
||||
}
|
||||
|
||||
/** Verify that we can handle creation and deletion of > 25 gap records. */
|
||||
@Test
|
||||
void testLargeNumberOfGaps() {
|
||||
// Fail thirty transactions.
|
||||
for (int i = 0; i < 30; ++i) {
|
||||
try {
|
||||
jpaTm()
|
||||
.transact(
|
||||
() -> {
|
||||
insertInDb(TestObject.create("foo"));
|
||||
// Explicitly save the transaction entity to force the id update.
|
||||
jpaTm().insert(new TransactionEntity(new byte[] {1, 2, 3}));
|
||||
throw new RuntimeException("fail!!!");
|
||||
});
|
||||
} catch (Exception e) {
|
||||
;
|
||||
}
|
||||
}
|
||||
|
||||
TestObject bar = TestObject.create("bar");
|
||||
insertInDb(bar);
|
||||
|
||||
// Verify that the transaction was successfully applied and that we have generated 30 gap
|
||||
// records.
|
||||
action.run();
|
||||
Truth8.assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(bar.key()))).isPresent();
|
||||
assertThat(ofyTm().loadAllOf(ReplayGap.class).size()).isEqualTo(30);
|
||||
|
||||
// Verify that we can clean up this many gap records after expiration.
|
||||
fakeClock.advanceBy(Duration.millis(ReplicateToDatastoreAction.MAX_GAP_RETENTION_MILLIS + 1));
|
||||
resetAction();
|
||||
action.run();
|
||||
assertThat(ofyTm().loadAllOf(ReplayGap.class).size()).isEqualTo(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGapRecordExpiration() {
|
||||
insertInDb(TestObject.create("foo"));
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue