From b9027047c40332d877c3313b6322375f19edbfc5 Mon Sep 17 00:00:00 2001 From: Weimin Yu Date: Fri, 14 May 2021 15:19:12 -0400 Subject: [PATCH] Fix RegistryJpaIO.Read problem with large data (#1161) * Fix RegistryJpaIO.Read problem with large data The read connector needs to detach loaded entities. This is now the default behavior in QueryComposer Also removed the 'transaction mode' property from the Read connector. There are no obvious use cases for non-transaction query, and implementation is not straightforward with the current code base. Also changed the return type of QueryComposer.list() to ImmutableList. --- .../registry/beam/common/RegistryJpaIO.java | 22 +++------------ .../ofy/DatastoreTransactionManager.java | 4 +-- .../JpaTransactionManagerImpl.java | 27 +++++++++++++++---- .../transaction/QueryComposer.java | 13 ++++++++- .../registry/tools/GenerateLordnCommand.java | 1 + .../BackfillSpec11ThreatMatchesCommand.java | 9 +++---- .../beam/common/RegistryJpaReadTest.java | 2 +- 7 files changed, 46 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index b224e2547..fc541ea06 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -80,9 +80,9 @@ public final class RegistryJpaIO { extends SerializableFunction> {} /** - * A {@link PTransform transform} that executes a JPA {@link CriteriaQuery} and adds the results - * to the BEAM pipeline. Users have the option to transform the results before sending them to the - * next stages. + * A {@link PTransform transform} that transactionally executes a JPA {@link CriteriaQuery} and + * adds the results to the BEAM pipeline. Users have the option to transform the results before + * sending them to the next stages. */ @AutoValue public abstract static class Read extends PTransform> { @@ -95,8 +95,6 @@ public final class RegistryJpaIO { abstract SerializableFunction resultMapper(); - abstract TransactionMode transactionMode(); - abstract Coder coder(); abstract Builder toBuilder(); @@ -121,10 +119,6 @@ public final class RegistryJpaIO { return toBuilder().resultMapper(mapper).build(); } - public Read withTransactionMode(TransactionMode transactionMode) { - return toBuilder().transactionMode(transactionMode).build(); - } - public Read withCoder(Coder coder) { return toBuilder().coder(coder).build(); } @@ -133,7 +127,6 @@ public final class RegistryJpaIO { return new AutoValue_RegistryJpaIO_Read.Builder() .name(DEFAULT_NAME) .resultMapper(x -> x) - .transactionMode(TransactionMode.TRANSACTIONAL) .coder(SerializableCoder.of(Serializable.class)); } @@ -146,8 +139,6 @@ public final class RegistryJpaIO { abstract Builder resultMapper(SerializableFunction mapper); - abstract Builder transactionMode(TransactionMode transactionMode); - abstract Builder coder(Coder coder); abstract Read build(); @@ -171,17 +162,12 @@ public final class RegistryJpaIO { jpaTm() .transactNoRetry( () -> - querySupplier.apply(jpaTm()).stream() + querySupplier.apply(jpaTm()).withAutoDetachOnLoad(true).stream() .map(resultMapper::apply) .forEach(outputReceiver::output)); } } } - - public enum TransactionMode { - NOT_TRANSACTIONAL, - TRANSACTIONAL; - } } /** diff --git a/core/src/main/java/google/registry/model/ofy/DatastoreTransactionManager.java b/core/src/main/java/google/registry/model/ofy/DatastoreTransactionManager.java index 1d9b51b40..e1948adfc 100644 --- a/core/src/main/java/google/registry/model/ofy/DatastoreTransactionManager.java +++ b/core/src/main/java/google/registry/model/ofy/DatastoreTransactionManager.java @@ -449,8 +449,8 @@ public class DatastoreTransactionManager implements TransactionManager { } @Override - public List list() { - return buildQuery().list(); + public ImmutableList list() { + return ImmutableList.copyOf(buildQuery().list()); } private void checkOnlyOneInequalityField() { diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java index 347dc0ad2..6a7508438 100644 --- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java +++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java @@ -698,6 +698,8 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { private int fetchSize = DEFAULT_FETCH_SIZE; + private boolean autoDetachOnLoad = true; + JpaQueryComposerImpl(Class entityClass, EntityManager em) { super(entityClass); this.em = em; @@ -720,6 +722,12 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { return em.createQuery(queryBuilder.build()); } + @Override + public QueryComposer withAutoDetachOnLoad(boolean autoDetachOnLoad) { + this.autoDetachOnLoad = autoDetachOnLoad; + return this; + } + @Override public QueryComposer withFetchSize(int fetchSize) { checkArgument(fetchSize >= 0, "FetchSize must not be negative"); @@ -730,12 +738,12 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { @Override public Optional first() { List results = buildQuery().setMaxResults(1).getResultList(); - return results.size() > 0 ? Optional.of(results.get(0)) : Optional.empty(); + return results.size() > 0 ? Optional.of(maybeDetachEntity(results.get(0))) : Optional.empty(); } @Override public T getSingleResult() { - return buildQuery().getSingleResult(); + return maybeDetachEntity(buildQuery().getSingleResult()); } @Override @@ -749,7 +757,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { } else { logger.atWarning().log("Query implemention does not support result streaming."); } - return query.getResultStream(); + return query.getResultStream().map(this::maybeDetachEntity); } @Override @@ -759,8 +767,17 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { } @Override - public List list() { - return buildQuery().getResultList(); + public ImmutableList list() { + return buildQuery().getResultList().stream() + .map(this::maybeDetachEntity) + .collect(ImmutableList.toImmutableList()); + } + + private T maybeDetachEntity(T entity) { + if (autoDetachOnLoad) { + em.detach(entity); + } + return entity; } } } diff --git a/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java b/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java index 3171f2e3e..1624309e8 100644 --- a/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java +++ b/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java @@ -17,6 +17,7 @@ package google.registry.persistence.transaction; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; import google.registry.persistence.transaction.CriteriaQueryBuilder.WhereOperator; import java.util.ArrayList; import java.util.List; @@ -92,6 +93,16 @@ public abstract class QueryComposer { return this; } + /** + * Specifies if JPA entities should be automatically detached from the persistence context after + * loading. The default behavior is auto-detach. + * + *

This configuration has no effect on Datastore queries. + */ + public QueryComposer withAutoDetachOnLoad(boolean autoDetachOnLoad) { + return this; + } + /** Returns the first result of the query or an empty optional if there is none. */ public abstract Optional first(); @@ -110,7 +121,7 @@ public abstract class QueryComposer { public abstract long count(); /** Returns the results of the query as a list. */ - public abstract List list(); + public abstract ImmutableList list(); // We have to wrap the CriteriaQueryBuilder predicate factories in our own functions because at // the point where we pass them to the Comparator constructor, the compiler can't determine which diff --git a/core/src/main/java/google/registry/tools/GenerateLordnCommand.java b/core/src/main/java/google/registry/tools/GenerateLordnCommand.java index 9522c4e39..3ea364805 100644 --- a/core/src/main/java/google/registry/tools/GenerateLordnCommand.java +++ b/core/src/main/java/google/registry/tools/GenerateLordnCommand.java @@ -69,6 +69,7 @@ final class GenerateLordnCommand implements CommandWithRemoteApi { .createQueryComposer(DomainBase.class) .where("tld", Comparator.EQ, tld) .orderBy("repoId") + .withAutoDetachOnLoad(false) .stream() .forEach(domain -> processDomain(claimsCsv, sunriseCsv, domain))); ImmutableList claimsRows = claimsCsv.build(); diff --git a/core/src/main/java/google/registry/tools/javascrap/BackfillSpec11ThreatMatchesCommand.java b/core/src/main/java/google/registry/tools/javascrap/BackfillSpec11ThreatMatchesCommand.java index 2e77cc5de..8176e660b 100644 --- a/core/src/main/java/google/registry/tools/javascrap/BackfillSpec11ThreatMatchesCommand.java +++ b/core/src/main/java/google/registry/tools/javascrap/BackfillSpec11ThreatMatchesCommand.java @@ -39,7 +39,6 @@ import google.registry.tools.ConfirmingCommand; import google.registry.util.Clock; import java.io.IOException; import java.util.Comparator; -import java.util.List; import java.util.function.Function; import javax.inject.Inject; import org.joda.time.LocalDate; @@ -137,18 +136,18 @@ public class BackfillSpec11ThreatMatchesCommand extends ConfirmingCommand flatteningToImmutableListMultimap( Function.identity(), (domainName) -> { - List domains = loadDomainsForFqdn(domainName); - domains.sort(Comparator.comparing(DomainBase::getCreationTime).reversed()); + ImmutableList domains = loadDomainsForFqdn(domainName); checkState( !domains.isEmpty(), "Domain name %s had no associated DomainBase objects.", domainName); - return domains.stream(); + return domains.stream() + .sorted(Comparator.comparing(DomainBase::getCreationTime).reversed()); })); } /** Loads in all {@link DomainBase} objects for a given FQDN. */ - private List loadDomainsForFqdn(String fullyQualifiedDomainName) { + private ImmutableList loadDomainsForFqdn(String fullyQualifiedDomainName) { return transactIfJpaTm( () -> tm().createQueryComposer(DomainBase.class) diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java index 082abe411..e6d42f86d 100644 --- a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java @@ -83,7 +83,7 @@ public class RegistryJpaReadTest { } @Test - void nonTransactionalQuery() { + void jpaRead() { Read read = RegistryJpaIO.read( (JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),