From f713517197168ad7bfbde37ac1293cc4e4550fef Mon Sep 17 00:00:00 2001 From: Weimin Yu Date: Wed, 19 May 2021 14:45:04 -0400 Subject: [PATCH] Support text-based JPQL query for BEAM (#1168) * Support text-based JPQL query for BEAM --- config/presubmits.py | 1 + .../registry/beam/common/RegistryJpaIO.java | 39 +++++--- .../registry/beam/common/RegistryQuery.java | 89 ++++++++++++++++++ .../beam/common/RegistryJpaReadTest.java | 92 ++++++++++++++++++- 4 files changed, 206 insertions(+), 15 deletions(-) create mode 100644 core/src/main/java/google/registry/beam/common/RegistryQuery.java diff --git a/config/presubmits.py b/config/presubmits.py index 7efdd4f80..4cd5a1194 100644 --- a/config/presubmits.py +++ b/config/presubmits.py @@ -213,6 +213,7 @@ PRESUBMITS = { "RdapDomainSearchAction.java", "RdapNameserverSearchAction.java", "RdapSearchActionBase.java", + "RegistryQuery", }, ): "The first String parameter to EntityManager.create(Native)Query " diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index fc541ea06..96b5aa9f7 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -21,9 +21,10 @@ import com.google.auto.value.AutoValue; import com.google.common.collect.ImmutableList; import com.google.common.collect.Streams; import google.registry.backup.AppEngineEnvironment; +import google.registry.beam.common.RegistryQuery.QueryComposerFactory; +import google.registry.beam.common.RegistryQuery.RegistryQueryFactory; import google.registry.model.ofy.ObjectifyService; import google.registry.persistence.transaction.JpaTransactionManager; -import google.registry.persistence.transaction.QueryComposer; import google.registry.persistence.transaction.TransactionManagerFactory; import java.io.Serializable; import java.util.Objects; @@ -68,17 +69,19 @@ public final class RegistryJpaIO { 
return Read.builder().queryFactory(queryFactory).resultMapper(resultMapper).build(); } + /** + * Returns a {@link Read} connector based on the given {@code jpql} query string. + * + * <p>
User should take care to prevent sql-injection attacks. + */ + public static Read read(String jpql, SerializableFunction resultMapper) { + return Read.builder().jpqlQueryFactory(jpql).resultMapper(resultMapper).build(); + } + public static Write write() { return Write.builder().build(); } - // TODO(mmuller): Consider detached JpaQueryComposer that works with any JpaTransactionManager - // instance, i.e., change composer.buildQuery() to composer.buildQuery(JpaTransactionManager). - // This way QueryComposer becomes reusable and serializable (at least with Hibernate), and this - // interface would no longer be necessary. - public interface QueryComposerFactory - extends SerializableFunction> {} - /** * A {@link PTransform transform} that transactionally executes a JPA {@link CriteriaQuery} and * adds the results to the BEAM pipeline. Users have the option to transform the results before @@ -91,7 +94,7 @@ public final class RegistryJpaIO { abstract String name(); - abstract RegistryJpaIO.QueryComposerFactory queryFactory(); + abstract RegistryQueryFactory queryFactory(); abstract SerializableFunction resultMapper(); @@ -135,21 +138,29 @@ public final class RegistryJpaIO { abstract Builder name(String name); - abstract Builder queryFactory(RegistryJpaIO.QueryComposerFactory queryFactory); + abstract Builder queryFactory(RegistryQueryFactory queryFactory); abstract Builder resultMapper(SerializableFunction mapper); abstract Builder coder(Coder coder); abstract Read build(); + + Builder queryFactory(QueryComposerFactory queryFactory) { + return queryFactory(RegistryQuery.createQueryFactory(queryFactory)); + } + + Builder jpqlQueryFactory(String jpql) { + return queryFactory(RegistryQuery.createQueryFactory(jpql)); + } } static class QueryRunner extends DoFn { - private final QueryComposerFactory querySupplier; + private final RegistryQueryFactory queryFactory; private final SerializableFunction resultMapper; - QueryRunner(QueryComposerFactory querySupplier, 
SerializableFunction resultMapper) { - this.querySupplier = querySupplier; + QueryRunner(RegistryQueryFactory queryFactory, SerializableFunction resultMapper) { + this.queryFactory = queryFactory; this.resultMapper = resultMapper; } @@ -162,7 +173,7 @@ public final class RegistryJpaIO { jpaTm() .transactNoRetry( () -> - querySupplier.apply(jpaTm()).withAutoDetachOnLoad(true).stream() + queryFactory.apply(jpaTm()).stream() .map(resultMapper::apply) .forEach(outputReceiver::output)); } diff --git a/core/src/main/java/google/registry/beam/common/RegistryQuery.java b/core/src/main/java/google/registry/beam/common/RegistryQuery.java new file mode 100644 index 000000000..0d4b0cd00 --- /dev/null +++ b/core/src/main/java/google/registry/beam/common/RegistryQuery.java @@ -0,0 +1,89 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.common; + +import google.registry.persistence.transaction.JpaTransactionManager; +import google.registry.persistence.transaction.QueryComposer; +import java.util.stream.Stream; +import javax.persistence.EntityManager; +import javax.persistence.Query; +import org.apache.beam.sdk.transforms.SerializableFunction; + +/** Interface for query instances used by {@link RegistryJpaIO.Read}. */ +public interface RegistryQuery { + Stream stream(); + + /** Factory for {@link RegistryQuery}. 
*/ + interface RegistryQueryFactory + extends SerializableFunction> {} + + // TODO(mmuller): Consider detached JpaQueryComposer that works with any JpaTransactionManager + // instance, i.e., change composer.buildQuery() to composer.buildQuery(JpaTransactionManager). + // This way QueryComposer becomes reusable and serializable (at least with Hibernate), and this + // interface would no longer be necessary. + interface QueryComposerFactory + extends SerializableFunction> {} + + /** + * Returns a {@link RegistryQueryFactory} that creates a JPQL query from constant text. + * + * @param Type of each row in the result set, {@link Object} in single-select queries, and + * {@code Object[]} in multi-select queries. + */ + @SuppressWarnings("unchecked") // query.getResultStream: jpa api uses raw type + static RegistryQueryFactory createQueryFactory(String jpql) { + return (JpaTransactionManager jpa) -> + () -> { + EntityManager entityManager = jpa.getEntityManager(); + Query query = entityManager.createQuery(jpql); + return query.getResultStream().map(e -> detach(entityManager, e)); + }; + } + + static RegistryQueryFactory createQueryFactory( + QueryComposerFactory queryComposerFactory) { + return (JpaTransactionManager jpa) -> + () -> queryComposerFactory.apply(jpa).withAutoDetachOnLoad(true).stream(); + } + + /** + * Removes an object from the JPA session cache if applicable. + * + * @param object An object that represents a row in the result set. It may be a JPA entity, a + * non-entity object, or an array that holds JPA entities and/or non-entities. 
+ */ + static T detach(EntityManager entityManager, T object) { + if (object.getClass().isArray()) { + for (Object arrayElement : (Object[]) object) { + detachObject(entityManager, arrayElement); + } + } else { + detachObject(entityManager, object); + } + return object; + } + + static void detachObject(EntityManager entityManager, Object object) { + Class objectClass = object.getClass(); + if (objectClass.isPrimitive() || objectClass == String.class) { + return; + } + try { + entityManager.detach(object); + } catch (IllegalArgumentException e) { + // Not an entity. Do nothing. + } + } +} diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java index e6d42f86d..c6c8a03ac 100644 --- a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java @@ -15,13 +15,28 @@ package google.registry.beam.common; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static google.registry.testing.AppEngineExtension.makeRegistrar1; +import static google.registry.testing.DatabaseHelper.newRegistry; +import static google.registry.util.DateTimeUtils.END_OF_TIME; +import static google.registry.util.DateTimeUtils.START_OF_TIME; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import google.registry.beam.TestPipelineExtension; import google.registry.beam.common.RegistryJpaIO.Read; import google.registry.model.contact.ContactBase; import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainAuthInfo; +import google.registry.model.domain.DomainBase; +import google.registry.model.domain.GracePeriod; +import google.registry.model.domain.launch.LaunchNotice; +import google.registry.model.domain.rgp.GracePeriodStatus; +import google.registry.model.domain.secdns.DelegationSignerData; 
+import google.registry.model.eppcommon.AuthInfo.PasswordAuth; +import google.registry.model.eppcommon.StatusValue; import google.registry.model.registrar.Registrar; +import google.registry.model.registry.Registry; +import google.registry.model.transfer.ContactTransferData; import google.registry.persistence.transaction.JpaTestRules; import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension; import google.registry.persistence.transaction.JpaTransactionManager; @@ -83,7 +98,7 @@ public class RegistryJpaReadTest { } @Test - void jpaRead() { + void readWithQueryComposer() { Read read = RegistryJpaIO.read( (JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class), @@ -93,4 +108,79 @@ public class RegistryJpaReadTest { PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2"); testPipeline.run(); } + + @Test + void readWithStringQuery() { + setupForJoinQuery(); + Read read = + RegistryJpaIO.read( + "select d, r.emailAddress from Domain d join Registrar r on" + + " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL'" + + " and d.deletionTime > now()", + RegistryJpaReadTest::parseRow); + PCollection joinedStrings = testPipeline.apply(read); + + PAssert.that(joinedStrings).containsInAnyOrder("4-COM-me@google.com"); + testPipeline.run(); + } + + private static String parseRow(Object[] row) { + DomainBase domainBase = (DomainBase) row[0]; + String emailAddress = (String) row[1]; + return domainBase.getRepoId() + "-" + emailAddress; + } + + private void setupForJoinQuery() { + Registry registry = newRegistry("com", "ABCD_APP"); + Registrar registrar = + makeRegistrar1() + .asBuilder() + .setClientId("registrar1") + .setEmailAddress("me@google.com") + .build(); + ContactResource contact = + new ContactResource.Builder() + .setRepoId("contactid_1") + .setCreationClientId(registrar.getClientId()) + .setTransferData(new ContactTransferData.Builder().build()) + 
.setPersistedCurrentSponsorClientId(registrar.getClientId()) + .build(); + DomainBase domain = + new DomainBase.Builder() + .setDomainName("example.com") + .setRepoId("4-COM") + .setCreationClientId(registrar.getClientId()) + .setLastEppUpdateTime(fakeClock.nowUtc()) + .setLastEppUpdateClientId(registrar.getClientId()) + .setLastTransferTime(fakeClock.nowUtc()) + .setStatusValues( + ImmutableSet.of( + StatusValue.CLIENT_DELETE_PROHIBITED, + StatusValue.SERVER_DELETE_PROHIBITED, + StatusValue.SERVER_TRANSFER_PROHIBITED, + StatusValue.SERVER_UPDATE_PROHIBITED, + StatusValue.SERVER_RENEW_PROHIBITED, + StatusValue.SERVER_HOLD)) + .setRegistrant(contact.createVKey()) + .setContacts(ImmutableSet.of()) + .setSubordinateHosts(ImmutableSet.of("ns1.example.com")) + .setPersistedCurrentSponsorClientId(registrar.getClientId()) + .setRegistrationExpirationTime(fakeClock.nowUtc().plusYears(1)) + .setAuthInfo(DomainAuthInfo.create(PasswordAuth.create("password"))) + .setDsData(ImmutableSet.of(DelegationSignerData.create(1, 2, 3, new byte[] {0, 1, 2}))) + .setLaunchNotice( + LaunchNotice.create("tcnid", "validatorId", START_OF_TIME, START_OF_TIME)) + .setSmdId("smdid") + .addGracePeriod( + GracePeriod.create( + GracePeriodStatus.ADD, + "4-COM", + END_OF_TIME, + registrar.getClientId(), + null, + 100L)) + .build(); + jpaTm() + .transact(() -> jpaTm().insertAll(ImmutableList.of(registry, registrar, contact, domain))); + } }