diff --git a/config/presubmits.py b/config/presubmits.py index 4cd5a1194..442676ea1 100644 --- a/config/presubmits.py +++ b/config/presubmits.py @@ -202,6 +202,8 @@ PRESUBMITS = { "java", # ActivityReportingQueryBuilder deals with Dremel queries {"src/test", "ActivityReportingQueryBuilder.java", + # This class contains helper method to make queries in Beam. + "RegistryJpaIO.java", # TODO(b/179158393): Remove everything below, which should be done # using Criteria "ForeignKeyIndex.java", diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index 96b5aa9f7..c0144ebbe 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -21,14 +21,15 @@ import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableList; import com.google.common.collect.Streams; import google.registry.backup.AppEngineEnvironment; -import google.registry.beam.common.RegistryQuery.QueryComposerFactory; -import google.registry.beam.common.RegistryQuery.RegistryQueryFactory; +import google.registry.beam.common.RegistryQuery.CriteriaQuerySupplier; import google.registry.model.ofy.ObjectifyService; import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.persistence.transaction.TransactionManagerFactory; import java.io.Serializable; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; import javax.persistence.criteria.CriteriaQuery; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -60,13 +61,18 @@ public final class RegistryJpaIO { private RegistryJpaIO() {} - public static Read read(QueryComposerFactory queryFactory) { - return Read.builder().queryFactory(queryFactory).build(); + public static Read read(CriteriaQuerySupplier query) { + return read(query, x -> x); } public static Read read( - QueryComposerFactory queryFactory, SerializableFunction resultMapper) { - return Read.builder().queryFactory(queryFactory).resultMapper(resultMapper).build(); + CriteriaQuerySupplier query, SerializableFunction resultMapper) { + return Read.builder().criteriaQuery(query).resultMapper(resultMapper).build(); + } + + public static Read read( + String sql, boolean nativeQuery, SerializableFunction resultMapper) { + return read(sql, null, nativeQuery, resultMapper); } /** @@ -74,8 +80,39 @@ public final class RegistryJpaIO { * *

User should take care to prevent sql-injection attacks. */ - public static Read read(String jpql, SerializableFunction resultMapper) { - return Read.builder().jpqlQueryFactory(jpql).resultMapper(resultMapper).build(); + public static Read read( + String sql, + @Nullable Map parameter, + boolean nativeQuery, + SerializableFunction resultMapper) { + Read.Builder builder = Read.builder(); + if (nativeQuery) { + builder.nativeQuery(sql, parameter); + } else { + builder.jpqlQuery(sql, parameter); + } + return builder.resultMapper(resultMapper).build(); + } + + public static Read read( + String jpql, Class clazz, SerializableFunction resultMapper) { + return read(jpql, null, clazz, resultMapper); + } + + /** + * Returns a {@link Read} connector based on the given {@code jpql} typed query string. + * + *

User should take care to prevent sql-injection attacks. + */ + public static Read read( + String jpql, + @Nullable Map parameter, + Class clazz, + SerializableFunction resultMapper) { + return Read.builder() + .jpqlQuery(jpql, clazz, parameter) + .resultMapper(resultMapper) + .build(); } public static Write write() { @@ -94,7 +131,7 @@ public final class RegistryJpaIO { abstract String name(); - abstract RegistryQueryFactory queryFactory(); + abstract RegistryQuery query(); abstract SerializableFunction resultMapper(); @@ -107,9 +144,7 @@ public final class RegistryJpaIO { public PCollection expand(PBegin input) { return input .apply("Starting " + name(), Create.of((Void) null)) - .apply( - "Run query for " + name(), - ParDo.of(new QueryRunner<>(queryFactory(), resultMapper()))) + .apply("Run query for " + name(), ParDo.of(new QueryRunner<>(query(), resultMapper()))) .setCoder(coder()) .apply("Reshuffle", Reshuffle.viaRandomKey()); } @@ -127,9 +162,8 @@ public final class RegistryJpaIO { } static Builder builder() { - return new AutoValue_RegistryJpaIO_Read.Builder() + return new AutoValue_RegistryJpaIO_Read.Builder() .name(DEFAULT_NAME) - .resultMapper(x -> x) .coder(SerializableCoder.of(Serializable.class)); } @@ -138,7 +172,7 @@ public final class RegistryJpaIO { abstract Builder name(String name); - abstract Builder queryFactory(RegistryQueryFactory queryFactory); + abstract Builder query(RegistryQuery query); abstract Builder resultMapper(SerializableFunction mapper); @@ -146,21 +180,29 @@ public final class RegistryJpaIO { abstract Read build(); - Builder queryFactory(QueryComposerFactory queryFactory) { - return queryFactory(RegistryQuery.createQueryFactory(queryFactory)); + Builder criteriaQuery(CriteriaQuerySupplier criteriaQuery) { + return query(RegistryQuery.createQuery(criteriaQuery)); } - Builder jpqlQueryFactory(String jpql) { - return queryFactory(RegistryQuery.createQueryFactory(jpql)); + Builder nativeQuery(String sql, Map parameters) { + return query(RegistryQuery.createQuery(sql, parameters, true)); + } + + Builder jpqlQuery(String jpql, Map parameters) { + return query(RegistryQuery.createQuery(jpql, parameters, false)); + } + + Builder jpqlQuery(String jpql, Class clazz, Map parameters) { + return query(RegistryQuery.createQuery(jpql, parameters, clazz)); } } static class QueryRunner extends DoFn { - private final RegistryQueryFactory queryFactory; + private final RegistryQuery query; private final SerializableFunction resultMapper; - QueryRunner(RegistryQueryFactory queryFactory, SerializableFunction resultMapper) { - this.queryFactory = queryFactory; + QueryRunner(RegistryQuery query, SerializableFunction resultMapper) { + this.query = query; this.resultMapper = resultMapper; } @@ -172,10 +214,7 @@ public final class RegistryJpaIO { // TODO(b/187210388): JpaTransactionManager should support non-transactional query. jpaTm() .transactNoRetry( - () -> - queryFactory.apply(jpaTm()).stream() - .map(resultMapper::apply) - .forEach(outputReceiver::output)); + () -> query.stream().map(resultMapper::apply).forEach(outputReceiver::output)); } } } diff --git a/core/src/main/java/google/registry/beam/common/RegistryQuery.java b/core/src/main/java/google/registry/beam/common/RegistryQuery.java index 0d4b0cd00..f61dc01d5 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryQuery.java +++ b/core/src/main/java/google/registry/beam/common/RegistryQuery.java @@ -14,48 +14,75 @@ package google.registry.beam.common; -import google.registry.persistence.transaction.JpaTransactionManager; -import google.registry.persistence.transaction.QueryComposer; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; + +import java.io.Serializable; +import java.util.Map; +import java.util.function.Supplier; import java.util.stream.Stream; +import javax.annotation.Nullable; import javax.persistence.EntityManager; import javax.persistence.Query; -import org.apache.beam.sdk.transforms.SerializableFunction; +import javax.persistence.TypedQuery; +import javax.persistence.criteria.CriteriaQuery; /** Interface for query instances used by {@link RegistryJpaIO.Read}. */ -public interface RegistryQuery { +public interface RegistryQuery extends Serializable { Stream stream(); - /** Factory for {@link RegistryQuery}. */ - interface RegistryQueryFactory - extends SerializableFunction> {} - - // TODO(mmuller): Consider detached JpaQueryComposer that works with any JpaTransactionManager - // instance, i.e., change composer.buildQuery() to composer.buildQuery(JpaTransactionManager). - // This way QueryComposer becomes reusable and serializable (at least with Hibernate), and this - // interface would no longer be necessary. - interface QueryComposerFactory - extends SerializableFunction> {} + interface CriteriaQuerySupplier extends Supplier>, Serializable {} /** - * Returns a {@link RegistryQueryFactory} that creates a JPQL query from constant text. + * Returns a {@link RegistryQuery} that creates a string query from constant text. * + * @param nativeQuery whether the given string is to be interpreted as a native query or JPQL. + * @param parameters parameters to be substituted in the query. * @param Type of each row in the result set, {@link Object} in single-select queries, and * {@code Object[]} in multi-select queries. */ - @SuppressWarnings("unchecked") // query.getResultStream: jpa api uses raw type - static RegistryQueryFactory createQueryFactory(String jpql) { - return (JpaTransactionManager jpa) -> - () -> { - EntityManager entityManager = jpa.getEntityManager(); - Query query = entityManager.createQuery(jpql); - return query.getResultStream().map(e -> detach(entityManager, e)); - }; + static RegistryQuery createQuery( + String sql, @Nullable Map parameters, boolean nativeQuery) { + return () -> { + EntityManager entityManager = jpaTm().getEntityManager(); + Query query = + nativeQuery ? entityManager.createNativeQuery(sql) : entityManager.createQuery(sql); + if (parameters != null) { + parameters.forEach(query::setParameter); + } + @SuppressWarnings("unchecked") + Stream resultStream = query.getResultStream(); + return nativeQuery ? resultStream : resultStream.map(e -> detach(entityManager, e)); + }; } - static RegistryQueryFactory createQueryFactory( - QueryComposerFactory queryComposerFactory) { - return (JpaTransactionManager jpa) -> - () -> queryComposerFactory.apply(jpa).withAutoDetachOnLoad(true).stream(); + /** + * Returns a {@link RegistryQuery} that creates a typed JPQL query from constant text. + * + * @param parameters parameters to be substituted in the query. + * @param Type of each row in the result set. + */ + static RegistryQuery createQuery( + String jpql, @Nullable Map parameters, Class clazz) { + return () -> { + TypedQuery query = jpaTm().query(jpql, clazz); + if (parameters != null) { + parameters.forEach(query::setParameter); + } + return query.getResultStream(); + }; + } + + /** + * Returns a {@link RegistryQuery} from a {@link CriteriaQuery} supplier. + * + *

A serializable supplier is needed in because {@link CriteriaQuery} itself must be created + * within a transaction, and we are not in a transaction yet when this function is called to set + * up the pipeline. + * + * @param Type of each row in the result set. + */ + static RegistryQuery createQuery(CriteriaQuerySupplier criteriaQuery) { + return () -> jpaTm().query(criteriaQuery.get()).getResultStream(); } /** diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java index 97a114b7d..32030d5c3 100644 --- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java +++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java @@ -116,6 +116,7 @@ public class Spec11Pipeline implements Serializable { "select d, r.emailAddress from Domain d join Registrar r on" + " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL'" + " and d.deletionTime > now()", + false, Spec11Pipeline::parseRow); return pipeline.apply("Read active domains from Cloud SQL", read); diff --git a/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java b/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java index d3308371e..743be2e26 100644 --- a/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java +++ b/core/src/main/java/google/registry/persistence/transaction/QueryComposer.java @@ -93,16 +93,6 @@ public abstract class QueryComposer { return this; } - /** - * Specifies if JPA entities should be automatically detached from the persistence context after - * loading. The default behavior is auto-detach. - * - *

This configuration has no effect on Datastore queries. - */ - public QueryComposer withAutoDetachOnLoad(boolean autoDetachOnLoad) { - return this; - } - /** Returns the first result of the query or an empty optional if there is none. */ public abstract Optional first(); diff --git a/core/src/main/java/google/registry/tools/GenerateLordnCommand.java b/core/src/main/java/google/registry/tools/GenerateLordnCommand.java index 3ea364805..9522c4e39 100644 --- a/core/src/main/java/google/registry/tools/GenerateLordnCommand.java +++ b/core/src/main/java/google/registry/tools/GenerateLordnCommand.java @@ -69,7 +69,6 @@ final class GenerateLordnCommand implements CommandWithRemoteApi { .createQueryComposer(DomainBase.class) .where("tld", Comparator.EQ, tld) .orderBy("repoId") - .withAutoDetachOnLoad(false) .stream() .forEach(domain -> processDomain(claimsCsv, sunriseCsv, domain))); ImmutableList claimsRows = claimsCsv.build(); diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java index c6c8a03ac..c1f80edde 100644 --- a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java @@ -21,6 +21,7 @@ import static google.registry.util.DateTimeUtils.END_OF_TIME; import static google.registry.util.DateTimeUtils.START_OF_TIME; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import google.registry.beam.TestPipelineExtension; import google.registry.beam.common.RegistryJpaIO.Read; @@ -37,9 +38,9 @@ import google.registry.model.eppcommon.StatusValue; import google.registry.model.registrar.Registrar; import google.registry.model.registry.Registry; import google.registry.model.transfer.ContactTransferData; +import google.registry.persistence.transaction.CriteriaQueryBuilder; import google.registry.persistence.transaction.JpaTestRules; import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension; -import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.testing.AppEngineExtension; import google.registry.testing.DatabaseHelper; import google.registry.testing.DatastoreEntityExtension; @@ -98,10 +99,10 @@ public class RegistryJpaReadTest { } @Test - void readWithQueryComposer() { + void readWithCriteriaQuery() { Read read = RegistryJpaIO.read( - (JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class), + () -> CriteriaQueryBuilder.create(ContactResource.class).build(), ContactBase::getContactId); PCollection repoIds = testPipeline.apply(read); @@ -115,19 +116,57 @@ public class RegistryJpaReadTest { Read read = RegistryJpaIO.read( "select d, r.emailAddress from Domain d join Registrar r on" - + " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL'" + + " d.currentSponsorClientId = r.clientIdentifier where r.type = :type" + " and d.deletionTime > now()", - RegistryJpaReadTest::parseRow); + ImmutableMap.of("type", Registrar.Type.REAL), + false, + (Object[] row) -> { + DomainBase domainBase = (DomainBase) row[0]; + String emailAddress = (String) row[1]; + return domainBase.getRepoId() + "-" + emailAddress; + }); PCollection joinedStrings = testPipeline.apply(read); PAssert.that(joinedStrings).containsInAnyOrder("4-COM-me@google.com"); testPipeline.run(); } - private static String parseRow(Object[] row) { - DomainBase domainBase = (DomainBase) row[0]; - String emailAddress = (String) row[1]; - return domainBase.getRepoId() + "-" + emailAddress; + @Test + void readWithStringNativeQuery() { + setupForJoinQuery(); + Read read = + RegistryJpaIO.read( + "select d.repo_id, r.email_address from \"Domain\" d join \"Registrar\" r on" + + " d.current_sponsor_registrar_id = r.registrar_id where r.type = :type" + + " and d.deletion_time > now()", + ImmutableMap.of("type", "REAL"), + true, + (Object[] row) -> { + String repoId = (String) row[0]; + String emailAddress = (String) row[1]; + return repoId + "-" + emailAddress; + }); + PCollection joinedStrings = testPipeline.apply(read); + + PAssert.that(joinedStrings).containsInAnyOrder("4-COM-me@google.com"); + testPipeline.run(); + } + + @Test + void readWithStringTypedQuery() { + setupForJoinQuery(); + Read read = + RegistryJpaIO.read( + "select d from Domain d join Registrar r on" + + " d.currentSponsorClientId = r.clientIdentifier where r.type = :type" + + " and d.deletionTime > now()", + ImmutableMap.of("type", Registrar.Type.REAL), + DomainBase.class, + DomainBase::getRepoId); + PCollection repoIds = testPipeline.apply(read); + + PAssert.that(repoIds).containsInAnyOrder("4-COM"); + testPipeline.run(); } private void setupForJoinQuery() {