From 81fcdbdcea435a79e5844585107ac4accf2dde8e Mon Sep 17 00:00:00 2001 From: Weimin Yu Date: Tue, 22 Jun 2021 22:13:57 -0400 Subject: [PATCH] Make SQL queries return scrollable results (#1214) * Make SQL queries return scrollable results With Postgresql, we must override the default fetchSize (0) to enable scrollable result sets. Previously we only did this in QueryComposer. In this change we enable scrollable results for all queries by default. We also provide a helper function (JpaTransactionManager.setQueryFetchSize) that can override the default. --- .../registry/beam/common/JpaDemoPipeline.java | 31 +++++++++---------- .../persistence/PersistenceModule.java | 10 ++++++ .../transaction/JpaTransactionManager.java | 8 +++++ .../JpaTransactionManagerImpl.java | 6 +--- 4 files changed, 34 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/google/registry/beam/common/JpaDemoPipeline.java b/core/src/main/java/google/registry/beam/common/JpaDemoPipeline.java index 426af79ed..8a28ba39a 100644 --- a/core/src/main/java/google/registry/beam/common/JpaDemoPipeline.java +++ b/core/src/main/java/google/registry/beam/common/JpaDemoPipeline.java @@ -18,17 +18,23 @@ import static com.google.common.base.Verify.verify; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import google.registry.backup.AppEngineEnvironment; +import google.registry.model.contact.ContactResource; +import google.registry.persistence.transaction.CriteriaQueryBuilder; import google.registry.persistence.transaction.JpaTransactionManager; import java.io.Serializable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -/** Toy pipeline that demonstrates how to use {@link JpaTransactionManager} in BEAM pipelines. */ +/** + * Toy pipeline that demonstrates how to use {@link JpaTransactionManager} in BEAM pipelines. + * + *

This pipeline may also be used as an integration test for {@link RegistryJpaIO.Read} in a + * project with realistic data. + */ public class JpaDemoPipeline implements Serializable { public static void main(String[] args) { @@ -38,23 +44,16 @@ public class JpaDemoPipeline implements Serializable { Pipeline pipeline = Pipeline.create(options); pipeline - .apply("Start", Create.of((Void) null)) .apply( - "Generate Elements", - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(OutputReceiver output) { - for (int i = 0; i < 500; i++) { - output.output(null); - } - } - })) + "Read contacts", + RegistryJpaIO.read( + () -> CriteriaQueryBuilder.create(ContactResource.class).build(), + ContactResource::getRepoId)) .apply( - "Make Query", + "Count Contacts", ParDo.of( - new DoFn() { - private Counter counter = Metrics.counter("Demo", "Read"); + new DoFn() { + private Counter counter = Metrics.counter("Contacts", "Read"); @ProcessElement public void processElement() { diff --git a/core/src/main/java/google/registry/persistence/PersistenceModule.java b/core/src/main/java/google/registry/persistence/PersistenceModule.java index 06d4e2a90..0e949833d 100644 --- a/core/src/main/java/google/registry/persistence/PersistenceModule.java +++ b/core/src/main/java/google/registry/persistence/PersistenceModule.java @@ -74,6 +74,15 @@ public abstract class PersistenceModule { public static final String HIKARI_DS_CLOUD_SQL_INSTANCE = "hibernate.hikari.dataSource.cloudSqlInstance"; + /** + * Postgresql-specific: driver default fetch size is 0, which disables streaming result sets. Here + * we set a small default geared toward Nomulus server transactions. Large queries can override + * the defaults using {@link JpaTransactionManager#setQueryFetchSize}. + */ + public static final String JDBC_FETCH_SIZE = "hibernate.jdbc.fetch_size"; + + private static final int DEFAULT_SERVER_FETCH_SIZE = 20; + @VisibleForTesting @Provides @DefaultHibernateConfigs @@ -100,6 +109,7 @@ public abstract class PersistenceModule { properties.put(HIKARI_MAXIMUM_POOL_SIZE, getHibernateHikariMaximumPoolSize()); properties.put(HIKARI_IDLE_TIMEOUT, getHibernateHikariIdleTimeout()); properties.put(Environment.DIALECT, NomulusPostgreSQLDialect.class.getName()); + properties.put(JDBC_FETCH_SIZE, Integer.toString(DEFAULT_SERVER_FETCH_SIZE)); return properties.build(); } diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java index eefc20950..4e0db4e7d 100644 --- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java +++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java @@ -65,4 +65,12 @@ public interface JpaTransactionManager extends TransactionManager { *

The errorprone check forbids injection of {@link java.io.Closeable} resources. */ void teardown(); + + /** + * Sets the JDBC driver fetch size for the {@code query}. This overrides the default + * configuration. + */ + static Query setQueryFetchSize(Query query, int fetchSize) { + return query.setHint("org.hibernate.fetchSize", fetchSize); + } } diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java index e4ae4981d..1af6935c4 100644 --- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java +++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java @@ -1052,11 +1052,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { logger.atWarning().log("Query result streaming is not enabled."); } TypedQuery query = buildQuery(); - if (query instanceof org.hibernate.query.Query) { - ((org.hibernate.query.Query) query).setFetchSize(fetchSize); - } else { - logger.atWarning().log("Query implemention does not support result streaming."); - } + JpaTransactionManager.setQueryFetchSize(query, fetchSize); return query.getResultStream().map(JpaTransactionManagerImpl.this::detach); }