diff --git a/core/src/main/java/google/registry/beam/common/RegistryQuery.java b/core/src/main/java/google/registry/beam/common/RegistryQuery.java index f61dc01d5..dba1cf5b3 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryQuery.java +++ b/core/src/main/java/google/registry/beam/common/RegistryQuery.java @@ -16,6 +16,7 @@ package google.registry.beam.common; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import google.registry.persistence.transaction.JpaTransactionManager; import java.io.Serializable; import java.util.Map; import java.util.function.Supplier; @@ -28,6 +29,15 @@ import javax.persistence.criteria.CriteriaQuery; /** Interface for query instances used by {@link RegistryJpaIO.Read}. */ public interface RegistryQuery extends Serializable { + + /** + * Number of JPA entities to fetch in each batch during a query. + * + *

With Hibernate, for result streaming to work, a query's fetchSize property must be set to a + * non-zero value. + */ + int QUERY_FETCH_SIZE = 1000; + Stream stream(); interface CriteriaQuerySupplier extends Supplier>, Serializable {} @@ -49,6 +59,7 @@ public interface RegistryQuery extends Serializable { if (parameters != null) { parameters.forEach(query::setParameter); } + JpaTransactionManager.setQueryFetchSize(query, QUERY_FETCH_SIZE); @SuppressWarnings("unchecked") Stream resultStream = query.getResultStream(); return nativeQuery ? resultStream : resultStream.map(e -> detach(entityManager, e)); @@ -64,11 +75,14 @@ public interface RegistryQuery extends Serializable { static RegistryQuery createQuery( String jpql, @Nullable Map parameters, Class clazz) { return () -> { - TypedQuery query = jpaTm().query(jpql, clazz); + // TODO(b/193662898): switch to jpaTm().query() when it can properly detach loaded entities. + EntityManager entityManager = jpaTm().getEntityManager(); + TypedQuery query = entityManager.createQuery(jpql, clazz); if (parameters != null) { parameters.forEach(query::setParameter); } - return query.getResultStream(); + JpaTransactionManager.setQueryFetchSize(query, QUERY_FETCH_SIZE); + return query.getResultStream().map(e -> detach(entityManager, e)); }; } @@ -82,7 +96,13 @@ public interface RegistryQuery extends Serializable { * @param Type of each row in the result set. */ static RegistryQuery createQuery(CriteriaQuerySupplier criteriaQuery) { - return () -> jpaTm().query(criteriaQuery.get()).getResultStream(); + return () -> { + // TODO(b/193662898): switch to jpaTm().query() when it can properly detach loaded entities. + EntityManager entityManager = jpaTm().getEntityManager(); + TypedQuery query = entityManager.createQuery(criteriaQuery.get()); + JpaTransactionManager.setQueryFetchSize(query, QUERY_FETCH_SIZE); + return query.getResultStream().map(e -> detach(entityManager, e)); + }; } /** @@ -108,7 +128,10 @@ public interface RegistryQuery extends Serializable { return; } try { - entityManager.detach(object); + // TODO(b/193662898): choose detach() or clear() based on the type of transaction. + // For context, EntityManager.detach() does not remove all metadata about loaded entities. + // See b/193925312 or https://hibernate.atlassian.net/browse/HHH-14735 for details. + entityManager.clear(); } catch (IllegalArgumentException e) { // Not an entity. Do nothing. } diff --git a/core/src/main/java/google/registry/persistence/transaction/ReadOnlyCheckingTypedQuery.java b/core/src/main/java/google/registry/persistence/transaction/ReadOnlyCheckingTypedQuery.java index dcda3a136..f0077c835 100644 --- a/core/src/main/java/google/registry/persistence/transaction/ReadOnlyCheckingTypedQuery.java +++ b/core/src/main/java/google/registry/persistence/transaction/ReadOnlyCheckingTypedQuery.java @@ -21,6 +21,7 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Stream; import javax.persistence.FlushModeType; import javax.persistence.LockModeType; import javax.persistence.Parameter; @@ -41,6 +42,11 @@ class ReadOnlyCheckingTypedQuery implements TypedQuery { return delegate.getResultList(); } + @Override + public Stream getResultStream() { + return delegate.getResultStream(); + } + @Override public T getSingleResult() { return delegate.getSingleResult();