Make SQL queries return scrollable results (#1214)

* Make SQL queries return scrollable results

With PostgreSQL, we must override the JDBC driver's default fetchSize (0),
which loads the entire result set into memory, to enable scrollable
result sets. Previously we only did this in QueryComposer.

In this change we enable scrollable results for all queries by default.
We also provide a helper method
(JpaTransactionManager.setQueryFetchSize) that lets individual queries
override the default.
Weimin Yu, 2021-06-22 22:13:57 -04:00 (committed by GitHub)
commit 81fcdbdcea
parent 2b91e3bb89
4 changed files with 34 additions and 21 deletions
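
For illustration only, a minimal sketch of how a caller could use the new helper to stream a large query with a bigger fetch size. The surrounding class, the choice of ContactResource, and the value 1000 are assumptions rather than part of this change; the jpaTm() and helper calls follow the APIs shown in the diffs below.

// Sketch only: stream a large table with a bigger fetch size than the server default.
// LargeQueryExample, ContactResource as the entity, and 1000 are illustrative choices.
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;

import google.registry.model.contact.ContactResource;
import google.registry.persistence.transaction.JpaTransactionManager;
import javax.persistence.TypedQuery;

class LargeQueryExample {

  long countAllContacts() {
    return jpaTm()
        .transact(
            () -> {
              TypedQuery<ContactResource> query =
                  jpaTm()
                      .getEntityManager()
                      .createQuery("FROM ContactResource", ContactResource.class);
              // Raise the per-query fetch size above the global default (20) configured
              // in PersistenceModule below.
              JpaTransactionManager.setQueryFetchSize(query, 1000);
              return query.getResultStream().count();
            });
  }
}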

JpaDemoPipeline.java

@@ -18,17 +18,23 @@ import static com.google.common.base.Verify.verify;
 import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
 
 import google.registry.backup.AppEngineEnvironment;
+import google.registry.model.contact.ContactResource;
+import google.registry.persistence.transaction.CriteriaQueryBuilder;
 import google.registry.persistence.transaction.JpaTransactionManager;
 import java.io.Serializable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 
-/** Toy pipeline that demonstrates how to use {@link JpaTransactionManager} in BEAM pipelines. */
+/**
+ * Toy pipeline that demonstrates how to use {@link JpaTransactionManager} in BEAM pipelines.
+ *
+ * <p>This pipeline may also be used as an integration test for {@link RegistryJpaIO.Read} in a
+ * project with realistic data.
+ */
 public class JpaDemoPipeline implements Serializable {
 
   public static void main(String[] args) {
@@ -38,23 +44,16 @@ public class JpaDemoPipeline implements Serializable {
     Pipeline pipeline = Pipeline.create(options);
 
     pipeline
-        .apply("Start", Create.of((Void) null))
         .apply(
-            "Generate Elements",
-            ParDo.of(
-                new DoFn<Void, Void>() {
-                  @ProcessElement
-                  public void processElement(OutputReceiver<Void> output) {
-                    for (int i = 0; i < 500; i++) {
-                      output.output(null);
-                    }
-                  }
-                }))
+            "Read contacts",
+            RegistryJpaIO.read(
+                () -> CriteriaQueryBuilder.create(ContactResource.class).build(),
+                ContactResource::getRepoId))
         .apply(
-            "Make Query",
+            "Count Contacts",
             ParDo.of(
-                new DoFn<Void, Void>() {
-                  private Counter counter = Metrics.counter("Demo", "Read");
+                new DoFn<String, Void>() {
+                  private Counter counter = Metrics.counter("Contacts", "Read");
 
                   @ProcessElement
                   public void processElement() {

PersistenceModule.java

@@ -74,6 +74,15 @@ public abstract class PersistenceModule {
   public static final String HIKARI_DS_CLOUD_SQL_INSTANCE =
       "hibernate.hikari.dataSource.cloudSqlInstance";
 
+  /**
+   * Postgresql-specific: driver default fetch size is 0, which disables streaming result sets. Here
+   * we set a small default geared toward Nomulus server transactions. Large queries can override
+   * the defaults using {@link JpaTransactionManager#setQueryFetchSize}.
+   */
+  public static final String JDBC_FETCH_SIZE = "hibernate.jdbc.fetch_size";
+
+  private static final int DEFAULT_SERVER_FETCH_SIZE = 20;
+
   @VisibleForTesting
   @Provides
   @DefaultHibernateConfigs
@@ -100,6 +109,7 @@ public abstract class PersistenceModule {
     properties.put(HIKARI_MAXIMUM_POOL_SIZE, getHibernateHikariMaximumPoolSize());
     properties.put(HIKARI_IDLE_TIMEOUT, getHibernateHikariIdleTimeout());
     properties.put(Environment.DIALECT, NomulusPostgreSQLDialect.class.getName());
+    properties.put(JDBC_FETCH_SIZE, Integer.toString(DEFAULT_SERVER_FETCH_SIZE));
     return properties.build();
   }
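
For context, hibernate.jdbc.fetch_size maps to JDBC Statement.setFetchSize on the PostgreSQL driver. Below is a rough plain-JDBC sketch of the behavior being configured; FetchSizeSketch, the DataSource, the table name, and process() are illustrative placeholders, not code from this change.

// Rough JDBC-level sketch of what hibernate.jdbc.fetch_size controls on PostgreSQL.
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

class FetchSizeSketch {

  void streamRepoIds(DataSource dataSource) throws SQLException {
    try (Connection conn = dataSource.getConnection()) {
      // The PostgreSQL driver only uses a server-side cursor when autocommit is off...
      conn.setAutoCommit(false);
      try (PreparedStatement stmt = conn.prepareStatement("SELECT repo_id FROM \"Contact\"")) {
        // ...and the fetch size is positive; 0 (the default) loads every row into memory.
        stmt.setFetchSize(20); // mirrors DEFAULT_SERVER_FETCH_SIZE above
        try (ResultSet rs = stmt.executeQuery()) {
          while (rs.next()) {
            process(rs.getString(1));
          }
        }
      }
      conn.commit();
    }
  }

  void process(String repoId) {}
}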

JpaTransactionManager.java

@@ -65,4 +65,12 @@ public interface JpaTransactionManager extends TransactionManager {
    * <p>The errorprone check forbids injection of {@link java.io.Closeable} resources.
    */
   void teardown();
+
+  /**
+   * Sets the JDBC driver fetch size for the {@code query}. This overrides the default
+   * configuration.
+   */
+  static Query setQueryFetchSize(Query query, int fetchSize) {
+    return query.setHint("org.hibernate.fetchSize", fetchSize);
+  }
 }
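
The hint name used here, org.hibernate.fetchSize, is Hibernate's fetch-size query hint, so the helper should behave roughly like unwrapping the Hibernate query type directly. A sketch of the two approaches follows; the class and method names are illustrative, not part of this change.

// Two roughly equivalent ways to set the JDBC fetch size on a JPA query.
// The new helper uses the hint so callers stay on the javax.persistence API.
import javax.persistence.Query;

class FetchSizeHintSketch {

  // What JpaTransactionManager.setQueryFetchSize does: set Hibernate's fetch-size hint.
  static Query viaHint(Query query, int fetchSize) {
    return query.setHint("org.hibernate.fetchSize", fetchSize);
  }

  // Equivalent, but ties the caller to Hibernate's API (the pattern removed in the next file).
  static Query viaUnwrap(Query query, int fetchSize) {
    query.unwrap(org.hibernate.query.Query.class).setFetchSize(fetchSize);
    return query;
  }
}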

JpaTransactionManagerImpl.java

@@ -1052,11 +1052,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
         logger.atWarning().log("Query result streaming is not enabled.");
       }
       TypedQuery<T> query = buildQuery();
-      if (query instanceof org.hibernate.query.Query) {
-        ((org.hibernate.query.Query) query).setFetchSize(fetchSize);
-      } else {
-        logger.atWarning().log("Query implemention does not support result streaming.");
-      }
+      JpaTransactionManager.setQueryFetchSize(query, fetchSize);
       return query.getResultStream().map(JpaTransactionManagerImpl.this::detach);
     }