mirror of
https://github.com/google/nomulus.git
synced 2025-06-22 20:30:46 +02:00
Fix the JPA Read connector for large data (#1155)
* Fix the JPA Read connector for large data Allow result set streaming by setting the fetchSize on JDBC statements. Many JDBC drivers by default buffers the entire result set, causing delays in first result and/or out of memory errors. Also fixed a entity instantiation problem exposed in production runs. Lastly, removed incorrect comments.
This commit is contained in:
parent
b2b2fc12d8
commit
6e5d42b38d
4 changed files with 58 additions and 54 deletions
|
@ -34,11 +34,11 @@ import org.apache.beam.sdk.coders.SerializableCoder;
|
|||
import org.apache.beam.sdk.metrics.Counter;
|
||||
import org.apache.beam.sdk.metrics.Metrics;
|
||||
import org.apache.beam.sdk.transforms.Create;
|
||||
import org.apache.beam.sdk.transforms.Deduplicate;
|
||||
import org.apache.beam.sdk.transforms.DoFn;
|
||||
import org.apache.beam.sdk.transforms.GroupIntoBatches;
|
||||
import org.apache.beam.sdk.transforms.PTransform;
|
||||
import org.apache.beam.sdk.transforms.ParDo;
|
||||
import org.apache.beam.sdk.transforms.Reshuffle;
|
||||
import org.apache.beam.sdk.transforms.SerializableFunction;
|
||||
import org.apache.beam.sdk.transforms.WithKeys;
|
||||
import org.apache.beam.sdk.util.ShardedKey;
|
||||
|
@ -83,22 +83,6 @@ public final class RegistryJpaIO {
|
|||
* A {@link PTransform transform} that executes a JPA {@link CriteriaQuery} and adds the results
|
||||
* to the BEAM pipeline. Users have the option to transform the results before sending them to the
|
||||
* next stages.
|
||||
*
|
||||
* <p>The BEAM pipeline may execute this transform multiple times due to transient failures,
|
||||
* loading duplicate results into the pipeline. Before we add dedepuplication support, the easiest
|
||||
* workaround is to map results to {@link KV} pairs, and apply the {@link Deduplicate} transform
|
||||
* to the output of this transform:
|
||||
*
|
||||
* <pre>{@code
|
||||
* PCollection<String> contactIds =
|
||||
* pipeline
|
||||
* .apply(RegistryJpaIO.read(
|
||||
* (JpaTransactionManager tm) -> tm.createQueryComposer...,
|
||||
* contact -> KV.of(contact.getRepoId(), contact.getContactId()))
|
||||
* .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
|
||||
* .apply(Deduplicate.keyedValues())
|
||||
* .apply(Values.create());
|
||||
* }</pre>
|
||||
*/
|
||||
@AutoValue
|
||||
public abstract static class Read<R, T> extends PTransform<PBegin, PCollection<T>> {
|
||||
|
@ -118,13 +102,15 @@ public final class RegistryJpaIO {
|
|||
abstract Builder<R, T> toBuilder();
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("deprecation") // Reshuffle still recommended by GCP.
|
||||
public PCollection<T> expand(PBegin input) {
|
||||
return input
|
||||
.apply("Starting " + name(), Create.of((Void) null))
|
||||
.apply(
|
||||
"Run query for " + name(),
|
||||
ParDo.of(new QueryRunner<>(queryFactory(), resultMapper())))
|
||||
.setCoder(coder());
|
||||
.setCoder(coder())
|
||||
.apply("Reshuffle", Reshuffle.viaRandomKey());
|
||||
}
|
||||
|
||||
public Read<R, T> withName(String name) {
|
||||
|
@ -178,15 +164,17 @@ public final class RegistryJpaIO {
|
|||
|
||||
@ProcessElement
|
||||
public void processElement(OutputReceiver<T> outputReceiver) {
|
||||
// TODO(b/187210388): JpaTransactionManager should support non-transactional query.
|
||||
// TODO(weiminyu): add deduplication
|
||||
jpaTm()
|
||||
.transactNoRetry(
|
||||
() ->
|
||||
querySupplier.apply(jpaTm()).stream()
|
||||
.map(resultMapper::apply)
|
||||
.forEach(outputReceiver::output));
|
||||
// TODO(weiminyu): improve performance by reshuffle.
|
||||
// AppEngineEnvironment is need for handling VKeys, which involve Ofy keys. Unlike
|
||||
// SqlBatchWriter, it is unnecessary to initialize ObjectifyService in this class.
|
||||
try (AppEngineEnvironment env = new AppEngineEnvironment()) {
|
||||
// TODO(b/187210388): JpaTransactionManager should support non-transactional query.
|
||||
jpaTm()
|
||||
.transactNoRetry(
|
||||
() ->
|
||||
querySupplier.apply(jpaTm()).stream()
|
||||
.map(resultMapper::apply)
|
||||
.forEach(outputReceiver::output));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -323,8 +311,9 @@ public final class RegistryJpaIO {
|
|||
|
||||
@Setup
|
||||
public void setup() {
|
||||
// Below is needed as long as Objectify keys are still involved in the handling of SQL
|
||||
// entities (e.g., in VKeys).
|
||||
// AppEngineEnvironment is needed as long as Objectify keys are still involved in the handling
|
||||
// of SQL entities (e.g., in VKeys). ObjectifyService needs to be initialized when conversion
|
||||
// between Ofy entity and Datastore entity is needed.
|
||||
try (AppEngineEnvironment env = new AppEngineEnvironment()) {
|
||||
ObjectifyService.initOfy();
|
||||
}
|
||||
|
|
|
@ -692,8 +692,12 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
|
|||
|
||||
private static class JpaQueryComposerImpl<T> extends QueryComposer<T> {
|
||||
|
||||
private static final int DEFAULT_FETCH_SIZE = 1000;
|
||||
|
||||
EntityManager em;
|
||||
|
||||
private int fetchSize = DEFAULT_FETCH_SIZE;
|
||||
|
||||
JpaQueryComposerImpl(Class<T> entityClass, EntityManager em) {
|
||||
super(entityClass);
|
||||
this.em = em;
|
||||
|
@ -716,6 +720,13 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
|
|||
return em.createQuery(queryBuilder.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryComposer<T> withFetchSize(int fetchSize) {
|
||||
checkArgument(fetchSize >= 0, "FetchSize must not be negative");
|
||||
this.fetchSize = fetchSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<T> first() {
|
||||
List<T> results = buildQuery().setMaxResults(1).getResultList();
|
||||
|
@ -729,7 +740,16 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
|
|||
|
||||
@Override
|
||||
public Stream<T> stream() {
|
||||
return buildQuery().getResultStream();
|
||||
if (fetchSize == 0) {
|
||||
logger.atWarning().log("Query result streaming is not enabled.");
|
||||
}
|
||||
TypedQuery<T> query = buildQuery();
|
||||
if (query instanceof org.hibernate.query.Query) {
|
||||
((org.hibernate.query.Query) query).setFetchSize(fetchSize);
|
||||
} else {
|
||||
logger.atWarning().log("Query implemention does not support result streaming.");
|
||||
}
|
||||
return query.getResultStream();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -74,6 +74,24 @@ public abstract class QueryComposer<T> {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies {@code fetchSize} to the JDBC statement (by calling {@link
|
||||
* java.sql.Statement#setFetchSize}) if the query result is accessed by the {@link #stream}
|
||||
* method. Calling this method is optional. Children of this class will apply a default positive
|
||||
* fetch size if the user does not provide one.
|
||||
*
|
||||
* <p>With many JDBC drivers, including Postgresql, a positive fetch size is required for
|
||||
* streaming large result sets. A zero value, often the drivers' default setting, requires that
|
||||
* the entire result set is buffered.
|
||||
*
|
||||
* <p>The fetch size value, the default as well as the user-provided one, will be applied if and
|
||||
* only if the underlying query implementor supports it. The Hibernate implementations do support
|
||||
* this.
|
||||
*/
|
||||
public QueryComposer<T> withFetchSize(int fetchSize) {
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Returns the first result of the query or an empty optional if there is none. */
|
||||
public abstract Optional<T> first();
|
||||
|
||||
|
|
|
@ -30,12 +30,7 @@ import google.registry.testing.DatabaseHelper;
|
|||
import google.registry.testing.DatastoreEntityExtension;
|
||||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.SystemPropertyExtension;
|
||||
import org.apache.beam.sdk.coders.KvCoder;
|
||||
import org.apache.beam.sdk.coders.StringUtf8Coder;
|
||||
import org.apache.beam.sdk.testing.PAssert;
|
||||
import org.apache.beam.sdk.transforms.Deduplicate;
|
||||
import org.apache.beam.sdk.transforms.Values;
|
||||
import org.apache.beam.sdk.values.KV;
|
||||
import org.apache.beam.sdk.values.PCollection;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
@ -88,7 +83,7 @@ public class RegistryJpaReadTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
void nonTransactionalQuery_noDedupe() {
|
||||
void nonTransactionalQuery() {
|
||||
Read<ContactResource, String> read =
|
||||
RegistryJpaIO.read(
|
||||
(JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),
|
||||
|
@ -98,22 +93,4 @@ public class RegistryJpaReadTest {
|
|||
PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2");
|
||||
testPipeline.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
void nonTransactionalQuery_dedupe() {
|
||||
// This method only serves as an example of deduplication. Duplicates are not actually added.
|
||||
Read<ContactResource, KV<String, String>> read =
|
||||
RegistryJpaIO.read(
|
||||
(JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),
|
||||
contact -> KV.of(contact.getRepoId(), contact.getContactId()))
|
||||
.withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
|
||||
PCollection<String> repoIds =
|
||||
testPipeline
|
||||
.apply(read)
|
||||
.apply("Deduplicate", Deduplicate.keyedValues())
|
||||
.apply("Get values", Values.create());
|
||||
|
||||
PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2");
|
||||
testPipeline.run();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue