From 6e5d42b38d82e677ecccfd2bcb02172c7a3667f7 Mon Sep 17 00:00:00 2001 From: Weimin Yu Date: Wed, 12 May 2021 19:07:38 -0400 Subject: [PATCH] Fix the JPA Read connector for large data (#1155) * Fix the JPA Read connector for large data Allow result set streaming by setting the fetchSize on JDBC statements. Many JDBC drivers by default buffer the entire result set, causing delays in first result and/or out of memory errors. Also fixed an entity instantiation problem exposed in production runs. Lastly, removed incorrect comments. --- .../registry/beam/common/RegistryJpaIO.java | 47 +++++++------------ .../JpaTransactionManagerImpl.java | 22 ++++++++- .../transaction/QueryComposer.java | 18 +++++++ .../beam/common/RegistryJpaReadTest.java | 25 +--------- 4 files changed, 58 insertions(+), 54 deletions(-) diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index 2f169c0ef..b224e2547 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -34,11 +34,11 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.Deduplicate; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.util.ShardedKey; @@ -83,22 +83,6 @@ public final class RegistryJpaIO { * A {@link PTransform transform} that executes a JPA {@link CriteriaQuery} and adds the results * to the BEAM pipeline. 
Users have the option to transform the results before sending them to the * next stages. - * - *

The BEAM pipeline may execute this transform multiple times due to transient failures, - * loading duplicate results into the pipeline. Before we add dedepuplication support, the easiest - * workaround is to map results to {@link KV} pairs, and apply the {@link Deduplicate} transform - * to the output of this transform: - * - *

{@code
-   * PCollection contactIds =
-   *     pipeline
-   *         .apply(RegistryJpaIO.read(
-   *             (JpaTransactionManager tm) -> tm.createQueryComposer...,
-   *             contact -> KV.of(contact.getRepoId(), contact.getContactId()))
-   *         .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
-   *         .apply(Deduplicate.keyedValues())
-   *         .apply(Values.create());
-   * }
*/ @AutoValue public abstract static class Read extends PTransform> { @@ -118,13 +102,15 @@ public final class RegistryJpaIO { abstract Builder toBuilder(); @Override + @SuppressWarnings("deprecation") // Reshuffle still recommended by GCP. public PCollection expand(PBegin input) { return input .apply("Starting " + name(), Create.of((Void) null)) .apply( "Run query for " + name(), ParDo.of(new QueryRunner<>(queryFactory(), resultMapper()))) - .setCoder(coder()); + .setCoder(coder()) + .apply("Reshuffle", Reshuffle.viaRandomKey()); } public Read withName(String name) { @@ -178,15 +164,17 @@ public final class RegistryJpaIO { @ProcessElement public void processElement(OutputReceiver outputReceiver) { - // TODO(b/187210388): JpaTransactionManager should support non-transactional query. - // TODO(weiminyu): add deduplication - jpaTm() - .transactNoRetry( - () -> - querySupplier.apply(jpaTm()).stream() - .map(resultMapper::apply) - .forEach(outputReceiver::output)); - // TODO(weiminyu): improve performance by reshuffle. + // AppEngineEnvironment is need for handling VKeys, which involve Ofy keys. Unlike + // SqlBatchWriter, it is unnecessary to initialize ObjectifyService in this class. + try (AppEngineEnvironment env = new AppEngineEnvironment()) { + // TODO(b/187210388): JpaTransactionManager should support non-transactional query. + jpaTm() + .transactNoRetry( + () -> + querySupplier.apply(jpaTm()).stream() + .map(resultMapper::apply) + .forEach(outputReceiver::output)); + } } } @@ -323,8 +311,9 @@ public final class RegistryJpaIO { @Setup public void setup() { - // Below is needed as long as Objectify keys are still involved in the handling of SQL - // entities (e.g., in VKeys). + // AppEngineEnvironment is needed as long as Objectify keys are still involved in the handling + // of SQL entities (e.g., in VKeys). ObjectifyService needs to be initialized when conversion + // between Ofy entity and Datastore entity is needed. 
try (AppEngineEnvironment env = new AppEngineEnvironment()) { ObjectifyService.initOfy(); } diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java index e72ac797f..347dc0ad2 100644 --- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java +++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java @@ -692,8 +692,12 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { private static class JpaQueryComposerImpl extends QueryComposer { + private static final int DEFAULT_FETCH_SIZE = 1000; + EntityManager em; + private int fetchSize = DEFAULT_FETCH_SIZE; + JpaQueryComposerImpl(Class entityClass, EntityManager em) { super(entityClass); this.em = em; @@ -716,6 +720,13 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { return em.createQuery(queryBuilder.build()); } + @Override + public QueryComposer withFetchSize(int fetchSize) { + checkArgument(fetchSize >= 0, "FetchSize must not be negative"); + this.fetchSize = fetchSize; + return this; + } + @Override public Optional first() { List results = buildQuery().setMaxResults(1).getResultList(); @@ -729,7 +740,16 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { @Override public Stream stream() { - return buildQuery().getResultStream(); + if (fetchSize == 0) { + logger.atWarning().log("Query result streaming is not enabled."); + } + TypedQuery query = buildQuery(); + if (query instanceof org.hibernate.query.Query) { + ((org.hibernate.query.Query) query).setFetchSize(fetchSize); + } else { + logger.atWarning().log("Query implemention does not support result streaming."); + } + return query.getResultStream(); } @Override diff --git a/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java 
b/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java index c145b294b..3171f2e3e 100644 --- a/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java +++ b/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java @@ -74,6 +74,24 @@ public abstract class QueryComposer { return this; } + /** + * Applies {@code fetchSize} to the JDBC statement (by calling {@link + * java.sql.Statement#setFetchSize}) if the query result is accessed by the {@link #stream} + * method. Calling this method is optional. Children of this class will apply a default positive + * fetch size if the user does not provide one. + * + *

With many JDBC drivers, including PostgreSQL, a positive fetch size is required for + * streaming large result sets. A zero value, often the drivers' default setting, requires that + * the entire result set is buffered. + * + *

The fetch size value, the default as well as the user-provided one, will be applied if and + * only if the underlying query implementor supports it. The Hibernate implementations do support + * this. + */ + public QueryComposer withFetchSize(int fetchSize) { + return this; + } + /** Returns the first result of the query or an empty optional if there is none. */ public abstract Optional first(); diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java index fa153209b..082abe411 100644 --- a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java @@ -30,12 +30,7 @@ import google.registry.testing.DatabaseHelper; import google.registry.testing.DatastoreEntityExtension; import google.registry.testing.FakeClock; import google.registry.testing.SystemPropertyExtension; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.Deduplicate; -import org.apache.beam.sdk.transforms.Values; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.DateTime; import org.junit.jupiter.api.BeforeEach; @@ -88,7 +83,7 @@ public class RegistryJpaReadTest { } @Test - void nonTransactionalQuery_noDedupe() { + void nonTransactionalQuery() { Read read = RegistryJpaIO.read( (JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class), @@ -98,22 +93,4 @@ public class RegistryJpaReadTest { PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2"); testPipeline.run(); } - - @Test - void nonTransactionalQuery_dedupe() { - // This method only serves as an example of deduplication. Duplicates are not actually added. 
- Read> read = - RegistryJpaIO.read( - (JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class), - contact -> KV.of(contact.getRepoId(), contact.getContactId())) - .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - PCollection repoIds = - testPipeline - .apply(read) - .apply("Deduplicate", Deduplicate.keyedValues()) - .apply("Get values", Values.create()); - - PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2"); - testPipeline.run(); - } }