diff --git a/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java b/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java index 90bed21a8..c35d8d9dd 100644 --- a/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java +++ b/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java @@ -112,7 +112,7 @@ public class ReplayCommitLogsToSqlAction implements Runnable { return; } Optional lock = - Lock.acquire( + Lock.acquireSql( this.getClass().getSimpleName(), null, LEASE_LENGTH, requestStatusChecker, false); if (!lock.isPresent()) { String message = "Can't acquire SQL commit log replay lock, aborting."; diff --git a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java index 8a2391a99..630a168c8 100644 --- a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java +++ b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java @@ -169,7 +169,7 @@ public class ReplicateToDatastoreAction implements Runnable { return; } Optional lock = - Lock.acquire( + Lock.acquireSql( this.getClass().getSimpleName(), null, LEASE_LENGTH, requestStatusChecker, false); if (!lock.isPresent()) { String message = "Can't acquire ReplicateToDatastoreAction lock, aborting."; diff --git a/core/src/main/java/google/registry/model/server/Lock.java b/core/src/main/java/google/registry/model/server/Lock.java index 1e4e7ac7e..774ed5af1 100644 --- a/core/src/main/java/google/registry/model/server/Lock.java +++ b/core/src/main/java/google/registry/model/server/Lock.java @@ -32,6 +32,8 @@ import google.registry.model.annotations.NotBackedUp; import google.registry.model.annotations.NotBackedUp.Reason; import google.registry.model.replay.DatastoreAndSqlEntity; import google.registry.persistence.VKey; +import google.registry.persistence.transaction.JpaTransactionManager; +import google.registry.persistence.transaction.TransactionManager; import google.registry.util.RequestStatusChecker; import google.registry.util.RequestStatusCheckerImpl; import java.io.Serializable; @@ -212,6 +214,47 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri Duration leaseLength, RequestStatusChecker requestStatusChecker, boolean checkThreadRunning) { + return acquireWithTransactionManager( + resourceName, tld, leaseLength, requestStatusChecker, checkThreadRunning, tm()); + } + + /** + * Try to acquire a lock in SQL. Returns absent if it can't be acquired. + * + *

This method exists so that Beam pipelines can acquire / load / release locks. + */ + public static Optional acquireSql( + String resourceName, + @Nullable String tld, + Duration leaseLength, + RequestStatusChecker requestStatusChecker, + boolean checkThreadRunning) { + return acquireWithTransactionManager( + resourceName, tld, leaseLength, requestStatusChecker, checkThreadRunning, jpaTm()); + } + + /** Release the lock. */ + public void release() { + releaseWithTransactionManager(tm()); + } + + /** + * Release the lock from SQL. + * + *

This method exists so that Beam pipelines can acquire / load / release locks. + */ + public void releaseSql() { + releaseWithTransactionManager(jpaTm()); + } + + /** Try to acquire a lock. Returns absent if it can't be acquired. */ + private static Optional acquireWithTransactionManager( + String resourceName, + @Nullable String tld, + Duration leaseLength, + RequestStatusChecker requestStatusChecker, + boolean checkThreadRunning, + TransactionManager transactionManager) { String scope = (tld != null) ? tld : GLOBAL; String lockId = makeLockId(resourceName, scope); // It's important to use transactNew rather than transact, because a Lock can be used to control @@ -219,11 +262,12 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri // must be definitively acquired before it is used, even when called inside another transaction. Supplier lockAcquirer = () -> { - DateTime now = tm().getTransactionTime(); + DateTime now = transactionManager.getTransactionTime(); // Checking if an unexpired lock still exists - if so, the lock can't be acquired. Lock lock = - tm().loadByKeyIfPresent( + transactionManager + .loadByKeyIfPresent( VKey.create( Lock.class, new LockId(resourceName, scope), @@ -249,13 +293,15 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri create(resourceName, scope, requestStatusChecker.getLogId(), now, leaseLength); // Locks are not parented under an EntityGroupRoot (so as to avoid write // contention) and don't need to be backed up. - tm().putIgnoringReadOnly(newLock); + transactionManager.putIgnoringReadOnly(newLock); return AcquireResult.create(now, lock, newLock, lockState); }; // In ofy, backup is determined per-action, but in SQL it's determined per-transaction AcquireResult acquireResult = - tm().isOfy() ? tm().transactNew(lockAcquirer) : jpaTm().transactWithoutBackup(lockAcquirer); + transactionManager.isOfy() + ? transactionManager.transactNew(lockAcquirer) + : ((JpaTransactionManager) transactionManager).transactWithoutBackup(lockAcquirer); logAcquireResult(acquireResult); lockMetrics.recordAcquire(resourceName, scope, acquireResult.lockState()); @@ -263,7 +309,7 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri } /** Release the lock. */ - public void release() { + private void releaseWithTransactionManager(TransactionManager transactionManager) { // Just use the default clock because we aren't actually doing anything that will use the clock. Supplier lockReleaser = () -> { @@ -274,15 +320,17 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri VKey key = VKey.create( Lock.class, new LockId(resourceName, tld), Key.create(Lock.class, lockId)); - Lock loadedLock = tm().loadByKeyIfPresent(key).orElse(null); + Lock loadedLock = transactionManager.loadByKeyIfPresent(key).orElse(null); if (Lock.this.equals(loadedLock)) { // Use deleteIgnoringReadOnly() so that we don't create a commit log entry for deleting // the lock. logger.atInfo().log("Deleting lock: %s", lockId); - tm().deleteIgnoringReadOnly(key); + transactionManager.deleteIgnoringReadOnly(key); lockMetrics.recordRelease( - resourceName, tld, new Duration(acquiredTime, tm().getTransactionTime())); + resourceName, + tld, + new Duration(acquiredTime, transactionManager.getTransactionTime())); } else { logger.atSevere().log( "The lock we acquired was transferred to someone else before we" @@ -294,11 +342,12 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri } return null; }; + // In ofy, backup is determined per-action, but in SQL it's determined per-transaction - if (tm().isOfy()) { - tm().transact(lockReleaser); + if (transactionManager.isOfy()) { + transactionManager.transact(lockReleaser); } else { - jpaTm().transactWithoutBackup(lockReleaser); + ((JpaTransactionManager) transactionManager).transactWithoutBackup(lockReleaser); } } diff --git a/core/src/main/java/google/registry/request/lock/LockHandler.java b/core/src/main/java/google/registry/request/lock/LockHandler.java index 9f30fb9c5..2c15d5e04 100644 --- a/core/src/main/java/google/registry/request/lock/LockHandler.java +++ b/core/src/main/java/google/registry/request/lock/LockHandler.java @@ -22,8 +22,8 @@ import org.joda.time.Duration; /** * Code execution locked on some shared resource. * - *

Locks are either specific to a tld or global to the entire system, in which case a tld of - * null is used. + *

Locks are either specific to a tld or global to the entire system, in which case a tld of null + * is used. */ public interface LockHandler extends Serializable { @@ -42,4 +42,22 @@ public interface LockHandler extends Serializable { @Nullable String tld, Duration leaseLength, String... lockNames); + + /** + * Acquire one or more locks using only Cloud SQL and execute a Void {@link Callable}. + * + *

Runs on a thread that will be killed if it doesn't complete before the lease expires. + * + *

Note that locks are specific either to a given tld or to the entire system (in which case + * tld should be passed as null). + * + *

This method exists so that Beam pipelines can acquire / load / release locks. + * + * @return true if all locks were acquired and the callable was run; false otherwise. + */ + boolean executeWithSqlLocks( + final Callable callable, + @Nullable String tld, + Duration leaseLength, + String... lockNames); } diff --git a/core/src/main/java/google/registry/request/lock/LockHandlerImpl.java b/core/src/main/java/google/registry/request/lock/LockHandlerImpl.java index 79a979b49..12d0e2396 100644 --- a/core/src/main/java/google/registry/request/lock/LockHandlerImpl.java +++ b/core/src/main/java/google/registry/request/lock/LockHandlerImpl.java @@ -73,12 +73,42 @@ public class LockHandlerImpl implements LockHandler { @Nullable String tld, Duration leaseLength, String... lockNames) { + return executeWithLockAcquirer(callable, tld, leaseLength, this::acquire, lockNames); + } + + /** + * Acquire one or more locks using only Cloud SQL and execute a Void {@link Callable}. + * + *

Thread will be killed if it doesn't complete before the lease expires. + * + *

Note that locks are specific either to a given tld or to the entire system (in which case + * tld should be passed as null). + * + *

This method exists so that Beam pipelines can acquire / load / release locks. + * + * @return whether all locks were acquired and the callable was run. + */ + @Override + public boolean executeWithSqlLocks( + final Callable callable, + @Nullable String tld, + Duration leaseLength, + String... lockNames) { + return executeWithLockAcquirer(callable, tld, leaseLength, this::acquireSql, lockNames); + } + + private boolean executeWithLockAcquirer( + final Callable callable, + @Nullable String tld, + Duration leaseLength, + LockAcquirer lockAcquirer, + String... lockNames) { DateTime startTime = clock.nowUtc(); String sanitizedTld = Strings.emptyToNull(tld); try { return AppEngineTimeLimiter.create() .callWithTimeout( - new LockingCallable(callable, sanitizedTld, leaseLength, lockNames), + new LockingCallable(callable, lockAcquirer, sanitizedTld, leaseLength, lockNames), leaseLength.minus(LOCK_TIMEOUT_FUDGE).getMillis(), TimeUnit.MILLISECONDS); } catch (ExecutionException | UncheckedExecutionException e) { @@ -108,17 +138,32 @@ public class LockHandlerImpl implements LockHandler { return Lock.acquire(lockName, tld, leaseLength, requestStatusChecker, true); } + @VisibleForTesting + Optional acquireSql(String lockName, @Nullable String tld, Duration leaseLength) { + return Lock.acquireSql(lockName, tld, leaseLength, requestStatusChecker, true); + } + + private interface LockAcquirer { + Optional acquireLock(String lockName, @Nullable String tld, Duration leaseLength); + } + /** A {@link Callable} that acquires and releases a lock around a delegate {@link Callable}. */ - private class LockingCallable implements Callable { + private static class LockingCallable implements Callable { final Callable delegate; + final LockAcquirer lockAcquirer; @Nullable final String tld; final Duration leaseLength; final Set lockNames; LockingCallable( - Callable delegate, String tld, Duration leaseLength, String... lockNames) { + Callable delegate, + LockAcquirer lockAcquirer, + String tld, + Duration leaseLength, + String... lockNames) { checkArgument(leaseLength.isLongerThan(LOCK_TIMEOUT_FUDGE)); this.delegate = delegate; + this.lockAcquirer = lockAcquirer; this.tld = tld; this.leaseLength = leaseLength; // Make sure we join locks in a fixed (lexicographical) order to avoid deadlock. @@ -130,7 +175,7 @@ public class LockHandlerImpl implements LockHandler { Set acquiredLocks = new HashSet<>(); try { for (String lockName : lockNames) { - Optional lock = acquire(lockName, tld, leaseLength); + Optional lock = lockAcquirer.acquireLock(lockName, tld, leaseLength); if (!lock.isPresent()) { logger.atInfo().log("Couldn't acquire lock named: %s for TLD %s.", lockName, tld); return false; diff --git a/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java b/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java index 28544ea80..4f4c419f9 100644 --- a/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java +++ b/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java @@ -34,6 +34,7 @@ import static google.registry.util.DateTimeUtils.START_OF_TIME; import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; import static javax.servlet.http.HttpServletResponse.SC_OK; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -111,6 +112,7 @@ public class ReplayCommitLogsToSqlActionTest { DelegationSignerData.class, DomainBase.class, GracePeriod.class, + Lock.class, PremiumList.class, PremiumEntry.class, RegistrarContact.class, @@ -135,6 +137,7 @@ public class ReplayCommitLogsToSqlActionTest { @BeforeEach void beforeEach() { inject.setStaticField(Ofy.class, "clock", fakeClock); + lenient().when(requestStatusChecker.getLogId()).thenReturn("requestLogId"); action.gcsUtils = gcsUtils; action.response = response; action.requestStatusChecker = requestStatusChecker; @@ -464,9 +467,10 @@ public class ReplayCommitLogsToSqlActionTest { } }); runAndAssertSuccess(now.minusMinutes(1), 1, 1); - // jpaTm()::putIgnoringReadOnly should only have been called with the checkpoint + // jpaTm()::putIgnoringReadOnly should only have been called with the checkpoint and the lock verify(spy, times(2)).putIgnoringReadOnly(any(SqlReplayCheckpoint.class)); - verify(spy, times(2)).putIgnoringReadOnly(any()); + verify(spy).putIgnoringReadOnly(any(Lock.class)); + verify(spy, times(3)).putIgnoringReadOnly(any()); } @Test @@ -506,7 +510,7 @@ public class ReplayCommitLogsToSqlActionTest { @Test void testFailure_cannotAcquireLock() { Truth8.assertThat( - Lock.acquire( + Lock.acquireSql( ReplayCommitLogsToSqlAction.class.getSimpleName(), null, Duration.standardHours(1), diff --git a/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java b/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java index a34029988..3b6366a69 100644 --- a/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java +++ b/core/src/test/java/google/registry/model/replay/ReplicateToDatastoreActionTest.java @@ -294,7 +294,7 @@ public class ReplicateToDatastoreActionTest { RequestStatusChecker requestStatusChecker = mock(RequestStatusChecker.class); when(requestStatusChecker.getLogId()).thenReturn("logId"); Truth8.assertThat( - Lock.acquire( + Lock.acquireSql( ReplicateToDatastoreAction.class.getSimpleName(), null, Duration.standardHours(1), diff --git a/core/src/test/java/google/registry/model/server/LockTest.java b/core/src/test/java/google/registry/model/server/LockTest.java index 12526970c..bd0aeaf24 100644 --- a/core/src/test/java/google/registry/model/server/LockTest.java +++ b/core/src/test/java/google/registry/model/server/LockTest.java @@ -20,6 +20,7 @@ import static google.registry.model.server.Lock.LockState.FREE; import static google.registry.model.server.Lock.LockState.IN_USE; import static google.registry.model.server.Lock.LockState.OWNER_DIED; import static google.registry.model.server.Lock.LockState.TIMED_OUT; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -28,8 +29,10 @@ import static org.mockito.Mockito.when; import google.registry.model.EntityTestCase; import google.registry.model.server.Lock.LockState; +import google.registry.testing.DatabaseHelper; import google.registry.testing.DualDatabaseTest; import google.registry.testing.TestOfyAndSql; +import google.registry.testing.TestOfyOnly; import google.registry.util.RequestStatusChecker; import java.util.Optional; import org.joda.time.Duration; @@ -132,6 +135,19 @@ public class LockTest extends EntityTestCase { assertThat(acquire("b", ONE_DAY, IN_USE)).isEmpty(); } + @TestOfyOnly + void testSqlLock_inOfyMode() { + Lock.lockMetrics = origLockMetrics; + Optional lock = Lock.acquireSql(RESOURCE_NAME, null, ONE_DAY, requestStatusChecker, true); + assertThat(lock).isPresent(); + assertThat(DatabaseHelper.loadAllOf(Lock.class)).isEmpty(); + assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Lock.class))).containsExactly(lock.get()); + + lock.get().releaseSql(); + assertThat(DatabaseHelper.loadAllOf(Lock.class)).isEmpty(); + assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Lock.class))).isEmpty(); + } + @TestOfyAndSql void testFailure_emptyResourceName() { IllegalArgumentException thrown = diff --git a/core/src/test/java/google/registry/request/lock/LockHandlerImplTest.java b/core/src/test/java/google/registry/request/lock/LockHandlerImplTest.java index d2c68ba43..ada2dacc1 100644 --- a/core/src/test/java/google/registry/request/lock/LockHandlerImplTest.java +++ b/core/src/test/java/google/registry/request/lock/LockHandlerImplTest.java @@ -40,7 +40,9 @@ final class LockHandlerImplTest { private final FakeClock clock = new FakeClock(DateTime.parse("2001-08-29T12:20:00Z")); - @RegisterExtension final AppEngineExtension appEngine = AppEngineExtension.builder().build(); + @RegisterExtension + final AppEngineExtension appEngine = + AppEngineExtension.builder().withDatastoreAndCloudSql().build(); private static class CountingCallable implements Callable { int numCalled = 0; @@ -69,18 +71,13 @@ final class LockHandlerImplTest { } private boolean executeWithLocks(Callable callable, final @Nullable Lock acquiredLock) { - LockHandlerImpl lockHandler = new LockHandlerImpl(new RequestStatusCheckerImpl(), clock) { - private static final long serialVersionUID = 0L; - @Override - Optional acquire(String resourceName, String tld, Duration leaseLength) { - assertThat(resourceName).isEqualTo("resourceName"); - assertThat(tld).isEqualTo("tld"); - assertThat(leaseLength).isEqualTo(ONE_DAY); - return Optional.ofNullable(acquiredLock); - } - }; + return createTestLockHandler(acquiredLock) + .executeWithLocks(callable, "tld", ONE_DAY, "resourceName"); + } - return lockHandler.executeWithLocks(callable, "tld", ONE_DAY, "resourceName"); + private boolean executeWithSqlLocks(Callable callable, final @Nullable Lock acquiredLock) { + return createTestLockHandler(acquiredLock) + .executeWithSqlLocks(callable, "tld", ONE_DAY, "resourceName"); } @Test @@ -92,6 +89,15 @@ final class LockHandlerImplTest { verify(lock, times(1)).release(); } + @Test + void testSqlLockSucceeds() { + Lock lock = mock(Lock.class); + CountingCallable countingCallable = new CountingCallable(); + assertThat(executeWithSqlLocks(countingCallable, lock)).isTrue(); + assertThat(countingCallable.numCalled).isEqualTo(1); + verify(lock, times(1)).release(); + } + @Test void testLockSucceeds_uncheckedException() { Lock lock = mock(Lock.class); @@ -140,4 +146,23 @@ final class LockHandlerImplTest { assertThat(executeWithLocks(countingCallable, lock)).isFalse(); assertThat(countingCallable.numCalled).isEqualTo(0); } + + private LockHandler createTestLockHandler(@Nullable Lock acquiredLock) { + return new LockHandlerImpl(new RequestStatusCheckerImpl(), clock) { + private static final long serialVersionUID = 0L; + + @Override + Optional acquire(String resourceName, String tld, Duration leaseLength) { + assertThat(resourceName).isEqualTo("resourceName"); + assertThat(tld).isEqualTo("tld"); + assertThat(leaseLength).isEqualTo(ONE_DAY); + return Optional.ofNullable(acquiredLock); + } + + @Override + Optional acquireSql(String resourceName, String tld, Duration leaseLength) { + return acquire(resourceName, tld, leaseLength); + } + }; + } } diff --git a/core/src/test/java/google/registry/testing/FakeLockHandler.java b/core/src/test/java/google/registry/testing/FakeLockHandler.java index cf42050d6..eb024921c 100644 --- a/core/src/test/java/google/registry/testing/FakeLockHandler.java +++ b/core/src/test/java/google/registry/testing/FakeLockHandler.java @@ -26,11 +26,11 @@ public class FakeLockHandler implements LockHandler { private static final long serialVersionUID = 6437880915118738492L; - boolean lockSucceeds; + private final boolean lockSucceeds; /** * @param lockSucceeds if true - the lock acquisition will succeed and the callable will be - * called. If false, lock acquisition will fail and the caller isn't called. + * called. If false, lock acquisition will fail and the caller isn't called. */ public FakeLockHandler(boolean lockSucceeds) { this.lockSucceeds = lockSucceeds; @@ -38,10 +38,17 @@ public class FakeLockHandler implements LockHandler { @Override public boolean executeWithLocks( - final Callable callable, - @Nullable String tld, - Duration leaseLength, - String... lockNames) { + Callable callable, @Nullable String tld, Duration leaseLength, String... lockNames) { + return execute(callable); + } + + @Override + public boolean executeWithSqlLocks( + Callable callable, @Nullable String tld, Duration leaseLength, String... lockNames) { + return execute(callable); + } + + private boolean execute(Callable callable) { if (!lockSucceeds) { return false; }