diff --git a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java index f3836560e..4f739d912 100644 --- a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java +++ b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java @@ -42,6 +42,7 @@ import google.registry.request.auth.Auth; import google.registry.util.Clock; import google.registry.util.RequestStatusChecker; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import javax.inject.Inject; @@ -78,6 +79,14 @@ public class ReplicateToDatastoreAction implements Runnable { */ public static final long MAX_GAP_RETENTION_MILLIS = 300000; + /** + * The maximum number of entities to be mutated per transaction. For our purposes, the entities + * that we're keeping track of are each in their own entity group. Per datastore documentation, we + * should be allowed to update up to 25 of them. In practice, we get an error if we go beyond 24 + * (possibly due to something in our own infrastructure). + */ + private static final int MAX_ENTITIES_PER_TXN = 24; + public static final Duration REPLICATE_TO_DATASTORE_LOCK_LEASE_LENGTH = standardHours(1); private final Clock clock; @@ -177,79 +186,128 @@ public class ReplicateToDatastoreAction implements Runnable { logger.atInfo().log("Applied missing transaction %s", txn.getId()); }); - // Clean up any gaps that have expired. + // Clean up any gaps that have expired (in batches because they're each in their own entity + // group). 
+ ArrayList<ReplayGap> gapBatch = new ArrayList<>(); + gaps.stream() + .forEach( + gap -> { + if (now - gap.getTimestamp().getMillis() > MAX_GAP_RETENTION_MILLIS) { + gapBatch.add(gap); + } + if (gapBatch.size() == MAX_ENTITIES_PER_TXN) { + deleteReplayGaps(gapBatch); + gapBatch.clear(); + } + }); + if (!gapBatch.isEmpty()) { + deleteReplayGaps(gapBatch); + } + } + + private static void deleteReplayGaps(ArrayList<ReplayGap> gapsToDelete) { + logger.atInfo().log( + "deleting gap records for %s", + gapsToDelete.stream().map(g -> g.getTransactionId()).collect(toImmutableList())); ofyTm() - .transact( - () -> - gaps.stream() - .forEach( - gap -> { - if (now - gap.getTimestamp().getMillis() > MAX_GAP_RETENTION_MILLIS) { - auditedOfy().deleteIgnoringReadOnlyWithoutBackup().entity(gap); - logger.atInfo().log("Removed expired gap %s", gap); - } - })); + .transact(() -> auditedOfy().deleteIgnoringReadOnlyWithoutBackup().entities(gapsToDelete)); } /** - * Apply a transaction to Datastore, returns true if there was a fatal error and the batch should - * be aborted. + * Apply a transaction to Datastore. * - *

<p>Throws an exception if a fatal error occurred and the batch should be aborted + *

<p>Throws an exception if a fatal error occurred and the batch should be aborted. */ @VisibleForTesting public static void applyTransaction(TransactionEntity txnEntity) { logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore."); + boolean done = false; try (UpdateAutoTimestamp.DisableAutoUpdateResource disabler = UpdateAutoTimestamp.disableAutoUpdate()) { - ofyTm() - .transact( - () -> { - // Reload the last transaction id, which could possibly have changed. - LastSqlTransaction lastSqlTxn = LastSqlTransaction.load(); - long nextTxnId = lastSqlTxn.getTransactionId() + 1; - // Skip missing transactions. Missed transactions can happen normally. If a - // transaction gets rolled back, the sequence counter doesn't. - while (nextTxnId < txnEntity.getId()) { - logger.atWarning().log( - "Ignoring transaction %s, which does not exist.", nextTxnId); - auditedOfy() - .saveIgnoringReadOnlyWithoutBackup() - .entity(new ReplayGap(ofyTm().getTransactionTime(), nextTxnId)); - ++nextTxnId; - } + // We put this in a do/while loop because we can potentially clean out some range of gaps + // first and we want to do those in their own transaction so as not to run up the entity group + // count. (This is a highly pathological case: consecutive gaps are rare, but the fact that + // they can occur potentially increases the entity group count to beyond what we can + // accommodate in a single transaction.) + do { + done = + ofyTm() + .transact( + () -> { + // Reload the last transaction id, which could possibly have changed. + LastSqlTransaction lastSqlTxn = LastSqlTransaction.load(); + long nextTxnId = lastSqlTxn.getTransactionId() + 1; - if (nextTxnId > txnEntity.getId()) { - // We've already replayed this transaction. This shouldn't happen, as GAE cron - // is supposed to avoid overruns and this action shouldn't be executed from any - // other context, but it's not harmful as we can just ignore the transaction. Log - // it so that we know about it and move on. 
- logger.atWarning().log( - "Ignoring transaction %s, which appears to have already been applied.", - txnEntity.getId()); - return; - } + // Skip missing transactions. Missed transactions can happen normally. If a + // transaction gets rolled back, the sequence counter doesn't. + int gapCount = 0; + while (nextTxnId < txnEntity.getId()) { + logger.atWarning().log( + "Ignoring transaction %s, which does not exist.", nextTxnId); + auditedOfy() + .saveIgnoringReadOnlyWithoutBackup() + .entity(new ReplayGap(ofyTm().getTransactionTime(), nextTxnId)); + ++nextTxnId; - logger.atInfo().log( - "Applying transaction %s to Cloud Datastore.", txnEntity.getId()); + // Don't exceed the entity group count trying to clean these up (we stop at + // max + // - 1 because we also want to save the lastSqlTransaction). + if (++gapCount == MAX_ENTITIES_PER_TXN - 1) { + break; + } + } - // At this point, we know txnEntity is the correct next transaction, so write it - // to Datastore. - try { - Transaction.deserialize(txnEntity.getContents()).writeToDatastore(); - } catch (IOException e) { - throw new RuntimeException("Error during transaction deserialization", e); - } + // Don't write gap records in the same transaction as the SQL transaction that + // we're replaying. Return false to force us to go through and repeat in a + // new + // transaction. + if (gapCount > 0) { + // We haven't replayed the transaction, but we've determined that we can + // ignore everything before it so update lastSqlTransaction accordingly. + auditedOfy() + .saveIgnoringReadOnlyWithoutBackup() + .entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId - 1)); + return false; + } - // Write the updated last transaction id to Datastore as part of this Datastore - // transaction. 
- auditedOfy() - .saveIgnoringReadOnlyWithoutBackup() - .entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId)); - logger.atInfo().log( - "Finished applying single transaction Cloud SQL -> Cloud Datastore."); - }); + if (nextTxnId > txnEntity.getId()) { + // We've already replayed this transaction. This shouldn't happen, as GAE + // cron + // is supposed to avoid overruns and this action shouldn't be executed from + // any + // other context, but it's not harmful as we can just ignore the + // transaction. + // Log it so that we know about it and move on. + logger.atWarning().log( + "Ignoring transaction %s, which appears to have already been applied.", + txnEntity.getId()); + return true; + } + + logger.atInfo().log( + "Applying transaction %s to Cloud Datastore.", txnEntity.getId()); + + // At this point, we know txnEntity is the correct next transaction, so write + // it + // to Datastore. + try { + Transaction.deserialize(txnEntity.getContents()).writeToDatastore(); + } catch (IOException e) { + throw new RuntimeException("Error during transaction deserialization", e); + } + + // Write the updated last transaction id to Datastore as part of this + // Datastore + // transaction. 
+ auditedOfy() + .saveIgnoringReadOnlyWithoutBackup() + .entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId)); + logger.atInfo().log( + "Finished applying single transaction Cloud SQL -> Cloud Datastore."); + return true; + }); + } while (!done); } } diff --git a/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java b/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java index 9da37e895..23aa77e2f 100644 --- a/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java +++ b/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java @@ -273,6 +273,41 @@ public class ReplicateToDatastoreActionTest { assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(gapKey).isPresent())).isFalse(); } + /** Verify that we can handle creation and deletion of > 25 gap records. */ + @Test + void testLargeNumberOfGaps() { + // Fail thirty transactions. + for (int i = 0; i < 30; ++i) { + try { + jpaTm() + .transact( + () -> { + insertInDb(TestObject.create("foo")); + // Explicitly save the transaction entity to force the id update. + jpaTm().insert(new TransactionEntity(new byte[] {1, 2, 3})); + throw new RuntimeException("fail!!!"); + }); + } catch (Exception e) { + ; + } + } + + TestObject bar = TestObject.create("bar"); + insertInDb(bar); + + // Verify that the transaction was successfully applied and that we have generated 30 gap + // records. + action.run(); + Truth8.assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(bar.key()))).isPresent(); + assertThat(ofyTm().loadAllOf(ReplayGap.class).size()).isEqualTo(30); + + // Verify that we can clean up this many gap records after expiration. 
+ fakeClock.advanceBy(Duration.millis(ReplicateToDatastoreAction.MAX_GAP_RETENTION_MILLIS + 1)); + resetAction(); + action.run(); + assertThat(ofyTm().loadAllOf(ReplayGap.class).size()).isEqualTo(0); + } + @Test void testGapRecordExpiration() { insertInDb(TestObject.create("foo"));