Support text-based JPQL query for BEAM (#1168)

* Support text-based JPQL query for BEAM
This commit is contained in:
Weimin Yu 2021-05-19 14:45:04 -04:00 committed by GitHub
parent 47e77e20f7
commit f713517197
4 changed files with 206 additions and 15 deletions

View file

@ -213,6 +213,7 @@ PRESUBMITS = {
"RdapDomainSearchAction.java",
"RdapNameserverSearchAction.java",
"RdapSearchActionBase.java",
"RegistryQuery",
},
):
"The first String parameter to EntityManager.create(Native)Query "

View file

@ -21,9 +21,10 @@ import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import google.registry.backup.AppEngineEnvironment;
import google.registry.beam.common.RegistryQuery.QueryComposerFactory;
import google.registry.beam.common.RegistryQuery.RegistryQueryFactory;
import google.registry.model.ofy.ObjectifyService;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.QueryComposer;
import google.registry.persistence.transaction.TransactionManagerFactory;
import java.io.Serializable;
import java.util.Objects;
@ -68,17 +69,19 @@ public final class RegistryJpaIO {
return Read.<R, T>builder().queryFactory(queryFactory).resultMapper(resultMapper).build();
}
/**
 * Creates a {@link Read} connector that runs the supplied {@code jpql} text as its query and
 * passes every result row through {@code resultMapper}.
 *
 * <p>The query text is handed to the query factory unchanged, so callers are responsible for
 * guarding against sql-injection.
 */
public static <R, T> Read<R, T> read(String jpql, SerializableFunction<R, T> resultMapper) {
  return Read.<R, T>builder()
      .jpqlQueryFactory(jpql)
      .resultMapper(resultMapper)
      .build();
}
/** Returns a {@link Write} connector built from a {@code Write} builder with no settings applied. */
public static <T> Write<T> write() {
return Write.<T>builder().build();
}
// TODO(mmuller): Consider detached JpaQueryComposer that works with any JpaTransactionManager
// instance, i.e., change composer.buildQuery() to composer.buildQuery(JpaTransactionManager).
// This way QueryComposer becomes reusable and serializable (at least with Hibernate), and this
// interface would no longer be necessary.
/**
 * A {@link SerializableFunction} that, given a {@link JpaTransactionManager}, produces the
 * {@link QueryComposer} to run against it.
 */
public interface QueryComposerFactory<T>
extends SerializableFunction<JpaTransactionManager, QueryComposer<T>> {}
/**
* A {@link PTransform transform} that transactionally executes a JPA {@link CriteriaQuery} and
* adds the results to the BEAM pipeline. Users have the option to transform the results before
@ -91,7 +94,7 @@ public final class RegistryJpaIO {
abstract String name();
abstract RegistryJpaIO.QueryComposerFactory<R> queryFactory();
abstract RegistryQueryFactory<R> queryFactory();
abstract SerializableFunction<R, T> resultMapper();
@ -135,21 +138,29 @@ public final class RegistryJpaIO {
abstract Builder<R, T> name(String name);
abstract Builder<R, T> queryFactory(RegistryJpaIO.QueryComposerFactory<R> queryFactory);
abstract Builder<R, T> queryFactory(RegistryQueryFactory<R> queryFactory);
abstract Builder<R, T> resultMapper(SerializableFunction<R, T> mapper);
abstract Builder<R, T> coder(Coder coder);
abstract Read<R, T> build();
/**
 * Convenience setter that wraps a {@link QueryComposerFactory} into a {@link
 * RegistryQueryFactory} and then delegates to the abstract {@code queryFactory} setter.
 */
Builder<R, T> queryFactory(QueryComposerFactory<R> queryFactory) {
  RegistryQueryFactory<R> adaptedFactory = RegistryQuery.createQueryFactory(queryFactory);
  return queryFactory(adaptedFactory);
}
/**
 * Convenience setter that turns JPQL query text into a {@link RegistryQueryFactory} and then
 * delegates to the abstract {@code queryFactory} setter.
 */
