From e4e7c5ead2d564c9e0132b214ccc8d2a7a1f43ee Mon Sep 17 00:00:00 2001 From: Lai Jiang Date: Fri, 18 Jun 2021 10:29:00 -0400 Subject: [PATCH] Make RegistryJpaIO use CriteriaQuery intead of QueryComposer (#1209) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit QueryComposer could be used when the transaction manager is not determined (i. e. it supports both ofy and sql), but this also imposes limits on what you can do with it. For example it does not support IN operator in the where clause. Since QueryComposer itself creates a CriteriaQuery for JPA TM it make sense to have RegistryJpaIO take a CriteriaQuery directly as it only uses JPA. Also add some more helper methods to use native queries and typed queires, and fix some generic type warnings. --- This change is [Reviewable](https://reviewable.io/reviews/google/nomulus/1209) --- config/presubmits.py | 2 + .../registry/beam/common/RegistryJpaIO.java | 91 +++++++++++++------ .../registry/beam/common/RegistryQuery.java | 81 +++++++++++------ .../registry/beam/spec11/Spec11Pipeline.java | 1 + .../transaction/QueryComposer.java | 10 -- .../registry/tools/GenerateLordnCommand.java | 1 - .../beam/common/RegistryJpaReadTest.java | 57 ++++++++++-- 7 files changed, 170 insertions(+), 73 deletions(-) diff --git a/config/presubmits.py b/config/presubmits.py index 4cd5a1194..442676ea1 100644 --- a/config/presubmits.py +++ b/config/presubmits.py @@ -202,6 +202,8 @@ PRESUBMITS = { "java", # ActivityReportingQueryBuilder deals with Dremel queries {"src/test", "ActivityReportingQueryBuilder.java", + # This class contains helper method to make queries in Beam. + "RegistryJpaIO.java", # TODO(b/179158393): Remove everything below, which should be done # using Criteria "ForeignKeyIndex.java", diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index 96b5aa9f7..c0144ebbe 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -21,14 +21,15 @@ import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableList; import com.google.common.collect.Streams; import google.registry.backup.AppEngineEnvironment; -import google.registry.beam.common.RegistryQuery.QueryComposerFactory; -import google.registry.beam.common.RegistryQuery.RegistryQueryFactory; +import google.registry.beam.common.RegistryQuery.CriteriaQuerySupplier; import google.registry.model.ofy.ObjectifyService; import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.persistence.transaction.TransactionManagerFactory; import java.io.Serializable; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; import javax.persistence.criteria.CriteriaQuery; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -60,13 +61,18 @@ public final class RegistryJpaIO { private RegistryJpaIO() {} - public static Read read(QueryComposerFactory queryFactory) { - return Read.builder().queryFactory(queryFactory).build(); + public static Read read(CriteriaQuerySupplier query) { + return read(query, x -> x); } public static Read read( - QueryComposerFactory queryFactory, SerializableFunction resultMapper) { - return Read.builder().queryFactory(queryFactory).resultMapper(resultMapper).build(); + CriteriaQuerySupplier query, SerializableFunction resultMapper) { + return Read.builder().criteriaQuery(query).resultMapper(resultMapper).build(); + } + + public static Read read( + String sql, boolean nativeQuery, SerializableFunction resultMapper) { + return read(sql, null, nativeQuery, resultMapper); } /** @@ -74,8 +80,39 @@ public final class RegistryJpaIO { * *

User should take care to prevent sql-injection attacks. */ - public static Read read(String jpql, SerializableFunction resultMapper) { - return Read.builder().jpqlQueryFactory(jpql).resultMapper(resultMapper).build(); + public static Read read( + String sql, + @Nullable Map parameter, + boolean nativeQuery, + SerializableFunction resultMapper) { + Read.Builder builder = Read.builder(); + if (nativeQuery) { + builder.nativeQuery(sql, parameter); + } else { + builder.jpqlQuery(sql, parameter); + } + return builder.resultMapper(resultMapper).build(); + } + + public static Read read( + String jpql, Class clazz, SerializableFunction resultMapper) { + return read(jpql, null, clazz, resultMapper); + } + + /** + * Returns a {@link Read} connector based on the given {@code jpql} typed query string. + * + *

User should take care to prevent sql-injection attacks. + */ + public static Read read( + String jpql, + @Nullable Map parameter, + Class clazz, + SerializableFunction resultMapper) { + return Read.builder() + .jpqlQuery(jpql, clazz, parameter) + .resultMapper(resultMapper) + .build(); } public static Write write() { @@ -94,7 +131,7 @@ public final class RegistryJpaIO { abstract String name(); - abstract RegistryQueryFactory queryFactory(); + abstract RegistryQuery query(); abstract SerializableFunction resultMapper(); @@ -107,9 +144,7 @@ public final class RegistryJpaIO { public PCollection expand(PBegin input) { return input .apply("Starting " + name(), Create.of((Void) null)) - .apply( - "Run query for " + name(), - ParDo.of(new QueryRunner<>(queryFactory(), resultMapper()))) + .apply("Run query for " + name(), ParDo.of(new QueryRunner<>(query(), resultMapper()))) .setCoder(coder()) .apply("Reshuffle", Reshuffle.viaRandomKey()); } @@ -127,9 +162,8 @@ public final class RegistryJpaIO { } static Builder builder() { - return new AutoValue_RegistryJpaIO_Read.Builder() + return new AutoValue_RegistryJpaIO_Read.Builder() .name(DEFAULT_NAME) - .resultMapper(x -> x) .coder(SerializableCoder.of(Serializable.class)); } @@ -138,7 +172,7 @@ public final class RegistryJpaIO { abstract Builder name(String name); - abstract Builder queryFactory(RegistryQueryFactory queryFactory); + abstract Builder query(RegistryQuery query); abstract Builder resultMapper(SerializableFunction mapper); @@ -146,21 +180,29 @@ public final class RegistryJpaIO { abstract Read build(); - Builder queryFactory(QueryComposerFactory queryFactory) { - return queryFactory(RegistryQuery.createQueryFactory(queryFactory)); + Builder criteriaQuery(CriteriaQuerySupplier criteriaQuery) { + return query(RegistryQuery.createQuery(criteriaQuery)); } - Builder jpqlQueryFactory(String jpql) { - return queryFactory(RegistryQuery.createQueryFactory(jpql)); + Builder nativeQuery(String sql, Map parameters) { + return query(RegistryQuery.createQuery(sql, parameters, true)); + } + + Builder jpqlQuery(String jpql, Map parameters) { + return query(RegistryQuery.createQuery(jpql, parameters, false)); + } + + Builder jpqlQuery(String jpql, Class clazz, Map parameters) { + return query(RegistryQuery.createQuery(jpql, parameters, clazz)); } } static class QueryRunner extends DoFn { - private final RegistryQueryFactory queryFactory; + private final RegistryQuery query; private final SerializableFunction resultMapper; - QueryRunner(RegistryQueryFactory queryFactory, SerializableFunction resultMapper) { - this.queryFactory = queryFactory; + QueryRunner(RegistryQuery query, SerializableFunction resultMapper) { + this.query = query; this.resultMapper = resultMapper; } @@ -172,10 +214,7 @@ public final class RegistryJpaIO { // TODO(b/187210388): JpaTransactionManager should support non-transactional query. jpaTm() .transactNoRetry( - () -> - queryFactory.apply(jpaTm()).stream() - .map(resultMapper::apply) - .forEach(outputReceiver::output)); + () -> query.stream().map(resultMapper::apply).forEach(outputReceiver::output)); } } } diff --git a/core/src/main/java/google/registry/beam/common/RegistryQuery.java b/core/src/main/java/google/registry/beam/common/RegistryQuery.java index 0d4b0cd00..f61dc01d5 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryQuery.java +++ b/core/src/main/java/google/registry/beam/common/RegistryQuery.java @@ -14,48 +14,75 @@ package google.registry.beam.common; -import google.registry.persistence.transaction.JpaTransactionManager; -import google.registry.persistence.transaction.QueryComposer; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; + +import java.io.Serializable; +import java.util.Map; +import java.util.function.Supplier; import java.util.stream.Stream; +import javax.annotation.Nullable; import javax.persistence.EntityManager; import javax.persistence.Query; -import org.apache.beam.sdk.transforms.SerializableFunction; +import javax.persistence.TypedQuery; +import javax.persistence.criteria.CriteriaQuery; /** Interface for query instances used by {@link RegistryJpaIO.Read}. */ -public interface RegistryQuery { +public interface RegistryQuery extends Serializable { Stream stream(); - /** Factory for {@link RegistryQuery}. */ - interface RegistryQueryFactory - extends SerializableFunction> {} - - // TODO(mmuller): Consider detached JpaQueryComposer that works with any JpaTransactionManager - // instance, i.e., change composer.buildQuery() to composer.buildQuery(JpaTransactionManager). - // This way QueryComposer becomes reusable and serializable (at least with Hibernate), and this - // interface would no longer be necessary. - interface QueryComposerFactory - extends SerializableFunction> {} + interface CriteriaQuerySupplier extends Supplier>, Serializable {} /** - * Returns a {@link RegistryQueryFactory} that creates a JPQL query from constant text. + * Returns a {@link RegistryQuery} that creates a string query from constant text. * + * @param nativeQuery whether the given string is to be interpreted as a native query or JPQL. + * @param parameters parameters to be substituted in the query. * @param Type of each row in the result set, {@link Object} in single-select queries, and * {@code Object[]} in multi-select queries. */ - @SuppressWarnings("unchecked") // query.getResultStream: jpa api uses raw type - static RegistryQueryFactory createQueryFactory(String jpql) { - return (JpaTransactionManager jpa) -> - () -> { - EntityManager entityManager = jpa.getEntityManager(); - Query query = entityManager.createQuery(jpql); - return query.getResultStream().map(e -> detach(entityManager, e)); - }; + static RegistryQuery createQuery( + String sql, @Nullable Map parameters, boolean nativeQuery) { + return () -> { + EntityManager entityManager = jpaTm().getEntityManager(); + Query query = + nativeQuery ? entityManager.createNativeQuery(sql) : entityManager.createQuery(sql); + if (parameters != null) { + parameters.forEach(query::setParameter); + } + @SuppressWarnings("unchecked") + Stream resultStream = query.getResultStream(); + return nativeQuery ? resultStream : resultStream.map(e -> detach(entityManager, e)); + }; } - static RegistryQueryFactory createQueryFactory( - QueryComposerFactory queryComposerFactory) { - return (JpaTransactionManager jpa) -> - () -> queryComposerFactory.apply(jpa).withAutoDetachOnLoad(true).stream(); + /** + * Returns a {@link RegistryQuery} that creates a typed JPQL query from constant text. + * + * @param parameters parameters to be substituted in the query. + * @param Type of each row in the result set. + */ + static RegistryQuery createQuery( + String jpql, @Nullable Map parameters, Class clazz) { + return () -> { + TypedQuery query = jpaTm().query(jpql, clazz); + if (parameters != null) { + parameters.forEach(query::setParameter); + } + return query.getResultStream(); + }; + } + + /** + * Returns a {@link RegistryQuery} from a {@link CriteriaQuery} supplier. + * + *

A serializable supplier is needed in because {@link CriteriaQuery} itself must be created + * within a transaction, and we are not in a transaction yet when this function is called to set + * up the pipeline. + * + * @param Type of each row in the result set. + */ + static RegistryQuery createQuery(CriteriaQuerySupplier criteriaQuery) { + return () -> jpaTm().query(criteriaQuery.get()).getResultStream(); } /** diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java index 97a114b7d..32030d5c3 100644 --- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java +++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java @@ -116,6 +116,7 @@ public class Spec11Pipeline implements Serializable { "select d, r.emailAddress from Domain d join Registrar r on" + " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL'" + " and d.deletionTime > now()", + false, Spec11Pipeline::parseRow); return pipeline.apply("Read active domains from Cloud SQL", read); diff --git a/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java b/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java index d3308371e..743be2e26 100644 --- a/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java +++ b/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java @@ -93,16 +93,6 @@ public abstract class QueryComposer { return this; } - /** - * Specifies if JPA entities should be automatically detached from the persistence context after - * loading. The default behavior is auto-detach. - * - *

This configuration has no effect on Datastore queries. - */ - public QueryComposer withAutoDetachOnLoad(boolean autoDetachOnLoad) { - return this; - } - /** Returns the first result of the query or an empty optional if there is none. */ public abstract Optional first(); diff --git a/core/src/main/java/google/registry/tools/GenerateLordnCommand.java b/core/src/main/java/google/registry/tools/GenerateLordnCommand.java index 3ea364805..9522c4e39 100644 --- a/core/src/main/java/google/registry/tools/GenerateLordnCommand.java +++ b/core/src/main/java/google/registry/tools/GenerateLordnCommand.java @@ -69,7 +69,6 @@ final class GenerateLordnCommand implements CommandWithRemoteApi { .createQueryComposer(DomainBase.class) .where("tld", Comparator.EQ, tld) .orderBy("repoId") - .withAutoDetachOnLoad(false) .stream() .forEach(domain -> processDomain(claimsCsv, sunriseCsv, domain))); ImmutableList claimsRows = claimsCsv.build(); diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java index c6c8a03ac..c1f80edde 100644 --- a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java @@ -21,6 +21,7 @@ import static google.registry.util.DateTimeUtils.END_OF_TIME; import static google.registry.util.DateTimeUtils.START_OF_TIME; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import google.registry.beam.TestPipelineExtension; import google.registry.beam.common.RegistryJpaIO.Read; @@ -37,9 +38,9 @@ import google.registry.model.eppcommon.StatusValue; import google.registry.model.registrar.Registrar; import google.registry.model.registry.Registry; import google.registry.model.transfer.ContactTransferData; +import google.registry.persistence.transaction.CriteriaQueryBuilder; import google.registry.persistence.transaction.JpaTestRules; import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension; -import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.testing.AppEngineExtension; import google.registry.testing.DatabaseHelper; import google.registry.testing.DatastoreEntityExtension; @@ -98,10 +99,10 @@ public class RegistryJpaReadTest { } @Test - void readWithQueryComposer() { + void readWithCriteriaQuery() { Read read = RegistryJpaIO.read( - (JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class), + () -> CriteriaQueryBuilder.create(ContactResource.class).build(), ContactBase::getContactId); PCollection repoIds = testPipeline.apply(read); @@ -115,19 +116,57 @@ public class RegistryJpaReadTest { Read read = RegistryJpaIO.read( "select d, r.emailAddress from Domain d join Registrar r on" - + " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL'" + + " d.currentSponsorClientId = r.clientIdentifier where r.type = :type" + " and d.deletionTime > now()", - RegistryJpaReadTest::parseRow); + ImmutableMap.of("type", Registrar.Type.REAL), + false, + (Object[] row) -> { + DomainBase domainBase = (DomainBase) row[0]; + String emailAddress = (String) row[1]; + return domainBase.getRepoId() + "-" + emailAddress; + }); PCollection joinedStrings = testPipeline.apply(read); PAssert.that(joinedStrings).containsInAnyOrder("4-COM-me@google.com"); testPipeline.run(); } - private static String parseRow(Object[] row) { - DomainBase domainBase = (DomainBase) row[0]; - String emailAddress = (String) row[1]; - return domainBase.getRepoId() + "-" + emailAddress; + @Test + void readWithStringNativeQuery() { + setupForJoinQuery(); + Read read = + RegistryJpaIO.read( + "select d.repo_id, r.email_address from \"Domain\" d join \"Registrar\" r on" + + " d.current_sponsor_registrar_id = r.registrar_id where r.type = :type" + + " and d.deletion_time > now()", + ImmutableMap.of("type", "REAL"), + true, + (Object[] row) -> { + String repoId = (String) row[0]; + String emailAddress = (String) row[1]; + return repoId + "-" + emailAddress; + }); + PCollection joinedStrings = testPipeline.apply(read); + + PAssert.that(joinedStrings).containsInAnyOrder("4-COM-me@google.com"); + testPipeline.run(); + } + + @Test + void readWithStringTypedQuery() { + setupForJoinQuery(); + Read read = + RegistryJpaIO.read( + "select d from Domain d join Registrar r on" + + " d.currentSponsorClientId = r.clientIdentifier where r.type = :type" + + " and d.deletionTime > now()", + ImmutableMap.of("type", Registrar.Type.REAL), + DomainBase.class, + DomainBase::getRepoId); + PCollection repoIds = testPipeline.apply(read); + + PAssert.that(repoIds).containsInAnyOrder("4-COM"); + testPipeline.run(); } private void setupForJoinQuery() {