diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java
index 2f169c0ef..b224e2547 100644
--- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java
+++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java
@@ -34,11 +34,11 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Deduplicate;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.util.ShardedKey;
@@ -83,22 +83,6 @@ public final class RegistryJpaIO {
    * A {@link PTransform transform} that executes a JPA {@link CriteriaQuery} and adds the results
    * to the BEAM pipeline. Users have the option to transform the results before sending them to the
    * next stages.
-   *
-   * <p>The BEAM pipeline may execute this transform multiple times due to transient failures,
-   * loading duplicate results into the pipeline. Before we add dedepuplication support, the easiest
-   * workaround is to map results to {@link KV} pairs, and apply the {@link Deduplicate} transform
-   * to the output of this transform:
-   *
-   * <pre>{@code
-   * PCollection<String> contactIds =
-   *     pipeline
-   *         .apply(RegistryJpaIO.read(
-   *             (JpaTransactionManager tm) -> tm.createQueryComposer...,
-   *             contact -> KV.of(contact.getRepoId(), contact.getContactId()))
-   *         .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
-   *         .apply(Deduplicate.keyedValues())
-   *         .apply(Values.create());
-   * }</pre>
    */
   @AutoValue
   public abstract static class Read<R, T> extends PTransform<PBegin, PCollection<T>> {
@@ -118,13 +102,15 @@ public final class RegistryJpaIO {
     abstract Builder<R, T> toBuilder();
 
     @Override
+    @SuppressWarnings("deprecation") // Reshuffle still recommended by GCP.
     public PCollection<T> expand(PBegin input) {
       return input
           .apply("Starting " + name(), Create.of((Void) null))
           .apply(
               "Run query for " + name(),
               ParDo.of(new QueryRunner<>(queryFactory(), resultMapper())))
-          .setCoder(coder());
+          .setCoder(coder())
+          .apply("Reshuffle", Reshuffle.viaRandomKey());
     }
 
     public Read<R, T> withName(String name) {
@@ -178,15 +164,17 @@ public final class RegistryJpaIO {
 
     @ProcessElement
     public void processElement(OutputReceiver<T> outputReceiver) {
-      // TODO(b/187210388): JpaTransactionManager should support non-transactional query.
-      // TODO(weiminyu): add deduplication
-      jpaTm()
-          .transactNoRetry(
-              () ->
-                  querySupplier.apply(jpaTm()).stream()
-                      .map(resultMapper::apply)
-                      .forEach(outputReceiver::output));
-      // TODO(weiminyu): improve performance by reshuffle.
+      // AppEngineEnvironment is needed for handling VKeys, which involve Ofy keys. Unlike
+      // SqlBatchWriter, it is unnecessary to initialize ObjectifyService in this class.
+      try (AppEngineEnvironment env = new AppEngineEnvironment()) {
+        // TODO(b/187210388): JpaTransactionManager should support non-transactional query.
+        jpaTm()
+            .transactNoRetry(
+                () ->
+                    querySupplier.apply(jpaTm()).stream()
+                        .map(resultMapper::apply)
+                        .forEach(outputReceiver::output));
+      }
     }
   }
 
@@ -323,8 +311,9 @@ public final class RegistryJpaIO {
 
     @Setup
     public void setup() {
-      // Below is needed as long as Objectify keys are still involved in the handling of SQL
-      // entities (e.g., in VKeys).
+      // AppEngineEnvironment is needed as long as Objectify keys are still involved in the handling
+      // of SQL entities (e.g., in VKeys). ObjectifyService needs to be initialized when conversion
+      // between Ofy entity and Datastore entity is needed.
       try (AppEngineEnvironment env = new AppEngineEnvironment()) {
         ObjectifyService.initOfy();
       }
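Because expand() now ends with Reshuffle.viaRandomKey(), the query output is checkpointed on runners such as Dataflow, so a retried query step no longer leaks duplicates downstream and the KV/Deduplicate workaround from the deleted javadoc is unnecessary. A minimal caller sketch under that assumption (the pipeline object and the choice of coder are illustrative, mirroring the deleted example):

PCollection<String> contactIds =
    pipeline.apply(
        RegistryJpaIO.read(
                (JpaTransactionManager tm) -> tm.createQueryComposer(ContactResource.class),
                ContactResource::getContactId)
            .withCoder(StringUtf8Coder.of()));
// No Deduplicate/Values steps are needed; the transform reshuffles its own output.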
diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java
index e72ac797f..347dc0ad2 100644
--- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java
+++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java
@@ -692,8 +692,12 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
   private static class JpaQueryComposerImpl<T> extends QueryComposer<T> {
 
+    private static final int DEFAULT_FETCH_SIZE = 1000;
+
     EntityManager em;
 
+    private int fetchSize = DEFAULT_FETCH_SIZE;
+
     JpaQueryComposerImpl(Class<T> entityClass, EntityManager em) {
       super(entityClass);
       this.em = em;
     }
@@ -716,6 +720,13 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
       return em.createQuery(queryBuilder.build());
     }
 
+    @Override
+    public QueryComposer<T> withFetchSize(int fetchSize) {
+      checkArgument(fetchSize >= 0, "FetchSize must not be negative");
+      this.fetchSize = fetchSize;
+      return this;
+    }
+
     @Override
     public Optional<T> first() {
       List<T> results = buildQuery().setMaxResults(1).getResultList();
@@ -729,7 +740,16 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
 
     @Override
     public Stream<T> stream() {
-      return buildQuery().getResultStream();
+      if (fetchSize == 0) {
+        logger.atWarning().log("Query result streaming is not enabled.");
+      }
+      TypedQuery<T> query = buildQuery();
+      if (query instanceof org.hibernate.query.Query) {
+        ((org.hibernate.query.Query) query).setFetchSize(fetchSize);
+      } else {
+        logger.atWarning().log("Query implementation does not support result streaming.");
+      }
+      return query.getResultStream();
     }
 
     @Override
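The rewritten stream() only helps if the JDBC driver actually streams: with PostgreSQL that takes a positive fetch size and autocommit turned off (the registry query above already runs inside jpaTm().transactNoRetry(...)); with fetch size 0 the driver buffers the entire result set. A stand-alone plain-JDBC sketch of that mechanism, not taken from this change; the connection string, credentials, and table/column names are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/** Stand-alone illustration of JDBC fetch-size streaming; not part of this change. */
public class FetchSizeDemo {
  public static void main(String[] args) throws Exception {
    try (Connection conn =
        DriverManager.getConnection("jdbc:postgresql://localhost/registry", "user", "password")) {
      // The PostgreSQL driver only uses a cursor (i.e. streams) when autocommit is off.
      conn.setAutoCommit(false);
      try (Statement stmt = conn.createStatement()) {
        stmt.setFetchSize(1000); // Positive value: rows are fetched in batches of 1000.
        try (ResultSet rs = stmt.executeQuery("SELECT repo_id FROM \"Contact\"")) {
          while (rs.next()) {
            System.out.println(rs.getString(1)); // Placeholder consumer.
          }
        }
      }
      conn.commit();
    }
  }
}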

diff --git a/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java b/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java
index c145b294b..3171f2e3e 100644
--- a/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java
+++ b/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java
@@ -74,6 +74,24 @@ public abstract class QueryComposer<T> {
     return this;
   }
 
+  /**
+   * Applies {@code fetchSize} to the JDBC statement (by calling {@link
+   * java.sql.Statement#setFetchSize}) if the query result is accessed by the {@link #stream}
+   * method. Calling this method is optional. Children of this class will apply a default positive
+   * fetch size if the user does not provide one.
+   *
+   * <p>With many JDBC drivers, including Postgresql, a positive fetch size is required for
+   * streaming large result sets. A zero value, often the drivers' default setting, requires that
+   * the entire result set is buffered.
+   *
+   * <p>The fetch size value, the default as well as the user-provided one, will be applied if and
+   * only if the underlying query implementor supports it. The Hibernate implementations do support
+   * this.
+   */
+  public QueryComposer<T> withFetchSize(int fetchSize) {
+    return this;
+  }
+
   /** Returns the first result of the query or an empty optional if there is none. */
   public abstract Optional<T> first();
 
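A caller-side sketch of the new withFetchSize() hook. It assumes the usual jpaTm().transact(...) wrapper, since getResultStream() needs an open persistence context, and borrows the entity class and mapper from the test below; the fetch size value is arbitrary:

// Hypothetical caller, not part of this change (requires java.util.stream.Collectors).
List<String> repoIds =
    jpaTm()
        .transact(
            () ->
                jpaTm()
                    .createQueryComposer(ContactResource.class)
                    .withFetchSize(5000) // Overrides the 1000-row default; 0 disables streaming.
                    .stream()
                    .map(ContactResource::getRepoId)
                    .collect(Collectors.toList()));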

diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java
index fa153209b..082abe411 100644
--- a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java
+++ b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java
@@ -30,12 +30,7 @@ import google.registry.testing.DatabaseHelper;
 import google.registry.testing.DatastoreEntityExtension;
 import google.registry.testing.FakeClock;
 import google.registry.testing.SystemPropertyExtension;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Deduplicate;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.DateTime;
 import org.junit.jupiter.api.BeforeEach;
@@ -88,7 +83,7 @@ public class RegistryJpaReadTest {
   }
 
   @Test
-  void nonTransactionalQuery_noDedupe() {
+  void nonTransactionalQuery() {
     Read<ContactResource, String> read =
         RegistryJpaIO.read(
             (JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),
@@ -98,22 +93,4 @@ public class RegistryJpaReadTest {
     PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2");
     testPipeline.run();
   }
-
-  @Test
-  void nonTransactionalQuery_dedupe() {
-    // This method only serves as an example of deduplication. Duplicates are not actually added.
-    Read<ContactResource, KV<String, String>> read =
-        RegistryJpaIO.read(
-                (JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),
-                contact -> KV.of(contact.getRepoId(), contact.getContactId()))
-            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
-    PCollection<String> repoIds =
-        testPipeline
-            .apply(read)
-            .apply("Deduplicate", Deduplicate.keyedValues())
-            .apply("Get values", Values.create());
-
-    PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2");
-    testPipeline.run();
-  }
 }
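With Read reshuffling its own output, a keyed read like the deleted nonTransactionalQuery_dedupe no longer needs the Deduplicate step. A hypothetical variant of that test (not part of this change) that would live in RegistryJpaReadTest and reuse its fixtures; it would also need the KvCoder, StringUtf8Coder, KV, and Values imports removed above:

@Test
void nonTransactionalQuery_keyedValues() {
  Read<ContactResource, KV<String, String>> read =
      RegistryJpaIO.read(
              (JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),
              contact -> KV.of(contact.getRepoId(), contact.getContactId()))
          .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
  // No Deduplicate step: Read already applies Reshuffle.viaRandomKey().
  PCollection<String> contactIds = testPipeline.apply(read).apply("Get values", Values.create());

  PAssert.that(contactIds).containsInAnyOrder("contact_0", "contact_1", "contact_2");
  testPipeline.run();
}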