diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index 7be467b40..0d9b65d75 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -209,10 +209,18 @@ public final class RegistryJpaIO { @ProcessElement public void processElement(OutputReceiver outputReceiver) { - tm().transact( + // Note the use of no-retry transaction here. The results from the query are streamed to the + // output receiver inside the transaction, which cannot be rolled back in case of a retry, + // which in turn results in duplicate elements. If we try to pass the results to the output + // receiver outside the transaction, they have to be materialized into a list containing all + // the elements (without resorting to manual pagination) and greatly decrease the + // parallelism of the pipeline. + tm().transactNoRetry( () -> { query.stream().map(resultMapper::apply).forEach(outputReceiver::output); - }); + return null; + }, + null); } } } diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java index ce094bda3..82806444b 100644 --- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java +++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java @@ -203,6 +203,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { return transact(work, null); } + @Override public T transactNoRetry( Callable work, @Nullable TransactionIsolationLevel isolationLevel) { if (inTransaction()) { diff --git a/core/src/main/java/google/registry/persistence/transaction/TransactionManager.java b/core/src/main/java/google/registry/persistence/transaction/TransactionManager.java index 3c7b4b034..d96aba9b8 100644 --- a/core/src/main/java/google/registry/persistence/transaction/TransactionManager.java +++ b/core/src/main/java/google/registry/persistence/transaction/TransactionManager.java @@ -63,6 +63,18 @@ public interface TransactionManager { */ T transact(Callable work, TransactionIsolationLevel isolationLevel); + /** + * Executes the work in a transaction at the given {@link TransactionIsolationLevel} and returns + * the result, without retrying upon retryable exceptions. + * + *

This method should only be used when the transaction contains side effects that are not + * rolled back by the transaction manager, for example in {@link + * google.registry.beam.common.RegistryJpaIO} where the results from a query are streamed to the + * next transformation inside a transaction, as the result stream has to materialize to a list + * outside a transaction and doing so would greatly affect the parallelism of the pipeline. + */ + T transactNoRetry(Callable work, TransactionIsolationLevel isolationLevel); + /** * Executes the work in a (potentially wrapped) transaction and returns the result. *