diff --git a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java index deba20cd6..76a110830 100644 --- a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java +++ b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java @@ -18,6 +18,10 @@ import static google.registry.model.ofy.ObjectifyService.auditedOfy; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; import static google.registry.request.Action.Method.GET; +import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; +import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; +import static javax.servlet.http.HttpServletResponse.SC_OK; +import static org.joda.time.Duration.standardHours; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -26,15 +30,20 @@ import google.registry.model.UpdateAutoTimestamp; import google.registry.model.common.DatabaseMigrationStateSchedule; import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection; +import google.registry.model.server.Lock; import google.registry.persistence.transaction.Transaction; import google.registry.persistence.transaction.TransactionEntity; import google.registry.request.Action; +import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.util.Clock; +import google.registry.util.RequestStatusChecker; import java.io.IOException; import java.util.List; +import java.util.Optional; import javax.inject.Inject; import javax.persistence.NoResultException; +import org.joda.time.Duration; /** Cron task to replicate from Cloud SQL to datastore. */ @Action( @@ -55,11 +64,18 @@ public class ReplicateToDatastoreAction implements Runnable { */ public static final int BATCH_SIZE = 200; + private static final Duration LEASE_LENGTH = standardHours(1); + private final Clock clock; + private final RequestStatusChecker requestStatusChecker; + private final Response response; @Inject - public ReplicateToDatastoreAction(Clock clock) { + public ReplicateToDatastoreAction( + Clock clock, RequestStatusChecker requestStatusChecker, Response response) { this.clock = clock; + this.requestStatusChecker = requestStatusChecker; + this.response = response; } @VisibleForTesting @@ -143,24 +159,55 @@ public class ReplicateToDatastoreAction implements Runnable { public void run() { MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc()); if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) { - logger.atInfo().log( - "Skipping ReplicateToDatastoreAction because we are in migration phase %s.", state); + String message = + String.format( + "Skipping ReplicateToDatastoreAction because we are in migration phase %s.", state); + logger.atInfo().log(message); + // App Engine will retry on any non-2xx status code, which we don't want in this case. + response.setStatus(SC_NO_CONTENT); + response.setPayload(message); return; } - // TODO(b/181758163): Deal with objects that don't exist in Cloud SQL, e.g. ForeignKeyIndex, - // EppResourceIndex. - logger.atInfo().log("Processing transaction replay batch Cloud SQL -> Cloud Datastore"); - int numTransactionsReplayed = 0; - for (TransactionEntity txnEntity : getTransactionBatch()) { - try { - applyTransaction(txnEntity); - } catch (Throwable t) { - logger.atSevere().withCause(t).log("Errored out replaying files"); - return; - } - numTransactionsReplayed++; + Optional lock = + Lock.acquire( + this.getClass().getSimpleName(), null, LEASE_LENGTH, requestStatusChecker, false); + if (!lock.isPresent()) { + String message = "Can't acquire ReplicateToDatastoreAction lock, aborting."; + logger.atSevere().log(message); + // App Engine will retry on any non-2xx status code, which we don't want in this case. + response.setStatus(SC_NO_CONTENT); + response.setPayload(message); + return; } - logger.atInfo().log( - "Replayed %d transactions from Cloud SQL -> Datastore", numTransactionsReplayed); + try { + logger.atInfo().log("Processing transaction replay batch Cloud SQL -> Cloud Datastore"); + int numTransactionsReplayed = replayAllTransactions(); + String resultMessage = + String.format( + "Replayed %d transaction(s) from Cloud SQL -> Datastore", numTransactionsReplayed); + logger.atInfo().log(resultMessage); + response.setPayload(resultMessage); + response.setStatus(SC_OK); + } catch (Throwable t) { + String message = "Errored out replaying files"; + logger.atSevere().withCause(t).log(message); + response.setStatus(SC_INTERNAL_SERVER_ERROR); + response.setPayload(message); + } finally { + lock.ifPresent(Lock::release); + } + } + + private int replayAllTransactions() { + int numTransactionsReplayed = 0; + List transactionBatch; + do { + transactionBatch = getTransactionBatch(); + for (TransactionEntity transaction : transactionBatch) { + applyTransaction(transaction); + numTransactionsReplayed++; + } + } while (!transactionBatch.isEmpty()); + return numTransactionsReplayed; } } diff --git a/core/src/main/java/google/registry/model/server/Lock.java b/core/src/main/java/google/registry/model/server/Lock.java index b3b3169f4..1e4e7ac7e 100644 --- a/core/src/main/java/google/registry/model/server/Lock.java +++ b/core/src/main/java/google/registry/model/server/Lock.java @@ -15,6 +15,7 @@ package google.registry.model.server; import static com.google.common.base.Preconditions.checkArgument; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.util.DateTimeUtils.isAtOrAfter; import static google.registry.util.PreconditionsUtils.checkArgumentNotNull; @@ -35,6 +36,7 @@ import google.registry.util.RequestStatusChecker; import google.registry.util.RequestStatusCheckerImpl; import java.io.Serializable; import java.util.Optional; +import java.util.function.Supplier; import javax.annotation.Nullable; import javax.persistence.Column; import javax.persistence.IdClass; @@ -215,45 +217,45 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri // It's important to use transactNew rather than transact, because a Lock can be used to control // access to resources like GCS that can't be transactionally rolled back. Therefore, the lock // must be definitively acquired before it is used, even when called inside another transaction. + Supplier lockAcquirer = + () -> { + DateTime now = tm().getTransactionTime(); + + // Checking if an unexpired lock still exists - if so, the lock can't be acquired. + Lock lock = + tm().loadByKeyIfPresent( + VKey.create( + Lock.class, + new LockId(resourceName, scope), + Key.create(Lock.class, lockId))) + .orElse(null); + if (lock != null) { + logger.atInfo().log( + "Loaded existing lock: %s for request: %s", lock.lockId, lock.requestLogId); + } + LockState lockState; + if (lock == null) { + lockState = LockState.FREE; + } else if (isAtOrAfter(now, lock.expirationTime)) { + lockState = LockState.TIMED_OUT; + } else if (checkThreadRunning && !requestStatusChecker.isRunning(lock.requestLogId)) { + lockState = LockState.OWNER_DIED; + } else { + lockState = LockState.IN_USE; + return AcquireResult.create(now, lock, null, lockState); + } + + Lock newLock = + create(resourceName, scope, requestStatusChecker.getLogId(), now, leaseLength); + // Locks are not parented under an EntityGroupRoot (so as to avoid write + // contention) and don't need to be backed up. + tm().putIgnoringReadOnly(newLock); + + return AcquireResult.create(now, lock, newLock, lockState); + }; + // In ofy, backup is determined per-action, but in SQL it's determined per-transaction AcquireResult acquireResult = - tm().transactNew( - () -> { - DateTime now = tm().getTransactionTime(); - - // Checking if an unexpired lock still exists - if so, the lock can't be acquired. - Lock lock = - tm().loadByKeyIfPresent( - VKey.create( - Lock.class, - new LockId(resourceName, scope), - Key.create(Lock.class, lockId))) - .orElse(null); - if (lock != null) { - logger.atInfo().log( - "Loaded existing lock: %s for request: %s", lock.lockId, lock.requestLogId); - } - LockState lockState; - if (lock == null) { - lockState = LockState.FREE; - } else if (isAtOrAfter(now, lock.expirationTime)) { - lockState = LockState.TIMED_OUT; - } else if (checkThreadRunning - && !requestStatusChecker.isRunning(lock.requestLogId)) { - lockState = LockState.OWNER_DIED; - } else { - lockState = LockState.IN_USE; - return AcquireResult.create(now, lock, null, lockState); - } - - Lock newLock = - create( - resourceName, scope, requestStatusChecker.getLogId(), now, leaseLength); - // Locks are not parented under an EntityGroupRoot (so as to avoid write - // contention) and don't need to be backed up. - tm().putIgnoringReadOnly(newLock); - - return AcquireResult.create(now, lock, newLock, lockState); - }); + tm().isOfy() ? tm().transactNew(lockAcquirer) : jpaTm().transactWithoutBackup(lockAcquirer); logAcquireResult(acquireResult); lockMetrics.recordAcquire(resourceName, scope, acquireResult.lockState()); @@ -263,34 +265,41 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri /** Release the lock. */ public void release() { // Just use the default clock because we aren't actually doing anything that will use the clock. - tm().transact( - () -> { - // To release a lock, check that no one else has already obtained it and if not - // delete it. If the lock in Datastore was different then this lock is gone already; - // this can happen if release() is called around the expiration time and the lock - // expires underneath us. - VKey key = - VKey.create( - Lock.class, new LockId(resourceName, tld), Key.create(Lock.class, lockId)); - Lock loadedLock = tm().loadByKeyIfPresent(key).orElse(null); - if (Lock.this.equals(loadedLock)) { - // Use deleteWithoutBackup() so that we don't create a commit log entry for deleting - // the lock. - logger.atInfo().log("Deleting lock: %s", lockId); - tm().deleteIgnoringReadOnly(key); + Supplier lockReleaser = + () -> { + // To release a lock, check that no one else has already obtained it and if not + // delete it. If the lock in Datastore was different then this lock is gone already; + // this can happen if release() is called around the expiration time and the lock + // expires underneath us. + VKey key = + VKey.create( + Lock.class, new LockId(resourceName, tld), Key.create(Lock.class, lockId)); + Lock loadedLock = tm().loadByKeyIfPresent(key).orElse(null); + if (Lock.this.equals(loadedLock)) { + // Use deleteIgnoringReadOnly() so that we don't create a commit log entry for deleting + // the lock. + logger.atInfo().log("Deleting lock: %s", lockId); + tm().deleteIgnoringReadOnly(key); - lockMetrics.recordRelease( - resourceName, tld, new Duration(acquiredTime, tm().getTransactionTime())); - } else { - logger.atSevere().log( - "The lock we acquired was transferred to someone else before we" - + " released it! Did action take longer than lease length?" - + " Our lock: %s, current lock: %s", - Lock.this, loadedLock); - logger.atInfo().log( - "Not deleting lock: %s - someone else has it: %s", lockId, loadedLock); - } - }); + lockMetrics.recordRelease( + resourceName, tld, new Duration(acquiredTime, tm().getTransactionTime())); + } else { + logger.atSevere().log( + "The lock we acquired was transferred to someone else before we" + + " released it! Did action take longer than lease length?" + + " Our lock: %s, current lock: %s", + Lock.this, loadedLock); + logger.atInfo().log( + "Not deleting lock: %s - someone else has it: %s", lockId, loadedLock); + } + return null; + }; + // In ofy, backup is determined per-action, but in SQL it's determined per-transaction + if (tm().isOfy()) { + tm().transact(lockReleaser); + } else { + jpaTm().transactWithoutBackup(lockReleaser); + } } static class LockId extends ImmutableObject implements Serializable { diff --git a/core/src/main/java/google/registry/persistence/transaction/Transaction.java b/core/src/main/java/google/registry/persistence/transaction/Transaction.java index 4c1ace197..aef856dad 100644 --- a/core/src/main/java/google/registry/persistence/transaction/Transaction.java +++ b/core/src/main/java/google/registry/persistence/transaction/Transaction.java @@ -242,7 +242,7 @@ public class Transaction extends ImmutableObject implements Buildable { if (entity instanceof DatastoreEntity) { ((DatastoreEntity) entity).beforeDatastoreSaveOnReplay(); } - ofyTm().put(entity); + ofyTm().putIgnoringReadOnly(entity); } @Override @@ -280,7 +280,7 @@ public class Transaction extends ImmutableObject implements Buildable { @Override public void writeToDatastore() { - ofyTm().delete(key); + ofyTm().deleteIgnoringReadOnly(key); } @Override diff --git a/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java b/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java index 625c7d23d..05d04a1d9 100644 --- a/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java +++ b/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java @@ -20,21 +20,30 @@ import static google.registry.persistence.transaction.TransactionManagerFactory. import static google.registry.testing.DatabaseHelper.insertInDb; import static google.registry.testing.LogsSubject.assertAboutLogs; import static google.registry.util.DateTimeUtils.START_OF_TIME; +import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; +import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; +import static javax.servlet.http.HttpServletResponse.SC_OK; import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableSortedMap; import com.google.common.testing.TestLogHandler; +import com.google.common.truth.Truth8; import google.registry.model.common.DatabaseMigrationStateSchedule; import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; import google.registry.model.ofy.CommitLogBucket; import google.registry.model.ofy.Ofy; +import google.registry.model.server.Lock; import google.registry.persistence.transaction.TransactionEntity; import google.registry.testing.AppEngineExtension; import google.registry.testing.DatabaseHelper; import google.registry.testing.FakeClock; +import google.registry.testing.FakeResponse; import google.registry.testing.InjectExtension; import google.registry.testing.TestObject; +import google.registry.util.RequestStatusChecker; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -54,18 +63,20 @@ public class ReplicateToDatastoreActionTest { public final AppEngineExtension appEngine = AppEngineExtension.builder() .withDatastoreAndCloudSql() - .withOfyTestEntities(TestObject.class) - .withJpaUnitTestEntities(TestObject.class) + .withOfyTestEntities(Lock.class, TestObject.class) + .withJpaUnitTestEntities(Lock.class, TestObject.class) .withClock(fakeClock) .build(); @RegisterExtension final InjectExtension injectExtension = new InjectExtension(); - private final ReplicateToDatastoreAction task = new ReplicateToDatastoreAction(fakeClock); private final TestLogHandler logHandler = new TestLogHandler(); + private ReplicateToDatastoreAction action; + private FakeResponse response; @BeforeEach void setUp() { + resetAction(); injectExtension.setStaticField(Ofy.class, "clock", fakeClock); // Use a single bucket to expose timestamp inversion problems. injectExtension.setStaticField( @@ -95,7 +106,7 @@ public class ReplicateToDatastoreActionTest { jpaTm().insert(foo); jpaTm().insert(bar); }); - task.run(); + runAndVerifySuccess(); assertThat(ofyTm().transact(() -> ofyTm().loadByKey(foo.key()))).isEqualTo(foo); assertThat(ofyTm().transact(() -> ofyTm().loadByKey(bar.key()))).isEqualTo(bar); @@ -107,7 +118,7 @@ public class ReplicateToDatastoreActionTest { jpaTm().delete(bar.key()); jpaTm().insert(baz); }); - task.run(); + runAndVerifySuccess(); assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(bar.key()).isPresent())).isFalse(); assertThat(ofyTm().transact(() -> ofyTm().loadByKey(baz.key()))).isEqualTo(baz); @@ -120,7 +131,7 @@ public class ReplicateToDatastoreActionTest { // Write a transaction containing "foo". insertInDb(foo); - task.run(); + runAndVerifySuccess(); // Verify that it propagated to datastore, then remove "foo" directly from datastore. assertThat(ofyTm().transact(() -> ofyTm().loadByKey(foo.key()))).isEqualTo(foo); @@ -128,7 +139,7 @@ public class ReplicateToDatastoreActionTest { // Write "bar" insertInDb(bar); - task.run(); + runAndVerifySuccess(); // If we replayed only the most recent transaction, we should have "bar" but not "foo". assertThat(ofyTm().transact(() -> ofyTm().loadByKey(bar.key()))).isEqualTo(bar); @@ -142,23 +153,23 @@ public class ReplicateToDatastoreActionTest { // Write a transaction and run just the batch fetch. insertInDb(foo); - List txns1 = task.getTransactionBatch(); + List txns1 = action.getTransactionBatch(); assertThat(txns1).hasSize(1); // Write a second transaction and do another batch fetch. insertInDb(bar); - List txns2 = task.getTransactionBatch(); + List txns2 = action.getTransactionBatch(); assertThat(txns2).hasSize(2); // Apply the first batch. - task.applyTransaction(txns1.get(0)); + action.applyTransaction(txns1.get(0)); // Remove the foo record so we can ensure that this transaction doesn't get doublle-played. ofyTm().transact(() -> ofyTm().delete(foo.key())); // Apply the second batch. for (TransactionEntity txn : txns2) { - task.applyTransaction(txn); + action.applyTransaction(txn); } // Verify that the first transaction didn't get replayed but the second one did. @@ -179,9 +190,10 @@ public class ReplicateToDatastoreActionTest { // Force the last transaction id back to -1 so that we look for transaction 0. ofyTm().transact(() -> ofyTm().insert(new LastSqlTransaction(-1))); - List txns = task.getTransactionBatch(); + List txns = action.getTransactionBatch(); assertThat(txns).hasSize(1); - assertThat(assertThrows(IllegalStateException.class, () -> task.applyTransaction(txns.get(0)))) + assertThat( + assertThrows(IllegalStateException.class, () -> action.applyTransaction(txns.get(0)))) .hasMessageThat() .isEqualTo("Missing transaction: last txn id = -1, next available txn = 1"); } @@ -194,19 +206,21 @@ public class ReplicateToDatastoreActionTest { // Force the last transaction id back to -1 so that we look for transaction 0. ofyTm().transact(() -> ofyTm().insert(new LastSqlTransaction(-1))); - task.run(); + action.run(); assertAboutLogs() .that(logHandler) .hasSevereLogWithCause( new IllegalStateException( "Missing transaction: last txn id = -1, next available txn = 1")); + assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR); + assertThat(response.getPayload()).isEqualTo("Errored out replaying files"); } @Test void testBeforeDatastoreSaveCallback() { TestObject testObject = TestObject.create("foo"); jpaTm().transact(() -> jpaTm().put(testObject)); - task.run(); + action.run(); assertThat(ofyTm().loadAllOf(TestObject.class)).containsExactly(testObject); assertThat(TestObject.beforeDatastoreSaveCallCount).isEqualTo(1); } @@ -231,7 +245,7 @@ public class ReplicateToDatastoreActionTest { fakeClock.advanceBy(Duration.standardDays(1)); insertInDb(TestObject.create("foo")); - task.run(); + action.run(); // Replication shouldn't have happened assertThat(ofyTm().loadAllOf(TestObject.class)).isEmpty(); assertAboutLogs() @@ -240,5 +254,46 @@ public class ReplicateToDatastoreActionTest { Level.INFO, "Skipping ReplicateToDatastoreAction because we are in migration phase " + "DATASTORE_PRIMARY."); + assertThat(response.getStatus()).isEqualTo(SC_NO_CONTENT); + assertThat(response.getPayload()) + .isEqualTo( + "Skipping ReplicateToDatastoreAction because we are in migration phase" + + " DATASTORE_PRIMARY."); + } + + @Test + void testFailure_cannotAcquireLock() { + RequestStatusChecker requestStatusChecker = mock(RequestStatusChecker.class); + when(requestStatusChecker.getLogId()).thenReturn("logId"); + Truth8.assertThat( + Lock.acquire( + ReplicateToDatastoreAction.class.getSimpleName(), + null, + Duration.standardHours(1), + requestStatusChecker, + false)) + .isPresent(); + fakeClock.advanceOneMilli(); + + resetAction(); + action.run(); + assertThat(response.getStatus()).isEqualTo(SC_NO_CONTENT); + assertThat(response.getPayload()) + .isEqualTo("Can't acquire ReplicateToDatastoreAction lock, aborting."); + } + + private void runAndVerifySuccess() { + resetAction(); + action.run(); + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getPayload()) + .isEqualTo("Replayed 1 transaction(s) from Cloud SQL -> Datastore"); + } + + private void resetAction() { + response = new FakeResponse(); + RequestStatusChecker requestStatusChecker = mock(RequestStatusChecker.class); + when(requestStatusChecker.getLogId()).thenReturn("logId"); + action = new ReplicateToDatastoreAction(fakeClock, requestStatusChecker, response); } } diff --git a/core/src/test/java/google/registry/testing/ReplayExtension.java b/core/src/test/java/google/registry/testing/ReplayExtension.java index 425164b86..893df2ab3 100644 --- a/core/src/test/java/google/registry/testing/ReplayExtension.java +++ b/core/src/test/java/google/registry/testing/ReplayExtension.java @@ -38,6 +38,7 @@ import google.registry.persistence.transaction.Transaction.Delete; import google.registry.persistence.transaction.Transaction.Mutation; import google.registry.persistence.transaction.Transaction.Update; import google.registry.persistence.transaction.TransactionEntity; +import google.registry.util.RequestStatusChecker; import java.io.IOException; import java.util.List; import java.util.Optional; @@ -45,6 +46,7 @@ import javax.annotation.Nullable; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; +import org.mockito.Mockito; /** * A JUnit extension that replays datastore transactions against postgresql. @@ -81,7 +83,11 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback { * Create a replay extension that replays from SQL to cloud datastore when running in SQL mode. */ public static ReplayExtension createWithDoubleReplay(FakeClock clock) { - return new ReplayExtension(clock, true, new ReplicateToDatastoreAction(clock)); + return new ReplayExtension( + clock, + true, + new ReplicateToDatastoreAction( + clock, Mockito.mock(RequestStatusChecker.class), new FakeResponse())); } @Override