Make transaction isolation level the first argument to transact() (#2329)

This makes the callsites look neater, as the work to execute itself is often a
many line lambda, whereas the transaction isolation level is not more than a
couple dozen characters.
This commit is contained in:
Ben McIlwain 2024-02-16 19:07:48 -05:00 committed by GitHub
parent 08bcf579a5
commit 7a301edab7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 58 additions and 42 deletions

View file

@ -219,8 +219,7 @@ public final class RegistryJpaIO {
() -> { () -> {
query.stream().map(resultMapper::apply).forEach(outputReceiver::output); query.stream().map(resultMapper::apply).forEach(outputReceiver::output);
return null; return null;
}, });
null);
} }
} }
} }

View file

@ -37,12 +37,12 @@ public final class BsaTransactions {
@CanIgnoreReturnValue @CanIgnoreReturnValue
public static <T> T bsaTransact(Callable<T> work) { public static <T> T bsaTransact(Callable<T> work) {
verify(!isInTransaction(), "May only be used for top-level transactions."); verify(!isInTransaction(), "May only be used for top-level transactions.");
return tm().transact(work, TRANSACTION_REPEATABLE_READ); return tm().transact(TRANSACTION_REPEATABLE_READ, work);
} }
public static void bsaTransact(ThrowingRunnable work) { public static void bsaTransact(ThrowingRunnable work) {
verify(!isInTransaction(), "May only be used for top-level transactions."); verify(!isInTransaction(), "May only be used for top-level transactions.");
tm().transact(work, TRANSACTION_REPEATABLE_READ); tm().transact(TRANSACTION_REPEATABLE_READ, work);
} }
@CanIgnoreReturnValue @CanIgnoreReturnValue

View file

@ -67,6 +67,7 @@ public final class LabelDiffUpdates {
labels.stream().collect(groupingBy(BlockLabel::labelType, toImmutableList()))); labels.stream().collect(groupingBy(BlockLabel::labelType, toImmutableList())));
tm().transact( tm().transact(
TRANSACTION_REPEATABLE_READ,
() -> { () -> {
for (Map.Entry<LabelType, ImmutableList<BlockLabel>> entry : for (Map.Entry<LabelType, ImmutableList<BlockLabel>> entry :
labelsByType.entrySet()) { labelsByType.entrySet()) {
@ -128,8 +129,7 @@ public final class LabelDiffUpdates {
break; break;
} }
} }
}, });
TRANSACTION_REPEATABLE_READ);
logger.atInfo().log("Processed %s of labels.", labels.size()); logger.atInfo().log("Processed %s of labels.", labels.size());
return nonBlockedDomains.build(); return nonBlockedDomains.build();
} }

View file

