diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index 0d9b65d75..f0c79321d 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -219,8 +219,7 @@ public final class RegistryJpaIO { () -> { query.stream().map(resultMapper::apply).forEach(outputReceiver::output); return null; - }, - null); + }); } } } diff --git a/core/src/main/java/google/registry/bsa/BsaTransactions.java b/core/src/main/java/google/registry/bsa/BsaTransactions.java index 566a9434f..ebeb9ea34 100644 --- a/core/src/main/java/google/registry/bsa/BsaTransactions.java +++ b/core/src/main/java/google/registry/bsa/BsaTransactions.java @@ -37,12 +37,12 @@ public final class BsaTransactions { @CanIgnoreReturnValue public static T bsaTransact(Callable work) { verify(!isInTransaction(), "May only be used for top-level transactions."); - return tm().transact(work, TRANSACTION_REPEATABLE_READ); + return tm().transact(TRANSACTION_REPEATABLE_READ, work); } public static void bsaTransact(ThrowingRunnable work) { verify(!isInTransaction(), "May only be used for top-level transactions."); - tm().transact(work, TRANSACTION_REPEATABLE_READ); + tm().transact(TRANSACTION_REPEATABLE_READ, work); } @CanIgnoreReturnValue diff --git a/core/src/main/java/google/registry/bsa/persistence/LabelDiffUpdates.java b/core/src/main/java/google/registry/bsa/persistence/LabelDiffUpdates.java index 8d36c140f..e2c33f54a 100644 --- a/core/src/main/java/google/registry/bsa/persistence/LabelDiffUpdates.java +++ b/core/src/main/java/google/registry/bsa/persistence/LabelDiffUpdates.java @@ -67,6 +67,7 @@ public final class LabelDiffUpdates { labels.stream().collect(groupingBy(BlockLabel::labelType, toImmutableList()))); tm().transact( + TRANSACTION_REPEATABLE_READ, () -> { for (Map.Entry> entry : labelsByType.entrySet()) { @@ -128,8 +129,7 @@ public final class LabelDiffUpdates { break; } } - }, - TRANSACTION_REPEATABLE_READ); + }); logger.atInfo().log("Processed %s of labels.", labels.size()); return nonBlockedDomains.build(); } diff --git a/core/src/main/java/google/registry/flows/FlowRunner.java b/core/src/main/java/google/registry/flows/FlowRunner.java index d1ea079e6..1872c384a 100644 --- a/core/src/main/java/google/registry/flows/FlowRunner.java +++ b/core/src/main/java/google/registry/flows/FlowRunner.java @@ -81,6 +81,7 @@ public class FlowRunner { // TODO(mcilwain/weiminyu): Use transactReadOnly() here for TransactionalFlow and transact() // for MutatingFlow. return tm().transact( + isolationLevelOverride.orElse(null), () -> { try { EppOutput output = EppOutput.create(flowProvider.get().run()); @@ -96,8 +97,7 @@ public class FlowRunner { } catch (EppException e) { throw new EppRuntimeException(e); } - }, - isolationLevelOverride.orElse(null)); + }); } catch (DryRunException e) { return e.output; } catch (EppRuntimeException e) { diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java index 82806444b..49ad62d6b 100644 --- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java +++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java @@ -175,14 +175,14 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { public T reTransact(Callable work) { // This prevents inner transaction from retrying, thus avoiding a cascade retry effect. if (inTransaction()) { - return transactNoRetry(work, null); + return transactNoRetry(null, work); } return retrier.callWithRetry( - () -> transactNoRetry(work, null), JpaRetries::isFailedTxnRetriable); + () -> transactNoRetry(null, work), JpaRetries::isFailedTxnRetriable); } @Override - public T transact(Callable work, TransactionIsolationLevel isolationLevel) { + public T transact(TransactionIsolationLevel isolationLevel, Callable work) { if (inTransaction()) { if (!getHibernateAllowNestedTransactions()) { throw new IllegalStateException(NESTED_TRANSACTION_MESSAGE); @@ -192,20 +192,25 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { logger.atWarning().withStackTrace(StackSize.MEDIUM).log(NESTED_TRANSACTION_MESSAGE); } // This prevents inner transaction from retrying, thus avoiding a cascade retry effect. - return transactNoRetry(work, isolationLevel); + return transactNoRetry(isolationLevel, work); } return retrier.callWithRetry( - () -> transactNoRetry(work, isolationLevel), JpaRetries::isFailedTxnRetriable); + () -> transactNoRetry(isolationLevel, work), JpaRetries::isFailedTxnRetriable); } @Override public T transact(Callable work) { - return transact(work, null); + return transact(null, work); + } + + @Override + public T transactNoRetry(Callable work) { + return transactNoRetry(null, work); } @Override public T transactNoRetry( - Callable work, @Nullable TransactionIsolationLevel isolationLevel) { + @Nullable TransactionIsolationLevel isolationLevel, Callable work) { if (inTransaction()) { // This check will no longer be necessary when the transact() method always throws // inside a nested transaction, as the only way to pass a non-null isolation level @@ -266,18 +271,18 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { } @Override - public void transact(ThrowingRunnable work, TransactionIsolationLevel isolationLevel) { + public void transact(TransactionIsolationLevel isolationLevel, ThrowingRunnable work) { transact( + isolationLevel, () -> { work.run(); return null; - }, - isolationLevel); + }); } @Override public void transact(ThrowingRunnable work) { - transact(work, null); + transact(null, work); } @Override diff --git a/core/src/main/java/google/registry/persistence/transaction/TransactionManager.java b/core/src/main/java/google/registry/persistence/transaction/TransactionManager.java index d96aba9b8..adae5c7fe 100644 --- a/core/src/main/java/google/registry/persistence/transaction/TransactionManager.java +++ b/core/src/main/java/google/registry/persistence/transaction/TransactionManager.java @@ -61,7 +61,19 @@ public interface TransactionManager { * Executes the work in a transaction at the given {@link TransactionIsolationLevel} and returns * the result. */ - T transact(Callable work, TransactionIsolationLevel isolationLevel); + T transact(TransactionIsolationLevel isolationLevel, Callable work); + + /** + * Executes the work in a transaction and returns the result, without retrying upon retryable + * exceptions. + * + *

This method should only be used when the transaction contains side effects that are not + * rolled back by the transaction manager, for example in {@link + * google.registry.beam.common.RegistryJpaIO} where the results from a query are streamed to the + * next transformation inside a transaction, as the result stream has to materialize to a list + * outside a transaction and doing so would greatly affect the parallelism of the pipeline. + */ + T transactNoRetry(Callable work); /** * Executes the work in a transaction at the given {@link TransactionIsolationLevel} and returns @@ -73,7 +85,7 @@ public interface TransactionManager { * next transformation inside a transaction, as the result stream has to materialize to a list * outside a transaction and doing so would greatly affect the parallelism of the pipeline. */ - T transactNoRetry(Callable work, TransactionIsolationLevel isolationLevel); + T transactNoRetry(TransactionIsolationLevel isolationLevel, Callable work); /** * Executes the work in a (potentially wrapped) transaction and returns the result. @@ -95,7 +107,7 @@ public interface TransactionManager { void transact(ThrowingRunnable work); /** Executes the work in a transaction at the given {@link TransactionIsolationLevel}. */ - void transact(ThrowingRunnable work, TransactionIsolationLevel isolationLevel); + void transact(TransactionIsolationLevel isolationLevel, ThrowingRunnable work); /** * Executes the work in a (potentially wrapped) transaction and returns the result. diff --git a/core/src/main/java/google/registry/tools/server/RefreshDnsForAllDomainsAction.java b/core/src/main/java/google/registry/tools/server/RefreshDnsForAllDomainsAction.java index 7512c31a3..b43e296fa 100644 --- a/core/src/main/java/google/registry/tools/server/RefreshDnsForAllDomainsAction.java +++ b/core/src/main/java/google/registry/tools/server/RefreshDnsForAllDomainsAction.java @@ -97,7 +97,7 @@ public class RefreshDnsForAllDomainsAction implements Runnable { public void run() { assertTldsExist(tlds); checkArgument(batchSize > 0, "Must specify a positive number for batch size"); - Duration smear = tm().transact(this::calculateSmear, TRANSACTION_REPEATABLE_READ); + Duration smear = tm().transact(TRANSACTION_REPEATABLE_READ, this::calculateSmear); ImmutableList domainsBatch; @Nullable String lastInPreviousBatch = null; @@ -105,7 +105,7 @@ public class RefreshDnsForAllDomainsAction implements Runnable { Optional lastInPreviousBatchOpt = Optional.ofNullable(lastInPreviousBatch); domainsBatch = tm().transact( - () -> refreshBatch(lastInPreviousBatchOpt, smear), TRANSACTION_REPEATABLE_READ); + TRANSACTION_REPEATABLE_READ, () -> refreshBatch(lastInPreviousBatchOpt, smear)); lastInPreviousBatch = domainsBatch.isEmpty() ? null : getLast(domainsBatch); } while (domainsBatch.size() == batchSize); } diff --git a/core/src/test/java/google/registry/persistence/transaction/JpaTransactionManagerImplTest.java b/core/src/test/java/google/registry/persistence/transaction/JpaTransactionManagerImplTest.java index 4791bb0ce..17ea806fa 100644 --- a/core/src/test/java/google/registry/persistence/transaction/JpaTransactionManagerImplTest.java +++ b/core/src/test/java/google/registry/persistence/transaction/JpaTransactionManagerImplTest.java @@ -150,15 +150,15 @@ class JpaTransactionManagerImplTest { void transact_setIsolationLevel() { // If not specified, run at the default isolation level. tm().transact( - () -> assertTransactionIsolationLevel(tm().getDefaultTransactionIsolationLevel()), - null); + null, + () -> assertTransactionIsolationLevel(tm().getDefaultTransactionIsolationLevel())); tm().transact( - () -> assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED), - TRANSACTION_READ_UNCOMMITTED); + TRANSACTION_READ_UNCOMMITTED, + () -> assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED)); // Make sure that we can start a new transaction on the same thread at a different level. tm().transact( - () -> assertTransactionIsolationLevel(TRANSACTION_REPEATABLE_READ), - TRANSACTION_REPEATABLE_READ); + TRANSACTION_REPEATABLE_READ, + () -> assertTransactionIsolationLevel(TRANSACTION_REPEATABLE_READ)); } @Test @@ -188,12 +188,12 @@ class JpaTransactionManagerImplTest { }); // reTransact() respects enclosing transaction's isolation level. tm().transact( + TRANSACTION_READ_UNCOMMITTED, () -> { assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED); tm().reTransact( () -> assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED)); - }, - TRANSACTION_READ_UNCOMMITTED); + }); } } @@ -219,7 +219,7 @@ class JpaTransactionManagerImplTest { () -> { assertTransactionIsolationLevel( tm().getDefaultTransactionIsolationLevel()); - tm().transact(() -> null, TRANSACTION_READ_COMMITTED); + tm().transact(TRANSACTION_READ_COMMITTED, () -> null); })); assertThat(thrown).hasMessageThat().contains("cannot be specified"); // reTransact() allowed in nested transactions. @@ -233,12 +233,12 @@ class JpaTransactionManagerImplTest { }); // reTransact() respects enclosing transaction's isolation level. tm().transact( + TRANSACTION_READ_UNCOMMITTED, () -> { assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED); tm().reTransact( () -> assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED)); - }, - TRANSACTION_READ_UNCOMMITTED); + }); } } @@ -308,18 +308,18 @@ class JpaTransactionManagerImplTest { tm.transact( () -> { tm.transactNoRetry( + null, () -> { assertTransactionIsolationLevel(tm.getDefaultTransactionIsolationLevel()); return null; - }, - null); + }); }); // Calling transactNoRetry() with an isolation level override inside a transaction is not // allowed. IllegalStateException thrown = assertThrows( IllegalStateException.class, - () -> tm.transact(() -> tm.transactNoRetry(() -> null, TRANSACTION_READ_UNCOMMITTED))); + () -> tm.transact(() -> tm.transactNoRetry(TRANSACTION_READ_UNCOMMITTED, () -> null))); assertThat(thrown).hasMessageThat().contains("cannot be specified"); } @@ -328,19 +328,19 @@ class JpaTransactionManagerImplTest { JpaTransactionManagerImpl spyJpaTm = spy((JpaTransactionManagerImpl) tm()); doThrow(OptimisticLockException.class).when(spyJpaTm).delete(any(VKey.class)); spyJpaTm.transactNoRetry( + null, () -> { spyJpaTm.insert(theEntity); return null; - }, - null); + }); Executable transaction = () -> spyJpaTm.transactNoRetry( + null, () -> { spyJpaTm.delete(theEntityKey); return null; - }, - null); + }); assertThrows(OptimisticLockException.class, transaction); verify(spyJpaTm, times(1)).delete(theEntityKey); assertThrows(OptimisticLockException.class, transaction);