Fix RegistryJpaIO.Read problem with large data (#1161)

* Fix RegistryJpaIO.Read problem with large data

The read connector needs to detach loaded entities. This 
is now the default behavior in QueryComposer

Also removed the 'transaction mode' property from the Read connector.
There are no obvious use cases for non-transaction query, and
implementation is not straightforward with the current code base.

Also changed the return type of QueryComposer.list() to ImmutableList.
This commit is contained in:
Weimin Yu 2021-05-14 15:19:12 -04:00 committed by GitHub
parent 6b6544adc8
commit b9027047c4
7 changed files with 46 additions and 32 deletions

View file

@ -80,9 +80,9 @@ public final class RegistryJpaIO {
extends SerializableFunction<JpaTransactionManager, QueryComposer<T>> {}
/**
* A {@link PTransform transform} that executes a JPA {@link CriteriaQuery} and adds the results
* to the BEAM pipeline. Users have the option to transform the results before sending them to the
* next stages.
* A {@link PTransform transform} that transactionally executes a JPA {@link CriteriaQuery} and
* adds the results to the BEAM pipeline. Users have the option to transform the results before
* sending them to the next stages.
*/
@AutoValue
public abstract static class Read<R, T> extends PTransform<PBegin, PCollection<T>> {
@ -95,8 +95,6 @@ public final class RegistryJpaIO {
abstract SerializableFunction<R, T> resultMapper();
abstract TransactionMode transactionMode();
abstract Coder<T> coder();
abstract Builder<R, T> toBuilder();
@ -121,10 +119,6 @@ public final class RegistryJpaIO {
return toBuilder().resultMapper(mapper).build();
}
public Read<R, T> withTransactionMode(TransactionMode transactionMode) {
return toBuilder().transactionMode(transactionMode).build();
}
public Read<R, T> withCoder(Coder<T> coder) {
return toBuilder().coder(coder).build();
}
@ -133,7 +127,6 @@ public final class RegistryJpaIO {
return new AutoValue_RegistryJpaIO_Read.Builder()
.name(DEFAULT_NAME)
.resultMapper(x -> x)
.transactionMode(TransactionMode.TRANSACTIONAL)
.coder(SerializableCoder.of(Serializable.class));
}
@ -146,8 +139,6 @@ public final class RegistryJpaIO {
abstract Builder<R, T> resultMapper(SerializableFunction<R, T> mapper);
abstract Builder<R, T> transactionMode(TransactionMode transactionMode);
abstract Builder<R, T> coder(Coder coder);
abstract Read<R, T> build();
@ -171,17 +162,12 @@ public final class RegistryJpaIO {
jpaTm()
.transactNoRetry(
() ->
querySupplier.apply(jpaTm()).stream()
querySupplier.apply(jpaTm()).withAutoDetachOnLoad(true).stream()
.map(resultMapper::apply)
.forEach(outputReceiver::output));
}
}
}
public enum TransactionMode {
NOT_TRANSACTIONAL,
TRANSACTIONAL;
}
}
/**

View file

@ -449,8 +449,8 @@ public class DatastoreTransactionManager implements TransactionManager {
}
@Override
public List<T> list() {
return buildQuery().list();
public ImmutableList<T> list() {
return ImmutableList.copyOf(buildQuery().list());
}
private void checkOnlyOneInequalityField() {

View file

@ -698,6 +698,8 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
private int fetchSize = DEFAULT_FETCH_SIZE;
private boolean autoDetachOnLoad = true;
JpaQueryComposerImpl(Class<T> entityClass, EntityManager em) {
super(entityClass);
this.em = em;
@ -720,6 +722,12 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
return em.createQuery(queryBuilder.build());
}
@Override
public QueryComposer<T> withAutoDetachOnLoad(boolean autoDetachOnLoad) {
this.autoDetachOnLoad = autoDetachOnLoad;
return this;
}
@Override
public QueryComposer<T> withFetchSize(int fetchSize) {
checkArgument(fetchSize >= 0, "FetchSize must not be negative");
@ -730,12 +738,12 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
@Override
public Optional<T> first() {
List<T> results = buildQuery().setMaxResults(1).getResultList();
return results.size() > 0 ? Optional.of(results.get(0)) : Optional.empty();
return results.size() > 0 ? Optional.of(maybeDetachEntity(results.get(0))) : Optional.empty();
}
@Override
public T getSingleResult() {
return buildQuery().getSingleResult();
return maybeDetachEntity(buildQuery().getSingleResult());
}
@Override
@ -749,7 +757,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
} else {
logger.atWarning().log("Query implemention does not support result streaming.");
}
return query.getResultStream();
return query.getResultStream().map(this::maybeDetachEntity);
}
@Override
@ -759,8 +767,17 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
}
@Override
public List<T> list() {
return buildQuery().getResultList();
public ImmutableList<T> list() {
return buildQuery().getResultList().stream()
.map(this::maybeDetachEntity)
.collect(ImmutableList.toImmutableList());
}
private T maybeDetachEntity(T entity) {
if (autoDetachOnLoad) {
em.detach(entity);
}
return entity;
}
}
}

View file

@ -17,6 +17,7 @@ package google.registry.persistence.transaction;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import google.registry.persistence.transaction.CriteriaQueryBuilder.WhereOperator;
import java.util.ArrayList;
import java.util.List;
@ -92,6 +93,16 @@ public abstract class QueryComposer<T> {
return this;
}
/**
* Specifies if JPA entities should be automatically detached from the persistence context after
* loading. The default behavior is auto-detach.
*
* <p>This configuration has no effect on Datastore queries.
*/
public QueryComposer<T> withAutoDetachOnLoad(boolean autoDetachOnLoad) {
return this;
}
/** Returns the first result of the query or an empty optional if there is none. */
public abstract Optional<T> first();
@ -110,7 +121,7 @@ public abstract class QueryComposer<T> {
public abstract long count();
/** Returns the results of the query as a list. */
public abstract List<T> list();
public abstract ImmutableList<T> list();
// We have to wrap the CriteriaQueryBuilder predicate factories in our own functions because at
// the point where we pass them to the Comparator constructor, the compiler can't determine which

View file

@ -69,6 +69,7 @@ final class GenerateLordnCommand implements CommandWithRemoteApi {
.createQueryComposer(DomainBase.class)
.where("tld", Comparator.EQ, tld)
.orderBy("repoId")
.withAutoDetachOnLoad(false)
.stream()
.forEach(domain -> processDomain(claimsCsv, sunriseCsv, domain)));
ImmutableList<String> claimsRows = claimsCsv.build();

View file

@ -39,7 +39,6 @@ import google.registry.tools.ConfirmingCommand;
import google.registry.util.Clock;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import javax.inject.Inject;
import org.joda.time.LocalDate;
@ -137,18 +136,18 @@ public class BackfillSpec11ThreatMatchesCommand extends ConfirmingCommand
flatteningToImmutableListMultimap(
Function.identity(),
(domainName) -> {
List<DomainBase> domains = loadDomainsForFqdn(domainName);
domains.sort(Comparator.comparing(DomainBase::getCreationTime).reversed());
ImmutableList<DomainBase> domains = loadDomainsForFqdn(domainName);
checkState(
!domains.isEmpty(),
"Domain name %s had no associated DomainBase objects.",
domainName);
return domains.stream();
return domains.stream()
.sorted(Comparator.comparing(DomainBase::getCreationTime).reversed());
}));
}
/** Loads in all {@link DomainBase} objects for a given FQDN. */
private List<DomainBase> loadDomainsForFqdn(String fullyQualifiedDomainName) {
private ImmutableList<DomainBase> loadDomainsForFqdn(String fullyQualifiedDomainName) {
return transactIfJpaTm(
() ->
tm().createQueryComposer(DomainBase.class)

View file

@ -83,7 +83,7 @@ public class RegistryJpaReadTest {
}
@Test
void nonTransactionalQuery() {
void jpaRead() {
Read<ContactResource, String> read =
RegistryJpaIO.read(
(JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),