Add locking and a response in ReplicateToDatastoreAction (#1328)

* Add locking and a response in ReplicateToDatastoreAction

The response is necessary to get nicer logs in GAE and nicer cron job
behavior.

In addition:
- fix issues where locks would be backed up and replayed to Datastore
(they shouldn't be replayed)
- do ignore-read-only writes when replaying the transactions
This commit is contained in:
gbrodman 2021-09-21 10:12:27 -04:00 committed by GitHub
parent 0afef0fb82
commit 42012f6cce
5 changed files with 218 additions and 101 deletions

View file

@ -18,6 +18,10 @@ import static google.registry.model.ofy.ObjectifyService.auditedOfy;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static google.registry.request.Action.Method.GET;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import static org.joda.time.Duration.standardHours;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
@ -26,15 +30,20 @@ import google.registry.model.UpdateAutoTimestamp;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection;
import google.registry.model.server.Lock;
import google.registry.persistence.transaction.Transaction;
import google.registry.persistence.transaction.TransactionEntity;
import google.registry.request.Action;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.RequestStatusChecker;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import javax.persistence.NoResultException;
import org.joda.time.Duration;
/** Cron task to replicate from Cloud SQL to datastore. */
@Action(
@ -55,11 +64,18 @@ public class ReplicateToDatastoreAction implements Runnable {
*/
public static final int BATCH_SIZE = 200;
private static final Duration LEASE_LENGTH = standardHours(1);
private final Clock clock;
private final RequestStatusChecker requestStatusChecker;
private final Response response;
@Inject
public ReplicateToDatastoreAction(Clock clock) {
public ReplicateToDatastoreAction(
Clock clock, RequestStatusChecker requestStatusChecker, Response response) {
this.clock = clock;
this.requestStatusChecker = requestStatusChecker;
this.response = response;
}
@VisibleForTesting
@ -143,24 +159,55 @@ public class ReplicateToDatastoreAction implements Runnable {
public void run() {
MigrationState state = DatabaseMigrationStateSchedule.getValueAtTime(clock.nowUtc());
if (!state.getReplayDirection().equals(ReplayDirection.SQL_TO_DATASTORE)) {
logger.atInfo().log(
String message =
String.format(
"Skipping ReplicateToDatastoreAction because we are in migration phase %s.", state);
logger.atInfo().log(message);
// App Engine will retry on any non-2xx status code, which we don't want in this case.
response.setStatus(SC_NO_CONTENT);
response.setPayload(message);
return;
}
Optional<Lock> lock =
Lock.acquire(
this.getClass().getSimpleName(), null, LEASE_LENGTH, requestStatusChecker, false);
if (!lock.isPresent()) {
String message = "Can't acquire ReplicateToDatastoreAction lock, aborting.";
logger.atSevere().log(message);
// App Engine will retry on any non-2xx status code, which we don't want in this case.
response.setStatus(SC_NO_CONTENT);
response.setPayload(message);
return;
}
// TODO(b/181758163): Deal with objects that don't exist in Cloud SQL, e.g. ForeignKeyIndex,
// EppResourceIndex.
logger.atInfo().log("Processing transaction replay batch Cloud SQL -> Cloud Datastore");
int numTransactionsReplayed = 0;
for (TransactionEntity txnEntity : getTransactionBatch()) {
try {
applyTransaction(txnEntity);
logger.atInfo().log("Processing transaction replay batch Cloud SQL -> Cloud Datastore");
int numTransactionsReplayed = replayAllTransactions();
String resultMessage =
String.format(
"Replayed %d transaction(s) from Cloud SQL -> Datastore", numTransactionsReplayed);
logger.atInfo().log(resultMessage);
response.setPayload(resultMessage);
response.setStatus(SC_OK);
} catch (Throwable t) {
logger.atSevere().withCause(t).log("Errored out replaying files");
return;
String message = "Errored out replaying files";
logger.atSevere().withCause(t).log(message);
response.setStatus(SC_INTERNAL_SERVER_ERROR);
response.setPayload(message);
} finally {
lock.ifPresent(Lock::release);
}
}
private int replayAllTransactions() {
int numTransactionsReplayed = 0;
List<TransactionEntity> transactionBatch;
do {
transactionBatch = getTransactionBatch();
for (TransactionEntity transaction : transactionBatch) {
applyTransaction(transaction);
numTransactionsReplayed++;
}
logger.atInfo().log(
"Replayed %d transactions from Cloud SQL -> Datastore", numTransactionsReplayed);
} while (!transactionBatch.isEmpty());
return numTransactionsReplayed;
}
}

View file

@ -15,6 +15,7 @@
package google.registry.model.server;
import static com.google.common.base.Preconditions.checkArgument;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.util.DateTimeUtils.isAtOrAfter;
import static google.registry.util.PreconditionsUtils.checkArgumentNotNull;
@ -35,6 +36,7 @@ import google.registry.util.RequestStatusChecker;
import google.registry.util.RequestStatusCheckerImpl;
import java.io.Serializable;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.persistence.Column;
import javax.persistence.IdClass;
@ -215,8 +217,7 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
// It's important to use transactNew rather than transact, because a Lock can be used to control
// access to resources like GCS that can't be transactionally rolled back. Therefore, the lock
// must be definitively acquired before it is used, even when called inside another transaction.
AcquireResult acquireResult =
tm().transactNew(
Supplier<AcquireResult> lockAcquirer =
() -> {
DateTime now = tm().getTransactionTime();
@ -237,8 +238,7 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
lockState = LockState.FREE;
} else if (isAtOrAfter(now, lock.expirationTime)) {
lockState = LockState.TIMED_OUT;
} else if (checkThreadRunning
&& !requestStatusChecker.isRunning(lock.requestLogId)) {
} else if (checkThreadRunning && !requestStatusChecker.isRunning(lock.requestLogId)) {
lockState = LockState.OWNER_DIED;
} else {
lockState = LockState.IN_USE;
@ -246,14 +246,16 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
}
Lock newLock =
create(
resourceName, scope, requestStatusChecker.getLogId(), now, leaseLength);
create(resourceName, scope, requestStatusChecker.getLogId(), now, leaseLength);
// Locks are not parented under an EntityGroupRoot (so as to avoid write
// contention) and don't need to be backed up.
tm().putIgnoringReadOnly(newLock);
return AcquireResult.create(now, lock, newLock, lockState);
});
};
// In ofy, backup is determined per-action, but in SQL it's determined per-transaction
AcquireResult acquireResult =
tm().isOfy() ? tm().transactNew(lockAcquirer) : jpaTm().transactWithoutBackup(lockAcquirer);
logAcquireResult(acquireResult);
lockMetrics.recordAcquire(resourceName, scope, acquireResult.lockState());
@ -263,7 +265,7 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
/** Release the lock. */
public void release() {
// Just use the default clock because we aren't actually doing anything that will use the clock.
tm().transact(
Supplier<Void> lockReleaser =
() -> {
// To release a lock, check that no one else has already obtained it and if not
// delete it. If the lock in Datastore was different then this lock is gone already;
@ -274,7 +276,7 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
Lock.class, new LockId(resourceName, tld), Key.create(Lock.class, lockId));
Lock loadedLock = tm().loadByKeyIfPresent(key).orElse(null);
if (Lock.this.equals(loadedLock)) {
// Use deleteWithoutBackup() so that we don't create a commit log entry for deleting
// Use deleteIgnoringReadOnly() so that we don't create a commit log entry for deleting
// the lock.
logger.atInfo().log("Deleting lock: %s", lockId);
tm().deleteIgnoringReadOnly(key);
@ -290,7 +292,14 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
logger.atInfo().log(
"Not deleting lock: %s - someone else has it: %s", lockId, loadedLock);
}
});
return null;
};
// In ofy, backup is determined per-action, but in SQL it's determined per-transaction
if (tm().isOfy()) {
tm().transact(lockReleaser);
} else {
jpaTm().transactWithoutBackup(lockReleaser);
}
}
static class LockId extends ImmutableObject implements Serializable {

View file

@ -242,7 +242,7 @@ public class Transaction extends ImmutableObject implements Buildable {
if (entity instanceof DatastoreEntity) {
((DatastoreEntity) entity).beforeDatastoreSaveOnReplay();
}
ofyTm().put(entity);
ofyTm().putIgnoringReadOnly(entity);
}
@Override
@ -280,7 +280,7 @@ public class Transaction extends ImmutableObject implements Buildable {
@Override
public void writeToDatastore() {
ofyTm().delete(key);
ofyTm().deleteIgnoringReadOnly(key);
}
@Override

View file

@ -20,21 +20,30 @@ import static google.registry.persistence.transaction.TransactionManagerFactory.
import static google.registry.testing.DatabaseHelper.insertInDb;
import static google.registry.testing.LogsSubject.assertAboutLogs;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.testing.TestLogHandler;
import com.google.common.truth.Truth8;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import google.registry.model.ofy.CommitLogBucket;
import google.registry.model.ofy.Ofy;
import google.registry.model.server.Lock;
import google.registry.persistence.transaction.TransactionEntity;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.DatabaseHelper;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeResponse;
import google.registry.testing.InjectExtension;
import google.registry.testing.TestObject;
import google.registry.util.RequestStatusChecker;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -54,18 +63,20 @@ public class ReplicateToDatastoreActionTest {
public final AppEngineExtension appEngine =
AppEngineExtension.builder()
.withDatastoreAndCloudSql()
.withOfyTestEntities(TestObject.class)
.withJpaUnitTestEntities(TestObject.class)
.withOfyTestEntities(Lock.class, TestObject.class)
.withJpaUnitTestEntities(Lock.class, TestObject.class)
.withClock(fakeClock)
.build();
@RegisterExtension final InjectExtension injectExtension = new InjectExtension();
private final ReplicateToDatastoreAction task = new ReplicateToDatastoreAction(fakeClock);
private final TestLogHandler logHandler = new TestLogHandler();
private ReplicateToDatastoreAction action;
private FakeResponse response;
@BeforeEach
void setUp() {
resetAction();
injectExtension.setStaticField(Ofy.class, "clock", fakeClock);
// Use a single bucket to expose timestamp inversion problems.
injectExtension.setStaticField(
@ -95,7 +106,7 @@ public class ReplicateToDatastoreActionTest {
jpaTm().insert(foo);
jpaTm().insert(bar);
});
task.run();
runAndVerifySuccess();
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(foo.key()))).isEqualTo(foo);
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(bar.key()))).isEqualTo(bar);
@ -107,7 +118,7 @@ public class ReplicateToDatastoreActionTest {
jpaTm().delete(bar.key());
jpaTm().insert(baz);
});
task.run();
runAndVerifySuccess();
assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(bar.key()).isPresent())).isFalse();
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(baz.key()))).isEqualTo(baz);
@ -120,7 +131,7 @@ public class ReplicateToDatastoreActionTest {
// Write a transaction containing "foo".
insertInDb(foo);
task.run();
runAndVerifySuccess();
// Verify that it propagated to datastore, then remove "foo" directly from datastore.
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(foo.key()))).isEqualTo(foo);
@ -128,7 +139,7 @@ public class ReplicateToDatastoreActionTest {
// Write "bar"
insertInDb(bar);
task.run();
runAndVerifySuccess();
// If we replayed only the most recent transaction, we should have "bar" but not "foo".
assertThat(ofyTm().transact(() -> ofyTm().loadByKey(bar.key()))).isEqualTo(bar);
@ -142,23 +153,23 @@ public class ReplicateToDatastoreActionTest {
// Write a transaction and run just the batch fetch.
insertInDb(foo);
List<TransactionEntity> txns1 = task.getTransactionBatch();
List<TransactionEntity> txns1 = action.getTransactionBatch();
assertThat(txns1).hasSize(1);
// Write a second transaction and do another batch fetch.
insertInDb(bar);
List<TransactionEntity> txns2 = task.getTransactionBatch();
List<TransactionEntity> txns2 = action.getTransactionBatch();
assertThat(txns2).hasSize(2);
// Apply the first batch.
task.applyTransaction(txns1.get(0));
action.applyTransaction(txns1.get(0));
// Remove the foo record so we can ensure that this transaction doesn't get doublle-played.
ofyTm().transact(() -> ofyTm().delete(foo.key()));
// Apply the second batch.
for (TransactionEntity txn : txns2) {
task.applyTransaction(txn);
action.applyTransaction(txn);
}
// Verify that the first transaction didn't get replayed but the second one did.
@ -179,9 +190,10 @@ public class ReplicateToDatastoreActionTest {
// Force the last transaction id back to -1 so that we look for transaction 0.
ofyTm().transact(() -> ofyTm().insert(new LastSqlTransaction(-1)));
List<TransactionEntity> txns = task.getTransactionBatch();
List<TransactionEntity> txns = action.getTransactionBatch();
assertThat(txns).hasSize(1);
assertThat(assertThrows(IllegalStateException.class, () -> task.applyTransaction(txns.get(0))))
assertThat(
assertThrows(IllegalStateException.class, () -> action.applyTransaction(txns.get(0))))
.hasMessageThat()
.isEqualTo("Missing transaction: last txn id = -1, next available txn = 1");
}
@ -194,19 +206,21 @@ public class ReplicateToDatastoreActionTest {
// Force the last transaction id back to -1 so that we look for transaction 0.
ofyTm().transact(() -> ofyTm().insert(new LastSqlTransaction(-1)));
task.run();
action.run();
assertAboutLogs()
.that(logHandler)
.hasSevereLogWithCause(
new IllegalStateException(
"Missing transaction: last txn id = -1, next available txn = 1"));
assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
assertThat(response.getPayload()).isEqualTo("Errored out replaying files");
}
@Test
void testBeforeDatastoreSaveCallback() {
TestObject testObject = TestObject.create("foo");
jpaTm().transact(() -> jpaTm().put(testObject));
task.run();
action.run();
assertThat(ofyTm().loadAllOf(TestObject.class)).containsExactly(testObject);
assertThat(TestObject.beforeDatastoreSaveCallCount).isEqualTo(1);
}
@ -231,7 +245,7 @@ public class ReplicateToDatastoreActionTest {
fakeClock.advanceBy(Duration.standardDays(1));
insertInDb(TestObject.create("foo"));
task.run();
action.run();
// Replication shouldn't have happened
assertThat(ofyTm().loadAllOf(TestObject.class)).isEmpty();
assertAboutLogs()
@ -240,5 +254,46 @@ public class ReplicateToDatastoreActionTest {
Level.INFO,
"Skipping ReplicateToDatastoreAction because we are in migration phase "
+ "DATASTORE_PRIMARY.");
assertThat(response.getStatus()).isEqualTo(SC_NO_CONTENT);
assertThat(response.getPayload())
.isEqualTo(
"Skipping ReplicateToDatastoreAction because we are in migration phase"
+ " DATASTORE_PRIMARY.");
}
@Test
void testFailure_cannotAcquireLock() {
RequestStatusChecker requestStatusChecker = mock(RequestStatusChecker.class);
when(requestStatusChecker.getLogId()).thenReturn("logId");
Truth8.assertThat(
Lock.acquire(
ReplicateToDatastoreAction.class.getSimpleName(),
null,
Duration.standardHours(1),
requestStatusChecker,
false))
.isPresent();
fakeClock.advanceOneMilli();
resetAction();
action.run();
assertThat(response.getStatus()).isEqualTo(SC_NO_CONTENT);
assertThat(response.getPayload())
.isEqualTo("Can't acquire ReplicateToDatastoreAction lock, aborting.");
}
private void runAndVerifySuccess() {
resetAction();
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getPayload())
.isEqualTo("Replayed 1 transaction(s) from Cloud SQL -> Datastore");
}
private void resetAction() {
response = new FakeResponse();
RequestStatusChecker requestStatusChecker = mock(RequestStatusChecker.class);
when(requestStatusChecker.getLogId()).thenReturn("logId");
action = new ReplicateToDatastoreAction(fakeClock, requestStatusChecker, response);
}
}

View file

@ -38,6 +38,7 @@ import google.registry.persistence.transaction.Transaction.Delete;
import google.registry.persistence.transaction.Transaction.Mutation;
import google.registry.persistence.transaction.Transaction.Update;
import google.registry.persistence.transaction.TransactionEntity;
import google.registry.util.RequestStatusChecker;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
@ -45,6 +46,7 @@ import javax.annotation.Nullable;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.mockito.Mockito;
/**
* A JUnit extension that replays datastore transactions against postgresql.
@ -81,7 +83,11 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
* Create a replay extension that replays from SQL to cloud datastore when running in SQL mode.
*/
public static ReplayExtension createWithDoubleReplay(FakeClock clock) {
return new ReplayExtension(clock, true, new ReplicateToDatastoreAction(clock));
return new ReplayExtension(
clock,
true,
new ReplicateToDatastoreAction(
clock, Mockito.mock(RequestStatusChecker.class), new FakeResponse()));
}
@Override