Builder<R, T> jpqlQueryFactory(String jpql) {
  RegistryQueryFactory<R> textQueryFactory = RegistryQuery.createQueryFactory(jpql);
  return queryFactory(textQueryFactory);
}
}
static class QueryRunner<R, T> extends DoFn<Void, T> {
private final QueryComposerFactory<R> querySupplier;
private final RegistryQueryFactory<R> queryFactory;
private final SerializableFunction<R, T> resultMapper;
QueryRunner(QueryComposerFactory<R> querySupplier, SerializableFunction<R, T> resultMapper) {
this.querySupplier = querySupplier;
QueryRunner(RegistryQueryFactory<R> queryFactory, SerializableFunction<R, T> resultMapper) {
this.queryFactory = queryFactory;
this.resultMapper = resultMapper;
}
@ -162,7 +173,7 @@ public final class RegistryJpaIO {
jpaTm()
.transactNoRetry(
() ->
querySupplier.apply(jpaTm()).withAutoDetachOnLoad(true).stream()
queryFactory.apply(jpaTm()).stream()
.map(resultMapper::apply)
.forEach(outputReceiver::output));
}

View file

@ -0,0 +1,89 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.common;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.QueryComposer;
import java.util.stream.Stream;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.beam.sdk.transforms.SerializableFunction;
/** Interface for query instances used by {@link RegistryJpaIO.Read}. */
public interface RegistryQuery<T> {

  /** Executes the query and returns its result rows as a stream. */
  Stream<T> stream();

  /** Factory for {@link RegistryQuery}. */
  interface RegistryQueryFactory<T>
      extends SerializableFunction<JpaTransactionManager, RegistryQuery<T>> {}

  // TODO(mmuller): Consider detached JpaQueryComposer that works with any JpaTransactionManager
  // instance, i.e., change composer.buildQuery() to composer.buildQuery(JpaTransactionManager).
  // This way QueryComposer becomes reusable and serializable (at least with Hibernate), and this
  // interface would no longer be necessary.
  interface QueryComposerFactory<T>
      extends SerializableFunction<JpaTransactionManager, QueryComposer<T>> {}

  /**
   * Returns a {@link RegistryQueryFactory} that creates a JPQL query from constant text.
   *
   * <p>Every row produced by the query is passed through {@link #detach} before it is handed to
   * the caller.
   *
   * @param jpql the JPQL text, passed unchanged to {@link EntityManager#createQuery(String)}
   * @param <T> Type of each row in the result set, {@link Object} in single-select queries, and
   *     {@code Object[]} in multi-select queries.
   */
  @SuppressWarnings("unchecked") // query.getResultStream: jpa api uses raw type
  static <T> RegistryQueryFactory<T> createQueryFactory(String jpql) {
    return (JpaTransactionManager jpa) ->
        () -> {
          EntityManager entityManager = jpa.getEntityManager();
          Query query = entityManager.createQuery(jpql);
          return query.getResultStream().map(e -> detach(entityManager, e));
        };
  }

  /**
   * Returns a {@link RegistryQueryFactory} backed by a {@link QueryComposer}.
   *
   * <p>The composer obtained from {@code queryComposerFactory} is configured with {@code
   * withAutoDetachOnLoad(true)} before it is streamed.
   */
  static <T> RegistryQueryFactory<T> createQueryFactory(
      QueryComposerFactory<T> queryComposerFactory) {
    return (JpaTransactionManager jpa) ->
        () -> queryComposerFactory.apply(jpa).withAutoDetachOnLoad(true).stream();
  }

  /**
   * Removes an object from the JPA session cache if applicable.
   *
   * <p>Only reference arrays ({@code Object[]}) are unpacked element by element. A primitive
   * array such as {@code byte[]} is a single value and cannot be cast to {@code Object[]}, so it
   * is treated like any other non-entity object. A {@code null} row is returned unchanged.
   *
   * @param object An object that represents a row in the result set. It may be a JPA entity, a
   *     non-entity object, or an array that holds JPA entities and/or non-entities. May be null.
   * @return the same {@code object} instance that was passed in
   */
  static <T> T detach(EntityManager entityManager, T object) {
    // instanceof is false for null and for primitive arrays, so neither reaches the cast.
    if (object instanceof Object[]) {
      for (Object arrayElement : (Object[]) object) {
        detachObject(entityManager, arrayElement);
      }
    } else {
      detachObject(entityManager, object);
    }
    return object;
  }

  /**
   * Detaches a single value from {@code entityManager} if it is an entity.
   *
   * <p>Nulls (e.g. a nullable column in the select list), strings, boxed scalars and arrays are
   * skipped up front since they can never be entities. Any other non-entity is detected through
   * the {@link IllegalArgumentException} raised by {@link EntityManager#detach}.
   */
  static void detachObject(EntityManager entityManager, Object object) {
    if (object == null
        || object instanceof String
        || object instanceof Number
        || object instanceof Boolean
        || object instanceof Character
        || object.getClass().isArray()) {
      return;
    }
    try {
      entityManager.detach(object);
    } catch (IllegalArgumentException ignored) {
      // Not an entity. Do nothing.
    }
  }
}

View file

@ -15,13 +15,28 @@
package google.registry.beam.common;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.testing.AppEngineExtension.makeRegistrar1;
import static google.registry.testing.DatabaseHelper.newRegistry;
import static google.registry.util.DateTimeUtils.END_OF_TIME;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import google.registry.beam.TestPipelineExtension;
import google.registry.beam.common.RegistryJpaIO.Read;
import google.registry.model.contact.ContactBase;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainAuthInfo;
import google.registry.model.domain.DomainBase;
import google.registry.model.domain.GracePeriod;
import google.registry.model.domain.launch.LaunchNotice;
import google.registry.model.domain.rgp.GracePeriodStatus;
import google.registry.model.domain.secdns.DelegationSignerData;
import google.registry.model.eppcommon.AuthInfo.PasswordAuth;
import google.registry.model.eppcommon.StatusValue;
import google.registry.model.registrar.Registrar;
import google.registry.model.registry.Registry;
import google.registry.model.transfer.ContactTransferData;
import google.registry.persistence.transaction.JpaTestRules;
import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension;
import google.registry.persistence.transaction.JpaTransactionManager;
@ -83,7 +98,7 @@ public class RegistryJpaReadTest {
}
@Test
void jpaRead() {
void readWithQueryComposer() {
Read<ContactResource, String> read =
RegistryJpaIO.read(
(JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),
@ -93,4 +108,79 @@ public class RegistryJpaReadTest {
PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2");
testPipeline.run();
}
/** Verifies that a text-based JPQL join query can be read through {@link RegistryJpaIO}. */
@Test
void readWithStringQuery() {
  setupForJoinQuery();
  String joinQuery =
      "select d, r.emailAddress from Domain d join Registrar r on"
          + " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL'"
          + " and d.deletionTime > now()";
  Read<Object[], String> joinRead = RegistryJpaIO.read(joinQuery, RegistryJpaReadTest::parseRow);

  PCollection<String> formattedRows = testPipeline.apply(joinRead);

  // Each row is rendered by parseRow as "<domain repo id>-<registrar email>".
  PAssert.that(formattedRows).containsInAnyOrder("4-COM-me@google.com");
  testPipeline.run();
}
/**
 * Renders one multi-select result row as {@code "<repoId>-<emailAddress>"}.
 *
 * @param row a two-element row: the {@link DomainBase} first, the email address second
 */
private static String parseRow(Object[] row) {
  DomainBase domain = (DomainBase) row[0];
  String registrarEmail = (String) row[1];
  // String.join renders a null element as "null", matching plain string concatenation.
  return String.join("-", domain.getRepoId(), registrarEmail);
}
/**
 * Persists one registry, one registrar, one contact and one domain so that the join query in
 * {@code readWithStringQuery} has data to match.
 */
private void setupForJoinQuery() {
  Registry tld = newRegistry("com", "ABCD_APP");
  Registrar sponsor =
      makeRegistrar1()
          .asBuilder()
          .setClientId("registrar1")
          .setEmailAddress("me@google.com")
          .build();
  // Every client-id field below refers to the same sponsoring registrar.
  String sponsorId = sponsor.getClientId();

  ContactResource registrant =
      new ContactResource.Builder()
          .setRepoId("contactid_1")
          .setCreationClientId(sponsorId)
          .setTransferData(new ContactTransferData.Builder().build())
          .setPersistedCurrentSponsorClientId(sponsorId)
          .build();

  ImmutableSet<StatusValue> statuses =
      ImmutableSet.of(
          StatusValue.CLIENT_DELETE_PROHIBITED,
          StatusValue.SERVER_DELETE_PROHIBITED,
          StatusValue.SERVER_TRANSFER_PROHIBITED,
          StatusValue.SERVER_UPDATE_PROHIBITED,
          StatusValue.SERVER_RENEW_PROHIBITED,
          StatusValue.SERVER_HOLD);

  DomainBase sponsoredDomain =
      new DomainBase.Builder()
          .setDomainName("example.com")
          .setRepoId("4-COM")
          .setCreationClientId(sponsorId)
          .setLastEppUpdateTime(fakeClock.nowUtc())
          .setLastEppUpdateClientId(sponsorId)
          .setLastTransferTime(fakeClock.nowUtc())
          .setStatusValues(statuses)
          .setRegistrant(registrant.createVKey())
          .setContacts(ImmutableSet.of())
          .setSubordinateHosts(ImmutableSet.of("ns1.example.com"))
          .setPersistedCurrentSponsorClientId(sponsorId)
          .setRegistrationExpirationTime(fakeClock.nowUtc().plusYears(1))
          .setAuthInfo(DomainAuthInfo.create(PasswordAuth.create("password")))
          .setDsData(ImmutableSet.of(DelegationSignerData.create(1, 2, 3, new byte[] {0, 1, 2})))
          .setLaunchNotice(
              LaunchNotice.create("tcnid", "validatorId", START_OF_TIME, START_OF_TIME))
          .setSmdId("smdid")
          .addGracePeriod(
              GracePeriod.create(
                  GracePeriodStatus.ADD, "4-COM", END_OF_TIME, sponsorId, null, 100L))
          .build();

  // All four objects are inserted within a single transaction.
  jpaTm()
      .transact(
          () -> jpaTm().insertAll(ImmutableList.of(tld, sponsor, registrant, sponsoredDomain)));
}
}