@ -81,6 +81,7 @@ public class FlowRunner {
// TODO(mcilwain/weiminyu): Use transactReadOnly() here for TransactionalFlow and transact() // TODO(mcilwain/weiminyu): Use transactReadOnly() here for TransactionalFlow and transact()
// for MutatingFlow. // for MutatingFlow.
return tm().transact( return tm().transact(
isolationLevelOverride.orElse(null),
() -> { () -> {
try { try {
EppOutput output = EppOutput.create(flowProvider.get().run()); EppOutput output = EppOutput.create(flowProvider.get().run());
@ -96,8 +97,7 @@ public class FlowRunner {
} catch (EppException e) { } catch (EppException e) {
throw new EppRuntimeException(e); throw new EppRuntimeException(e);
} }
}, });
isolationLevelOverride.orElse(null));
} catch (DryRunException e) { } catch (DryRunException e) {
return e.output; return e.output;
} catch (EppRuntimeException e) { } catch (EppRuntimeException e) {

View file

@ -175,14 +175,14 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
public <T> T reTransact(Callable<T> work) { public <T> T reTransact(Callable<T> work) {
// This prevents inner transaction from retrying, thus avoiding a cascade retry effect. // This prevents inner transaction from retrying, thus avoiding a cascade retry effect.
if (inTransaction()) { if (inTransaction()) {
return transactNoRetry(work, null); return transactNoRetry(null, work);
} }
return retrier.callWithRetry( return retrier.callWithRetry(
() -> transactNoRetry(work, null), JpaRetries::isFailedTxnRetriable); () -> transactNoRetry(null, work), JpaRetries::isFailedTxnRetriable);
} }
@Override @Override
public <T> T transact(Callable<T> work, TransactionIsolationLevel isolationLevel) { public <T> T transact(TransactionIsolationLevel isolationLevel, Callable<T> work) {
if (inTransaction()) { if (inTransaction()) {
if (!getHibernateAllowNestedTransactions()) { if (!getHibernateAllowNestedTransactions()) {
throw new IllegalStateException(NESTED_TRANSACTION_MESSAGE); throw new IllegalStateException(NESTED_TRANSACTION_MESSAGE);
@ -192,20 +192,25 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
logger.atWarning().withStackTrace(StackSize.MEDIUM).log(NESTED_TRANSACTION_MESSAGE); logger.atWarning().withStackTrace(StackSize.MEDIUM).log(NESTED_TRANSACTION_MESSAGE);
} }
// This prevents inner transaction from retrying, thus avoiding a cascade retry effect. // This prevents inner transaction from retrying, thus avoiding a cascade retry effect.
return transactNoRetry(work, isolationLevel); return transactNoRetry(isolationLevel, work);
} }
return retrier.callWithRetry( return retrier.callWithRetry(
() -> transactNoRetry(work, isolationLevel), JpaRetries::isFailedTxnRetriable); () -> transactNoRetry(isolationLevel, work), JpaRetries::isFailedTxnRetriable);
} }
@Override @Override
public <T> T transact(Callable<T> work) { public <T> T transact(Callable<T> work) {
return transact(work, null); return transact(null, work);
}
@Override
public <T> T transactNoRetry(Callable<T> work) {
return transactNoRetry(null, work);
} }
@Override @Override
public <T> T transactNoRetry( public <T> T transactNoRetry(
Callable<T> work, @Nullable TransactionIsolationLevel isolationLevel) { @Nullable TransactionIsolationLevel isolationLevel, Callable<T> work) {
if (inTransaction()) { if (inTransaction()) {
// This check will no longer be necessary when the transact() method always throws // This check will no longer be necessary when the transact() method always throws
// inside a nested transaction, as the only way to pass a non-null isolation level // inside a nested transaction, as the only way to pass a non-null isolation level
@ -266,18 +271,18 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
} }
@Override @Override
public void transact(ThrowingRunnable work, TransactionIsolationLevel isolationLevel) { public void transact(TransactionIsolationLevel isolationLevel, ThrowingRunnable work) {
transact( transact(
isolationLevel,
() -> { () -> {
work.run(); work.run();
return null; return null;
}, });
isolationLevel);
} }
@Override @Override
public void transact(ThrowingRunnable work) { public void transact(ThrowingRunnable work) {
transact(work, null); transact(null, work);
} }
@Override @Override

View file

@ -61,7 +61,19 @@ public interface TransactionManager {
* Executes the work in a transaction at the given {@link TransactionIsolationLevel} and returns * Executes the work in a transaction at the given {@link TransactionIsolationLevel} and returns
* the result. * the result.
*/ */
<T> T transact(Callable<T> work, TransactionIsolationLevel isolationLevel); <T> T transact(TransactionIsolationLevel isolationLevel, Callable<T> work);
/**
* Executes the work in a transaction and returns the result, without retrying upon retryable
* exceptions.
*
* <p>This method should only be used when the transaction contains side effects that are not
* rolled back by the transaction manager, for example in {@link
* google.registry.beam.common.RegistryJpaIO} where the results from a query are streamed to the
* next transformation inside a transaction, as the result stream has to materialize to a list
* outside a transaction and doing so would greatly affect the parallelism of the pipeline.
*/
<T> T transactNoRetry(Callable<T> work);
/** /**
* Executes the work in a transaction at the given {@link TransactionIsolationLevel} and returns * Executes the work in a transaction at the given {@link TransactionIsolationLevel} and returns
@ -73,7 +85,7 @@ public interface TransactionManager {
* next transformation inside a transaction, as the result stream has to materialize to a list * next transformation inside a transaction, as the result stream has to materialize to a list
* outside a transaction and doing so would greatly affect the parallelism of the pipeline. * outside a transaction and doing so would greatly affect the parallelism of the pipeline.
*/ */
<T> T transactNoRetry(Callable<T> work, TransactionIsolationLevel isolationLevel); <T> T transactNoRetry(TransactionIsolationLevel isolationLevel, Callable<T> work);
/** /**
* Executes the work in a (potentially wrapped) transaction and returns the result. * Executes the work in a (potentially wrapped) transaction and returns the result.
@ -95,7 +107,7 @@ public interface TransactionManager {
void transact(ThrowingRunnable work); void transact(ThrowingRunnable work);
/** Executes the work in a transaction at the given {@link TransactionIsolationLevel}. */ /** Executes the work in a transaction at the given {@link TransactionIsolationLevel}. */
void transact(ThrowingRunnable work, TransactionIsolationLevel isolationLevel); void transact(TransactionIsolationLevel isolationLevel, ThrowingRunnable work);
/** /**
* Executes the work in a (potentially wrapped) transaction and returns the result. * Executes the work in a (potentially wrapped) transaction and returns the result.

View file

@ -97,7 +97,7 @@ public class RefreshDnsForAllDomainsAction implements Runnable {
public void run() { public void run() {
assertTldsExist(tlds); assertTldsExist(tlds);
checkArgument(batchSize > 0, "Must specify a positive number for batch size"); checkArgument(batchSize > 0, "Must specify a positive number for batch size");
Duration smear = tm().transact(this::calculateSmear, TRANSACTION_REPEATABLE_READ); Duration smear = tm().transact(TRANSACTION_REPEATABLE_READ, this::calculateSmear);
ImmutableList<String> domainsBatch; ImmutableList<String> domainsBatch;
@Nullable String lastInPreviousBatch = null; @Nullable String lastInPreviousBatch = null;
@ -105,7 +105,7 @@ public class RefreshDnsForAllDomainsAction implements Runnable {
Optional<String> lastInPreviousBatchOpt = Optional.ofNullable(lastInPreviousBatch); Optional<String> lastInPreviousBatchOpt = Optional.ofNullable(lastInPreviousBatch);
domainsBatch = domainsBatch =
tm().transact( tm().transact(
() -> refreshBatch(lastInPreviousBatchOpt, smear), TRANSACTION_REPEATABLE_READ); TRANSACTION_REPEATABLE_READ, () -> refreshBatch(lastInPreviousBatchOpt, smear));
lastInPreviousBatch = domainsBatch.isEmpty() ? null : getLast(domainsBatch); lastInPreviousBatch = domainsBatch.isEmpty() ? null : getLast(domainsBatch);
} while (domainsBatch.size() == batchSize); } while (domainsBatch.size() == batchSize);
} }

View file

@ -150,15 +150,15 @@ class JpaTransactionManagerImplTest {
void transact_setIsolationLevel() { void transact_setIsolationLevel() {
// If not specified, run at the default isolation level. // If not specified, run at the default isolation level.
tm().transact( tm().transact(
() -> assertTransactionIsolationLevel(tm().getDefaultTransactionIsolationLevel()), null,
null); () -> assertTransactionIsolationLevel(tm().getDefaultTransactionIsolationLevel()));
tm().transact( tm().transact(
() -> assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED), TRANSACTION_READ_UNCOMMITTED,
TRANSACTION_READ_UNCOMMITTED); () -> assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED));
// Make sure that we can start a new transaction on the same thread at a different level. // Make sure that we can start a new transaction on the same thread at a different level.
tm().transact( tm().transact(
() -> assertTransactionIsolationLevel(TRANSACTION_REPEATABLE_READ), TRANSACTION_REPEATABLE_READ,
TRANSACTION_REPEATABLE_READ); () -> assertTransactionIsolationLevel(TRANSACTION_REPEATABLE_READ));
} }
@Test @Test
@ -188,12 +188,12 @@ class JpaTransactionManagerImplTest {
}); });
// reTransact() respects enclosing transaction's isolation level. // reTransact() respects enclosing transaction's isolation level.
tm().transact( tm().transact(
TRANSACTION_READ_UNCOMMITTED,
() -> { () -> {
assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED); assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED);
tm().reTransact( tm().reTransact(
() -> assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED)); () -> assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED));
}, });
TRANSACTION_READ_UNCOMMITTED);
} }
} }
@ -219,7 +219,7 @@ class JpaTransactionManagerImplTest {
() -> { () -> {
assertTransactionIsolationLevel( assertTransactionIsolationLevel(
tm().getDefaultTransactionIsolationLevel()); tm().getDefaultTransactionIsolationLevel());
tm().transact(() -> null, TRANSACTION_READ_COMMITTED); tm().transact(TRANSACTION_READ_COMMITTED, () -> null);
})); }));
assertThat(thrown).hasMessageThat().contains("cannot be specified"); assertThat(thrown).hasMessageThat().contains("cannot be specified");
// reTransact() allowed in nested transactions. // reTransact() allowed in nested transactions.
@ -233,12 +233,12 @@ class JpaTransactionManagerImplTest {
}); });
// reTransact() respects enclosing transaction's isolation level. // reTransact() respects enclosing transaction's isolation level.
tm().transact( tm().transact(
TRANSACTION_READ_UNCOMMITTED,
() -> { () -> {
assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED); assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED);
tm().reTransact( tm().reTransact(
() -> assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED)); () -> assertTransactionIsolationLevel(TRANSACTION_READ_UNCOMMITTED));
}, });
TRANSACTION_READ_UNCOMMITTED);
} }
} }
@ -308,18 +308,18 @@ class JpaTransactionManagerImplTest {
tm.transact( tm.transact(
() -> { () -> {
tm.transactNoRetry( tm.transactNoRetry(
null,
() -> { () -> {
assertTransactionIsolationLevel(tm.getDefaultTransactionIsolationLevel()); assertTransactionIsolationLevel(tm.getDefaultTransactionIsolationLevel());
return null; return null;
}, });
null);
}); });
// Calling transactNoRetry() with an isolation level override inside a transaction is not // Calling transactNoRetry() with an isolation level override inside a transaction is not
// allowed. // allowed.
IllegalStateException thrown = IllegalStateException thrown =
assertThrows( assertThrows(
IllegalStateException.class, IllegalStateException.class,
() -> tm.transact(() -> tm.transactNoRetry(() -> null, TRANSACTION_READ_UNCOMMITTED))); () -> tm.transact(() -> tm.transactNoRetry(TRANSACTION_READ_UNCOMMITTED, () -> null)));
assertThat(thrown).hasMessageThat().contains("cannot be specified"); assertThat(thrown).hasMessageThat().contains("cannot be specified");
} }
@ -328,19 +328,19 @@ class JpaTransactionManagerImplTest {
JpaTransactionManagerImpl spyJpaTm = spy((JpaTransactionManagerImpl) tm()); JpaTransactionManagerImpl spyJpaTm = spy((JpaTransactionManagerImpl) tm());
doThrow(OptimisticLockException.class).when(spyJpaTm).delete(any(VKey.class)); doThrow(OptimisticLockException.class).when(spyJpaTm).delete(any(VKey.class));
spyJpaTm.transactNoRetry( spyJpaTm.transactNoRetry(
null,
() -> { () -> {
spyJpaTm.insert(theEntity); spyJpaTm.insert(theEntity);
return null; return null;
}, });
null);
Executable transaction = Executable transaction =
() -> () ->
spyJpaTm.transactNoRetry( spyJpaTm.transactNoRetry(
null,
() -> { () -> {
spyJpaTm.delete(theEntityKey); spyJpaTm.delete(theEntityKey);
return null; return null;
}, });
null);
assertThrows(OptimisticLockException.class, transaction); assertThrows(OptimisticLockException.class, transaction);
verify(spyJpaTm, times(1)).delete(theEntityKey); verify(spyJpaTm, times(1)).delete(theEntityKey);
assertThrows(OptimisticLockException.class, transaction); assertThrows(OptimisticLockException.class, transaction);