diff --git a/core/src/main/java/google/registry/beam/common/JpaDemoPipeline.java b/core/src/main/java/google/registry/beam/common/JpaDemoPipeline.java index 426af79ed..8a28ba39a 100644 --- a/core/src/main/java/google/registry/beam/common/JpaDemoPipeline.java +++ b/core/src/main/java/google/registry/beam/common/JpaDemoPipeline.java @@ -18,17 +18,23 @@ import static com.google.common.base.Verify.verify; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import google.registry.backup.AppEngineEnvironment; +import google.registry.model.contact.ContactResource; +import google.registry.persistence.transaction.CriteriaQueryBuilder; import google.registry.persistence.transaction.JpaTransactionManager; import java.io.Serializable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -/** Toy pipeline that demonstrates how to use {@link JpaTransactionManager} in BEAM pipelines. */ +/** + * Toy pipeline that demonstrates how to use {@link JpaTransactionManager} in BEAM pipelines. + * + *

This pipeline may also be used as an integration test for {@link RegistryJpaIO.Read} in a + * project with realistic data. + */ public class JpaDemoPipeline implements Serializable { public static void main(String[] args) { @@ -38,23 +44,16 @@ public class JpaDemoPipeline implements Serializable { Pipeline pipeline = Pipeline.create(options); pipeline - .apply("Start", Create.of((Void) null)) .apply( - "Generate Elements", - ParDo.of( - new DoFn() { - @ProcessElement - public void processElement(OutputReceiver output) { - for (int i = 0; i < 500; i++) { - output.output(null); - } - } - })) + "Read contacts", + RegistryJpaIO.read( + () -> CriteriaQueryBuilder.create(ContactResource.class).build(), + ContactResource::getRepoId)) .apply( - "Make Query", + "Count Contacts", ParDo.of( - new DoFn() { - private Counter counter = Metrics.counter("Demo", "Read"); + new DoFn() { + private Counter counter = Metrics.counter("Contacts", "Read"); @ProcessElement public void processElement() { diff --git a/core/src/main/java/google/registry/persistence/PersistenceModule.java b/core/src/main/java/google/registry/persistence/PersistenceModule.java index 06d4e2a90..0e949833d 100644 --- a/core/src/main/java/google/registry/persistence/PersistenceModule.java +++ b/core/src/main/java/google/registry/persistence/PersistenceModule.java @@ -74,6 +74,15 @@ public abstract class PersistenceModule { public static final String HIKARI_DS_CLOUD_SQL_INSTANCE = "hibernate.hikari.dataSource.cloudSqlInstance"; + /** + * Postgresql-specific: driver default fetch size is 0, which disables streaming result sets. Here + * we set a small default geared toward Nomulus server transactions. Large queries can override + * the defaults using {@link JpaTransactionManager#setQueryFetchSize}. + */ + public static final String JDBC_FETCH_SIZE = "hibernate.jdbc.fetch_size"; + + private static final int DEFAULT_SERVER_FETCH_SIZE = 20; + @VisibleForTesting @Provides @DefaultHibernateConfigs @@ -100,6 +109,7 @@ public abstract class PersistenceModule { properties.put(HIKARI_MAXIMUM_POOL_SIZE, getHibernateHikariMaximumPoolSize()); properties.put(HIKARI_IDLE_TIMEOUT, getHibernateHikariIdleTimeout()); properties.put(Environment.DIALECT, NomulusPostgreSQLDialect.class.getName()); + properties.put(JDBC_FETCH_SIZE, Integer.toString(DEFAULT_SERVER_FETCH_SIZE)); return properties.build(); } diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java index eefc20950..4e0db4e7d 100644 --- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java +++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java @@ -65,4 +65,12 @@ public interface JpaTransactionManager extends TransactionManager { *

The errorprone check forbids injection of {@link java.io.Closeable} resources. */ void teardown(); + + /** + * Sets the JDBC driver fetch size for the {@code query}. This overrides the default + * configuration. + */ + static Query setQueryFetchSize(Query query, int fetchSize) { + return query.setHint("org.hibernate.fetchSize", fetchSize); + } } diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java index e4ae4981d..1af6935c4 100644 --- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java +++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java @@ -1052,11 +1052,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager { logger.atWarning().log("Query result streaming is not enabled."); } TypedQuery query = buildQuery(); - if (query instanceof org.hibernate.query.Query) { - ((org.hibernate.query.Query) query).setFetchSize(fetchSize); - } else { - logger.atWarning().log("Query implemention does not support result streaming."); - } + JpaTransactionManager.setQueryFetchSize(query, fetchSize); return query.getResultStream().map(JpaTransactionManagerImpl.this::detach); }