Do not retry transactions inside Beam (#2318)

This commit is contained in:
Lai Jiang 2024-02-05 13:40:56 -05:00 committed by GitHub
parent e492936cec
commit 009fda67b7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 23 additions and 2 deletions

View file

@ -209,10 +209,18 @@ public final class RegistryJpaIO {
@ProcessElement
public void processElement(OutputReceiver<T> outputReceiver) {
tm().transact(
// Note the use of a no-retry transaction here. The results from the query are streamed to the
// output receiver inside the transaction, and elements that have already been emitted cannot be
// rolled back if the transaction is retried, so a retry would produce duplicate elements.
// Passing the results to the output receiver outside the transaction instead would require
// materializing them into a list containing all the elements (short of resorting to manual
// pagination), which would greatly decrease the parallelism of the pipeline.
tm().transactNoRetry(
() -> {
query.stream().map(resultMapper::apply).forEach(outputReceiver::output);
});
return null;
},
null);
}
}
}

View file

@ -203,6 +203,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
return transact(work, null);
}
@Override
public <T> T transactNoRetry(
Callable<T> work, @Nullable TransactionIsolationLevel isolationLevel) {
if (inTransaction()) {

View file

@ -63,6 +63,18 @@ public interface TransactionManager {
*/
<T> T transact(Callable<T> work, TransactionIsolationLevel isolationLevel);
/**
* Executes the work in a transaction at the given {@link TransactionIsolationLevel} and returns
* the result, without retrying upon retryable exceptions.
*
* <p>This method should only be used when the transaction contains side effects that are not
* rolled back by the transaction manager. One example is {@link
* google.registry.beam.common.RegistryJpaIO}, where the results from a query are streamed to the
* next transformation inside the transaction: elements that have already been emitted cannot be
* taken back on a retry, and emitting them outside the transaction would require first
* materializing the whole result stream into a list, which would greatly reduce the parallelism
* of the pipeline.
*
* @param work the unit of work to run inside the transaction
* @param isolationLevel the isolation level to run the transaction at; NOTE(review): existing
*     callers pass {@code null} here, presumably to request the default level -- confirm against
*     the implementation
* @return the value produced by {@code work}
*/
<T> T transactNoRetry(Callable<T> work, TransactionIsolationLevel isolationLevel);
/**
* Executes the work in a (potentially wrapped) transaction and returns the result.
*