Create a mechanism for storing / using locks explicitly only in SQL (#1392)

This is used for the replay locks so that Beam pipelines (which will be
used for database comparison) can acquire / release locks as necessary
to avoid database contention. If we're comparing contents of Datastore
and SQL databases, we shouldn't have replay actively running during the
comparison, so the pipeline will grab the locks.

Beam doesn't always play nicely with loading from / saving to Datastore,
so we need to make sure that we store the replay locks in SQL at all
times, even when Datastore is the primary DB.
This commit is contained in:
gbrodman 2021-10-27 16:20:35 -04:00 committed by GitHub
parent 201b6e8e0b
commit 1e7aae26a3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 205 additions and 41 deletions

View file

@ -112,7 +112,7 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
return;
}
Optional<Lock> lock =
Lock.acquire(
Lock.acquireSql(
this.getClass().getSimpleName(), null, LEASE_LENGTH, requestStatusChecker, false);
if (!lock.isPresent()) {
String message = "Can't acquire SQL commit log replay lock, aborting.";

View file

@ -169,7 +169,7 @@ public class ReplicateToDatastoreAction implements Runnable {
return;
}
Optional<Lock> lock =
Lock.acquire(
Lock.acquireSql(
this.getClass().getSimpleName(), null, LEASE_LENGTH, requestStatusChecker, false);
if (!lock.isPresent()) {
String message = "Can't acquire ReplicateToDatastoreAction lock, aborting.";

View file

@ -32,6 +32,8 @@ import google.registry.model.annotations.NotBackedUp;
import google.registry.model.annotations.NotBackedUp.Reason;
import google.registry.model.replay.DatastoreAndSqlEntity;
import google.registry.persistence.VKey;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManager;
import google.registry.util.RequestStatusChecker;
import google.registry.util.RequestStatusCheckerImpl;
import java.io.Serializable;
@ -212,6 +214,47 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
Duration leaseLength,
RequestStatusChecker requestStatusChecker,
boolean checkThreadRunning) {
return acquireWithTransactionManager(
resourceName, tld, leaseLength, requestStatusChecker, checkThreadRunning, tm());
}
/**
 * Attempts to acquire a lock that is stored in SQL, using {@code jpaTm()} rather than the
 * default transaction manager.
 *
 * <p>This SQL-only variant exists so that Beam pipelines can acquire / load / release locks.
 *
 * @param resourceName name of the resource to lock
 * @param tld the TLD the lock is scoped to, or null for a global lock
 * @param leaseLength the lease length given to a newly created lock
 * @param requestStatusChecker supplies the log ID recorded on a newly created lock
 * @param checkThreadRunning forwarded unchanged to the shared acquisition logic
 * @return the acquired lock, or absent if it can't be acquired
 */
public static Optional<Lock> acquireSql(
    String resourceName,
    @Nullable String tld,
    Duration leaseLength,
    RequestStatusChecker requestStatusChecker,
    boolean checkThreadRunning) {
  TransactionManager sqlTm = jpaTm();
  return acquireWithTransactionManager(
      resourceName, tld, leaseLength, requestStatusChecker, checkThreadRunning, sqlTm);
}
/** Releases this lock through the default transaction manager ({@code tm()}). */
public void release() {
  TransactionManager defaultTm = tm();
  releaseWithTransactionManager(defaultTm);
}
/**
 * Releases this lock from SQL by going through {@code jpaTm()}.
 *
 * <p>This SQL-only variant exists so that Beam pipelines can acquire / load / release locks.
 */
public void releaseSql() {
  TransactionManager sqlTm = jpaTm();
  releaseWithTransactionManager(sqlTm);
}
/** Try to acquire a lock. Returns absent if it can't be acquired. */
private static Optional<Lock> acquireWithTransactionManager(
String resourceName,
@Nullable String tld,
Duration leaseLength,
RequestStatusChecker requestStatusChecker,
boolean checkThreadRunning,
TransactionManager transactionManager) {
String scope = (tld != null) ? tld : GLOBAL;
String lockId = makeLockId(resourceName, scope);
// It's important to use transactNew rather than transact, because a Lock can be used to control
@ -219,11 +262,12 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
// must be definitively acquired before it is used, even when called inside another transaction.
Supplier<AcquireResult> lockAcquirer =
() -> {
DateTime now = tm().getTransactionTime();
DateTime now = transactionManager.getTransactionTime();
// Checking if an unexpired lock still exists - if so, the lock can't be acquired.
Lock lock =
tm().loadByKeyIfPresent(
transactionManager
.loadByKeyIfPresent(
VKey.create(
Lock.class,
new LockId(resourceName, scope),
@ -249,13 +293,15 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
create(resourceName, scope, requestStatusChecker.getLogId(), now, leaseLength);
// Locks are not parented under an EntityGroupRoot (so as to avoid write
// contention) and don't need to be backed up.
tm().putIgnoringReadOnly(newLock);
transactionManager.putIgnoringReadOnly(newLock);
return AcquireResult.create(now, lock, newLock, lockState);
};
// In ofy, backup is determined per-action, but in SQL it's determined per-transaction
AcquireResult acquireResult =
tm().isOfy() ? tm().transactNew(lockAcquirer) : jpaTm().transactWithoutBackup(lockAcquirer);
transactionManager.isOfy()
? transactionManager.transactNew(lockAcquirer)
: ((JpaTransactionManager) transactionManager).transactWithoutBackup(lockAcquirer);
logAcquireResult(acquireResult);
lockMetrics.recordAcquire(resourceName, scope, acquireResult.lockState());
@ -263,7 +309,7 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
}
/** Release the lock. */
public void release() {
private void releaseWithTransactionManager(TransactionManager transactionManager) {
// Just use the default clock because we aren't actually doing anything that will use the clock.
Supplier<Void> lockReleaser =
() -> {
@ -274,15 +320,17 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
VKey<Lock> key =
VKey.create(
Lock.class, new LockId(resourceName, tld), Key.create(Lock.class, lockId));
Lock loadedLock = tm().loadByKeyIfPresent(key).orElse(null);
Lock loadedLock = transactionManager.loadByKeyIfPresent(key).orElse(null);
if (Lock.this.equals(loadedLock)) {
// Use deleteIgnoringReadOnly() so that we don't create a commit log entry for deleting
// the lock.
logger.atInfo().log("Deleting lock: %s", lockId);
tm().deleteIgnoringReadOnly(key);
transactionManager.deleteIgnoringReadOnly(key);
lockMetrics.recordRelease(
resourceName, tld, new Duration(acquiredTime, tm().getTransactionTime()));
resourceName,
tld,
new Duration(acquiredTime, transactionManager.getTransactionTime()));
} else {
logger.atSevere().log(
"The lock we acquired was transferred to someone else before we"
@ -294,11 +342,12 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
}
return null;
};
// In ofy, backup is determined per-action, but in SQL it's determined per-transaction
if (tm().isOfy()) {
tm().transact(lockReleaser);
if (transactionManager.isOfy()) {
transactionManager.transact(lockReleaser);
} else {
jpaTm().transactWithoutBackup(lockReleaser);
((JpaTransactionManager) transactionManager).transactWithoutBackup(lockReleaser);
}
}

View file

@ -22,8 +22,8 @@ import org.joda.time.Duration;
/**
* Code execution locked on some shared resource.
*
* <p>Locks are either specific to a tld or global to the entire system, in which case a tld of
* null is used.
* <p>Locks are either specific to a tld or global to the entire system, in which case a tld of null
* is used.
*/
public interface LockHandler extends Serializable {
@ -42,4 +42,22 @@ public interface LockHandler extends Serializable {
@Nullable String tld,
Duration leaseLength,
String... lockNames);
/**
 * Acquires one or more locks using only Cloud SQL, then executes a Void {@link Callable}.
 *
 * <p>The callable runs on a thread that will be killed if it doesn't complete before the lease
 * expires.
 *
 * <p>Locks are specific either to a given tld or to the entire system; pass a null tld for a
 * system-wide lock.
 *
 * <p>This method exists so that Beam pipelines can acquire / load / release locks.
 *
 * @param callable the work to run once every lock is held
 * @param tld the tld the locks apply to, or null for the entire system
 * @param leaseLength the lease length of the locks
 * @param lockNames the names of the locks to acquire
 * @return true if all locks were acquired and the callable was run; false otherwise.
 */
boolean executeWithSqlLocks(
    Callable<Void> callable, @Nullable String tld, Duration leaseLength, String... lockNames);
}

View file

@ -73,12 +73,42 @@ public class LockHandlerImpl implements LockHandler {
@Nullable String tld,
Duration leaseLength,
String... lockNames) {
return executeWithLockAcquirer(callable, tld, leaseLength, this::acquire, lockNames);
}
/**
 * Acquires the named locks using only Cloud SQL and then executes a Void {@link Callable}.
 *
 * <p>The thread will be killed if it doesn't complete before the lease expires.
 *
 * <p>Locks are specific either to a given tld or to the entire system; pass a null tld for a
 * system-wide lock.
 *
 * <p>This method exists so that Beam pipelines can acquire / load / release locks.
 *
 * @return whether all locks were acquired and the callable was run.
 */
@Override
public boolean executeWithSqlLocks(
    final Callable<Void> callable,
    @Nullable String tld,
    Duration leaseLength,
    String... lockNames) {
  // Same flow as executeWithLocks; only the acquisition strategy differs.
  LockAcquirer sqlLockAcquirer = this::acquireSql;
  return executeWithLockAcquirer(callable, tld, leaseLength, sqlLockAcquirer, lockNames);
}
private boolean executeWithLockAcquirer(
final Callable<Void> callable,
@Nullable String tld,
Duration leaseLength,
LockAcquirer lockAcquirer,
String... lockNames) {
DateTime startTime = clock.nowUtc();
String sanitizedTld = Strings.emptyToNull(tld);
try {
return AppEngineTimeLimiter.create()
.callWithTimeout(
new LockingCallable(callable, sanitizedTld, leaseLength, lockNames),
new LockingCallable(callable, lockAcquirer, sanitizedTld, leaseLength, lockNames),
leaseLength.minus(LOCK_TIMEOUT_FUDGE).getMillis(),
TimeUnit.MILLISECONDS);
} catch (ExecutionException | UncheckedExecutionException e) {
@ -108,17 +138,32 @@ public class LockHandlerImpl implements LockHandler {
return Lock.acquire(lockName, tld, leaseLength, requestStatusChecker, true);
}
/**
 * Acquires one lock via {@link Lock#acquireSql}, always passing {@code true} for its
 * {@code checkThreadRunning} argument.
 *
 * <p>Not private so that tests can override it.
 */
@VisibleForTesting
Optional<Lock> acquireSql(String lockName, @Nullable String tld, Duration leaseLength) {
  boolean checkThreadRunning = true;
  return Lock.acquireSql(lockName, tld, leaseLength, requestStatusChecker, checkThreadRunning);
}
/**
 * Strategy for acquiring a single named lock; an absent result means the lock was not acquired.
 *
 * <p>Instances are supplied as method references ({@code this::acquire}, {@code
 * this::acquireSql}), so the interface is marked {@link FunctionalInterface} to have the compiler
 * enforce that it keeps exactly one abstract method.
 */
@FunctionalInterface
private interface LockAcquirer {
  /**
   * Tries to acquire the lock with the given name.
   *
   * @param lockName the name of the lock to acquire
   * @param tld the tld the lock is specific to, or null
   * @param leaseLength the lease length of the lock
   * @return the acquired lock, or absent if it couldn't be acquired
   */
  Optional<Lock> acquireLock(String lockName, @Nullable String tld, Duration leaseLength);
}
/** A {@link Callable} that acquires and releases a lock around a delegate {@link Callable}. */
private class LockingCallable implements Callable<Boolean> {
private static class LockingCallable implements Callable<Boolean> {
final Callable<Void> delegate;
final LockAcquirer lockAcquirer;
@Nullable final String tld;
final Duration leaseLength;
final Set<String> lockNames;
LockingCallable(
Callable<Void> delegate, String tld, Duration leaseLength, String... lockNames) {
Callable<Void> delegate,
LockAcquirer lockAcquirer,
String tld,
Duration leaseLength,
String... lockNames) {
checkArgument(leaseLength.isLongerThan(LOCK_TIMEOUT_FUDGE));
this.delegate = delegate;
this.lockAcquirer = lockAcquirer;
this.tld = tld;
this.leaseLength = leaseLength;
// Make sure we join locks in a fixed (lexicographical) order to avoid deadlock.
@ -130,7 +175,7 @@ public class LockHandlerImpl implements LockHandler {
Set<Lock> acquiredLocks = new HashSet<>();
try {
for (String lockName : lockNames) {
Optional<Lock> lock = acquire(lockName, tld, leaseLength);
Optional<Lock> lock = lockAcquirer.acquireLock(lockName, tld, leaseLength);
if (!lock.isPresent()) {
logger.atInfo().log("Couldn't acquire lock named: %s for TLD %s.", lockName, tld);
return false;

View file

@ -34,6 +34,7 @@ import static google.registry.util.DateTimeUtils.START_OF_TIME;
import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -111,6 +112,7 @@ public class ReplayCommitLogsToSqlActionTest {
DelegationSignerData.class,
DomainBase.class,
GracePeriod.class,
Lock.class,
PremiumList.class,
PremiumEntry.class,
RegistrarContact.class,
@ -135,6 +137,7 @@ public class ReplayCommitLogsToSqlActionTest {
@BeforeEach
void beforeEach() {
inject.setStaticField(Ofy.class, "clock", fakeClock);
lenient().when(requestStatusChecker.getLogId()).thenReturn("requestLogId");
action.gcsUtils = gcsUtils;
action.response = response;
action.requestStatusChecker = requestStatusChecker;
@ -464,9 +467,10 @@ public class ReplayCommitLogsToSqlActionTest {
}
});
runAndAssertSuccess(now.minusMinutes(1), 1, 1);
// jpaTm()::putIgnoringReadOnly should only have been called with the checkpoint
// jpaTm()::putIgnoringReadOnly should only have been called with the checkpoint and the lock
verify(spy, times(2)).putIgnoringReadOnly(any(SqlReplayCheckpoint.class));
verify(spy, times(2)).putIgnoringReadOnly(any());
verify(spy).putIgnoringReadOnly(any(Lock.class));
verify(spy, times(3)).putIgnoringReadOnly(any());
}
@Test
@ -506,7 +510,7 @@ public class ReplayCommitLogsToSqlActionTest {
@Test
void testFailure_cannotAcquireLock() {
Truth8.assertThat(
Lock.acquire(
Lock.acquireSql(
ReplayCommitLogsToSqlAction.class.getSimpleName(),
null,
Duration.standardHours(1),

View file

@ -294,7 +294,7 @@ public class ReplicateToDatastoreActionTest {
RequestStatusChecker requestStatusChecker = mock(RequestStatusChecker.class);
when(requestStatusChecker.getLogId()).thenReturn("logId");
Truth8.assertThat(
Lock.acquire(
Lock.acquireSql(
ReplicateToDatastoreAction.class.getSimpleName(),
null,
Duration.standardHours(1),

View file

@ -20,6 +20,7 @@ import static google.registry.model.server.Lock.LockState.FREE;
import static google.registry.model.server.Lock.LockState.IN_USE;
import static google.registry.model.server.Lock.LockState.OWNER_DIED;
import static google.registry.model.server.Lock.LockState.TIMED_OUT;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@ -28,8 +29,10 @@ import static org.mockito.Mockito.when;
import google.registry.model.EntityTestCase;
import google.registry.model.server.Lock.LockState;
import google.registry.testing.DatabaseHelper;
import google.registry.testing.DualDatabaseTest;
import google.registry.testing.TestOfyAndSql;
import google.registry.testing.TestOfyOnly;
import google.registry.util.RequestStatusChecker;
import java.util.Optional;
import org.joda.time.Duration;
@ -132,6 +135,19 @@ public class LockTest extends EntityTestCase {
assertThat(acquire("b", ONE_DAY, IN_USE)).isEmpty();
}
/** Checks that a lock acquired / released through the SQL-only methods lives only in SQL. */
@TestOfyOnly
void testSqlLock_inOfyMode() {
  Lock.lockMetrics = origLockMetrics;
  Optional<Lock> maybeLock =
      Lock.acquireSql(RESOURCE_NAME, null, ONE_DAY, requestStatusChecker, true);
  assertThat(maybeLock).isPresent();
  Lock sqlLock = maybeLock.get();

  // After acquisition: nothing via DatabaseHelper (presumably the primary, i.e. Datastore,
  // database in this ofy-only test -- confirm), and exactly this lock via jpaTm().
  assertThat(DatabaseHelper.loadAllOf(Lock.class)).isEmpty();
  assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Lock.class))).containsExactly(sqlLock);

  sqlLock.releaseSql();

  // After release: no lock is visible through either path.
  assertThat(DatabaseHelper.loadAllOf(Lock.class)).isEmpty();
  assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Lock.class))).isEmpty();
}
@TestOfyAndSql
void testFailure_emptyResourceName() {
IllegalArgumentException thrown =

View file

@ -40,7 +40,9 @@ final class LockHandlerImplTest {
private final FakeClock clock = new FakeClock(DateTime.parse("2001-08-29T12:20:00Z"));
@RegisterExtension final AppEngineExtension appEngine = AppEngineExtension.builder().build();
@RegisterExtension
final AppEngineExtension appEngine =
AppEngineExtension.builder().withDatastoreAndCloudSql().build();
private static class CountingCallable implements Callable<Void> {
int numCalled = 0;
@ -69,18 +71,13 @@ final class LockHandlerImplTest {
}
private boolean executeWithLocks(Callable<Void> callable, final @Nullable Lock acquiredLock) {
LockHandlerImpl lockHandler = new LockHandlerImpl(new RequestStatusCheckerImpl(), clock) {
private static final long serialVersionUID = 0L;
@Override
Optional<Lock> acquire(String resourceName, String tld, Duration leaseLength) {
assertThat(resourceName).isEqualTo("resourceName");
assertThat(tld).isEqualTo("tld");
assertThat(leaseLength).isEqualTo(ONE_DAY);
return Optional.ofNullable(acquiredLock);
}
};
return createTestLockHandler(acquiredLock)
.executeWithLocks(callable, "tld", ONE_DAY, "resourceName");
}
return lockHandler.executeWithLocks(callable, "tld", ONE_DAY, "resourceName");
/**
 * Runs {@code callable} through {@code executeWithSqlLocks} on a test lock handler whose
 * acquisition yields {@code acquiredLock} (or nothing when it is null).
 */
private boolean executeWithSqlLocks(Callable<Void> callable, final @Nullable Lock acquiredLock) {
  LockHandler testHandler = createTestLockHandler(acquiredLock);
  return testHandler.executeWithSqlLocks(callable, "tld", ONE_DAY, "resourceName");
}
@Test
@ -92,6 +89,15 @@ final class LockHandlerImplTest {
verify(lock, times(1)).release();
}
/**
 * The SQL-lock path runs the callable exactly once when the lock is acquired, and the lock is
 * released afterwards.
 */
@Test
void testSqlLockSucceeds() {
  Lock mockLock = mock(Lock.class);
  CountingCallable callable = new CountingCallable();

  boolean executed = executeWithSqlLocks(callable, mockLock);

  assertThat(executed).isTrue();
  assertThat(callable.numCalled).isEqualTo(1);
  // NOTE(review): this verifies release(), not releaseSql(). Confirm that locks acquired through
  // executeWithSqlLocks are actually released from SQL when SQL is not the primary database.
  verify(mockLock, times(1)).release();
}
@Test
void testLockSucceeds_uncheckedException() {
Lock lock = mock(Lock.class);
@ -140,4 +146,23 @@ final class LockHandlerImplTest {
assertThat(executeWithLocks(countingCallable, lock)).isFalse();
assertThat(countingCallable.numCalled).isEqualTo(0);
}
/**
 * Builds a {@link LockHandlerImpl} whose lock acquisition is stubbed out: both {@code acquire}
 * and {@code acquireSql} check the expected arguments and then yield {@code acquiredLock}, or
 * an empty result when it is null.
 */
private LockHandler createTestLockHandler(@Nullable Lock acquiredLock) {
  return new LockHandlerImpl(new RequestStatusCheckerImpl(), clock) {
    private static final long serialVersionUID = 0L;

    @Override
    Optional<Lock> acquire(String lockName, String lockTld, Duration lease) {
      // Values that the executeWith*Locks test helpers always pass in.
      assertThat(lockName).isEqualTo("resourceName");
      assertThat(lockTld).isEqualTo("tld");
      assertThat(lease).isEqualTo(ONE_DAY);
      return Optional.ofNullable(acquiredLock);
    }

    @Override
    Optional<Lock> acquireSql(String lockName, String lockTld, Duration lease) {
      // The SQL path shares the same stubbed behavior and argument checks.
      return acquire(lockName, lockTld, lease);
    }
  };
}
}

View file

@ -26,11 +26,11 @@ public class FakeLockHandler implements LockHandler {
private static final long serialVersionUID = 6437880915118738492L;
boolean lockSucceeds;
private final boolean lockSucceeds;
/**
* @param lockSucceeds if true - the lock acquisition will succeed and the callable will be
* called. If false, lock acquisition will fail and the caller isn't called.
called. If false, lock acquisition will fail and the callable isn't called.
*/
public FakeLockHandler(boolean lockSucceeds) {
this.lockSucceeds = lockSucceeds;
@ -38,10 +38,17 @@ public class FakeLockHandler implements LockHandler {
@Override
public boolean executeWithLocks(
final Callable<Void> callable,
@Nullable String tld,
Duration leaseLength,
String... lockNames) {
Callable<Void> callable, @Nullable String tld, Duration leaseLength, String... lockNames) {
return execute(callable);
}
/**
 * Fake SQL-lock execution: only {@code callable} is used; {@code tld}, {@code leaseLength} and
 * {@code lockNames} are ignored.
 */
@Override
public boolean executeWithSqlLocks(
    Callable<Void> callable, @Nullable String tld, Duration leaseLength, String... lockNames) {
  boolean result = execute(callable);
  return result;
}
private boolean execute(Callable<Void> callable) {
if (!lockSucceeds) {
return false;
}