Add a temporary fix to Hibernate detach in BEAM queries (#1314)

* Add a temporary fix to Hibernate detach in query

Make all queries in RegistryQuery (exclusively used by BEAM) use
EntityManager.clear() to detach entities. This is a temporary measure
that unblocks work in BEAM. We will revert the work once
JpaTransactionManager can detach entities properly for all types of
queries.

Also fixed regression bugs that broke query result streaming:
- The code that sets query fetch size was not carried over from
QueryComposer.
- The new ReadOnlyCheckingTypedQuery class did not override parent's
getResultStream() method, which calls getList().
This commit is contained in:
Weimin Yu 2021-09-10 21:23:07 -04:00 committed by GitHub
parent 20b73222d4
commit e75c1c33eb
2 changed files with 33 additions and 4 deletions

View file

@ -16,6 +16,7 @@ package google.registry.beam.common;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import google.registry.persistence.transaction.JpaTransactionManager;
import java.io.Serializable;
import java.util.Map;
import java.util.function.Supplier;
@ -28,6 +29,15 @@ import javax.persistence.criteria.CriteriaQuery;
/** Interface for query instances used by {@link RegistryJpaIO.Read}. */
public interface RegistryQuery<T> extends Serializable {
/**
* Number of JPA entities to fetch in each batch during a query.
*
* <p>With Hibernate, for result streaming to work, a query's fetchSize property must be set to a
* non-zero value.
*/
int QUERY_FETCH_SIZE = 1000;
Stream<T> stream();
interface CriteriaQuerySupplier<T> extends Supplier<CriteriaQuery<T>>, Serializable {}
@ -49,6 +59,7 @@ public interface RegistryQuery<T> extends Serializable {
if (parameters != null) {
parameters.forEach(query::setParameter);
}
JpaTransactionManager.setQueryFetchSize(query, QUERY_FETCH_SIZE);
@SuppressWarnings("unchecked")
Stream<T> resultStream = query.getResultStream();
return nativeQuery ? resultStream : resultStream.map(e -> detach(entityManager, e));
@ -64,11 +75,14 @@ public interface RegistryQuery<T> extends Serializable {
static <T> RegistryQuery<T> createQuery(
String jpql, @Nullable Map<String, Object> parameters, Class<T> clazz) {
return () -> {
TypedQuery<T> query = jpaTm().query(jpql, clazz);
// TODO(b/193662898): switch to jpaTm().query() when it can properly detach loaded entities.
EntityManager entityManager = jpaTm().getEntityManager();
TypedQuery<T> query = entityManager.createQuery(jpql, clazz);
if (parameters != null) {
parameters.forEach(query::setParameter);
}
return query.getResultStream();
JpaTransactionManager.setQueryFetchSize(query, QUERY_FETCH_SIZE);
return query.getResultStream().map(e -> detach(entityManager, e));
};
}
@ -82,7 +96,13 @@ public interface RegistryQuery<T> extends Serializable {
* @param <T> Type of each row in the result set.
*/
static <T> RegistryQuery<T> createQuery(CriteriaQuerySupplier<T> criteriaQuery) {
return () -> jpaTm().query(criteriaQuery.get()).getResultStream();
return () -> {
// TODO(b/193662898): switch to jpaTm().query() when it can properly detach loaded entities.
EntityManager entityManager = jpaTm().getEntityManager();
TypedQuery<T> query = entityManager.createQuery(criteriaQuery.get());
JpaTransactionManager.setQueryFetchSize(query, QUERY_FETCH_SIZE);
return query.getResultStream().map(e -> detach(entityManager, e));
};
}
/**
@ -108,7 +128,10 @@ public interface RegistryQuery<T> extends Serializable {
return;
}
try {
entityManager.detach(object);
// TODO(b/193662898): choose detach() or clear() based on the type of transaction.
// For context, EntityManager.detach() does not remove all metadata about loaded entities.
// See b/193925312 or https://hibernate.atlassian.net/browse/HHH-14735 for details.
entityManager.clear();
} catch (IllegalArgumentException e) {
// Not an entity. Do nothing.
}

View file

@ -21,6 +21,7 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import javax.persistence.FlushModeType;
import javax.persistence.LockModeType;
import javax.persistence.Parameter;
@ -41,6 +42,11 @@ class ReadOnlyCheckingTypedQuery<T> implements TypedQuery<T> {
return delegate.getResultList();
}
@Override
public Stream<T> getResultStream() {
return delegate.getResultStream();
}
@Override
public T getSingleResult() {
return delegate.getSingleResult();