mirror of
https://github.com/google/nomulus.git
synced 2025-04-29 19:47:51 +02:00
Make RegistryJpaIO use CriteriaQuery intead of QueryComposer (#1209)
QueryComposer could be used when the transaction manager is not determined (i. e. it supports both ofy and sql), but this also imposes limits on what you can do with it. For example it does not support IN operator in the where clause. Since QueryComposer itself creates a CriteriaQuery for JPA TM it make sense to have RegistryJpaIO take a CriteriaQuery directly as it only uses JPA. Also add some more helper methods to use native queries and typed queires, and fix some generic type warnings. <!-- Reviewable:start --> --- This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/google/nomulus/1209) <!-- Reviewable:end -->
This commit is contained in:
parent
8771222d9f
commit
e4e7c5ead2
7 changed files with 170 additions and 73 deletions
|
@ -202,6 +202,8 @@ PRESUBMITS = {
|
|||
"java",
|
||||
# ActivityReportingQueryBuilder deals with Dremel queries
|
||||
{"src/test", "ActivityReportingQueryBuilder.java",
|
||||
# This class contains helper method to make queries in Beam.
|
||||
"RegistryJpaIO.java",
|
||||
# TODO(b/179158393): Remove everything below, which should be done
|
||||
# using Criteria
|
||||
"ForeignKeyIndex.java",
|
||||
|
|
|
@ -21,14 +21,15 @@ import com.google.auto.value.AutoValue;
|
|||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Streams;
|
||||
import google.registry.backup.AppEngineEnvironment;
|
||||
import google.registry.beam.common.RegistryQuery.QueryComposerFactory;
|
||||
import google.registry.beam.common.RegistryQuery.RegistryQueryFactory;
|
||||
import google.registry.beam.common.RegistryQuery.CriteriaQuerySupplier;
|
||||
import google.registry.model.ofy.ObjectifyService;
|
||||
import google.registry.persistence.transaction.JpaTransactionManager;
|
||||
import google.registry.persistence.transaction.TransactionManagerFactory;
|
||||
import java.io.Serializable;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.persistence.criteria.CriteriaQuery;
|
||||
import org.apache.beam.sdk.coders.Coder;
|
||||
import org.apache.beam.sdk.coders.SerializableCoder;
|
||||
|
@ -60,13 +61,18 @@ public final class RegistryJpaIO {
|
|||
|
||||
private RegistryJpaIO() {}
|
||||
|
||||
public static <R> Read<R, R> read(QueryComposerFactory<R> queryFactory) {
|
||||
return Read.<R, R>builder().queryFactory(queryFactory).build();
|
||||
public static <R> Read<R, R> read(CriteriaQuerySupplier<R> query) {
|
||||
return read(query, x -> x);
|
||||
}
|
||||
|
||||
public static <R, T> Read<R, T> read(
|
||||
QueryComposerFactory<R> queryFactory, SerializableFunction<R, T> resultMapper) {
|
||||
return Read.<R, T>builder().queryFactory(queryFactory).resultMapper(resultMapper).build();
|
||||
CriteriaQuerySupplier<R> query, SerializableFunction<R, T> resultMapper) {
|
||||
return Read.<R, T>builder().criteriaQuery(query).resultMapper(resultMapper).build();
|
||||
}
|
||||
|
||||
public static <R, T> Read<R, T> read(
|
||||
String sql, boolean nativeQuery, SerializableFunction<R, T> resultMapper) {
|
||||
return read(sql, null, nativeQuery, resultMapper);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -74,8 +80,39 @@ public final class RegistryJpaIO {
|
|||
*
|
||||
* <p>User should take care to prevent sql-injection attacks.
|
||||
*/
|
||||
public static <R, T> Read<R, T> read(String jpql, SerializableFunction<R, T> resultMapper) {
|
||||
return Read.<R, T>builder().jpqlQueryFactory(jpql).resultMapper(resultMapper).build();
|
||||
public static <R, T> Read<R, T> read(
|
||||
String sql,
|
||||
@Nullable Map<String, Object> parameter,
|
||||
boolean nativeQuery,
|
||||
SerializableFunction<R, T> resultMapper) {
|
||||
Read.Builder<R, T> builder = Read.builder();
|
||||
if (nativeQuery) {
|
||||
builder.nativeQuery(sql, parameter);
|
||||
} else {
|
||||
builder.jpqlQuery(sql, parameter);
|
||||
}
|
||||
return builder.resultMapper(resultMapper).build();
|
||||
}
|
||||
|
||||
public static <R, T> Read<R, T> read(
|
||||
String jpql, Class<R> clazz, SerializableFunction<R, T> resultMapper) {
|
||||
return read(jpql, null, clazz, resultMapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link Read} connector based on the given {@code jpql} typed query string.
|
||||
*
|
||||
* <p>User should take care to prevent sql-injection attacks.
|
||||
*/
|
||||
public static <R, T> Read<R, T> read(
|
||||
String jpql,
|
||||
@Nullable Map<String, Object> parameter,
|
||||
Class<R> clazz,
|
||||
SerializableFunction<R, T> resultMapper) {
|
||||
return Read.<R, T>builder()
|
||||
.jpqlQuery(jpql, clazz, parameter)
|
||||
.resultMapper(resultMapper)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static <T> Write<T> write() {
|
||||
|
@ -94,7 +131,7 @@ public final class RegistryJpaIO {
|
|||
|
||||
abstract String name();
|
||||
|
||||
abstract RegistryQueryFactory<R> queryFactory();
|
||||
abstract RegistryQuery<R> query();
|
||||
|
||||
abstract SerializableFunction<R, T> resultMapper();
|
||||
|
||||
|
@ -107,9 +144,7 @@ public final class RegistryJpaIO {
|
|||
public PCollection<T> expand(PBegin input) {
|
||||
return input
|
||||
.apply("Starting " + name(), Create.of((Void) null))
|
||||
.apply(
|
||||
"Run query for " + name(),
|
||||
ParDo.of(new QueryRunner<>(queryFactory(), resultMapper())))
|
||||
.apply("Run query for " + name(), ParDo.of(new QueryRunner<>(query(), resultMapper())))
|
||||
.setCoder(coder())
|
||||
.apply("Reshuffle", Reshuffle.viaRandomKey());
|
||||
}
|
||||
|
@ -127,9 +162,8 @@ public final class RegistryJpaIO {
|
|||
}
|
||||
|
||||
static <R, T> Builder<R, T> builder() {
|
||||
return new AutoValue_RegistryJpaIO_Read.Builder()
|
||||
return new AutoValue_RegistryJpaIO_Read.Builder<R, T>()
|
||||
.name(DEFAULT_NAME)
|
||||
.resultMapper(x -> x)
|
||||
.coder(SerializableCoder.of(Serializable.class));
|
||||
}
|
||||
|
||||
|
@ -138,7 +172,7 @@ public final class RegistryJpaIO {
|
|||
|
||||
abstract Builder<R, T> name(String name);
|
||||
|
||||
abstract Builder<R, T> queryFactory(RegistryQueryFactory<R> queryFactory);
|
||||
abstract Builder<R, T> query(RegistryQuery<R> query);
|
||||
|
||||
abstract Builder<R, T> resultMapper(SerializableFunction<R, T> mapper);
|
||||
|
||||
|
@ -146,21 +180,29 @@ public final class RegistryJpaIO {
|
|||
|
||||
abstract Read<R, T> build();
|
||||
|
||||
Builder<R, T> queryFactory(QueryComposerFactory<R> queryFactory) {
|
||||
return queryFactory(RegistryQuery.createQueryFactory(queryFactory));
|
||||
Builder<R, T> criteriaQuery(CriteriaQuerySupplier<R> criteriaQuery) {
|
||||
return query(RegistryQuery.createQuery(criteriaQuery));
|
||||
}
|
||||
|
||||
Builder<R, T> jpqlQueryFactory(String jpql) {
|
||||
return queryFactory(RegistryQuery.createQueryFactory(jpql));
|
||||
Builder<R, T> nativeQuery(String sql, Map<String, Object> parameters) {
|
||||
return query(RegistryQuery.createQuery(sql, parameters, true));
|
||||
}
|
||||
|
||||
Builder<R, T> jpqlQuery(String jpql, Map<String, Object> parameters) {
|
||||
return query(RegistryQuery.createQuery(jpql, parameters, false));
|
||||
}
|
||||
|
||||
Builder<R, T> jpqlQuery(String jpql, Class<R> clazz, Map<String, Object> parameters) {
|
||||
return query(RegistryQuery.createQuery(jpql, parameters, clazz));
|
||||
}
|
||||
}
|
||||
|
||||
static class QueryRunner<R, T> extends DoFn<Void, T> {
|
||||
private final RegistryQueryFactory<R> queryFactory;
|
||||
private final RegistryQuery<R> query;
|
||||
private final SerializableFunction<R, T> resultMapper;
|
||||
|
||||
QueryRunner(RegistryQueryFactory<R> queryFactory, SerializableFunction<R, T> resultMapper) {
|
||||
this.queryFactory = queryFactory;
|
||||
QueryRunner(RegistryQuery<R> query, SerializableFunction<R, T> resultMapper) {
|
||||
this.query = query;
|
||||
this.resultMapper = resultMapper;
|
||||
}
|
||||
|
||||
|
@ -172,10 +214,7 @@ public final class RegistryJpaIO {
|
|||
// TODO(b/187210388): JpaTransactionManager should support non-transactional query.
|
||||
jpaTm()
|
||||
.transactNoRetry(
|
||||
() ->
|
||||
queryFactory.apply(jpaTm()).stream()
|
||||
.map(resultMapper::apply)
|
||||
.forEach(outputReceiver::output));
|
||||
() -> query.stream().map(resultMapper::apply).forEach(outputReceiver::output));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,48 +14,75 @@
|
|||
|
||||
package google.registry.beam.common;
|
||||
|
||||
import google.registry.persistence.transaction.JpaTransactionManager;
|
||||
import google.registry.persistence.transaction.QueryComposer;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.Query;
|
||||
import org.apache.beam.sdk.transforms.SerializableFunction;
|
||||
import javax.persistence.TypedQuery;
|
||||
import javax.persistence.criteria.CriteriaQuery;
|
||||
|
||||
/** Interface for query instances used by {@link RegistryJpaIO.Read}. */
|
||||
public interface RegistryQuery<T> {
|
||||
public interface RegistryQuery<T> extends Serializable {
|
||||
Stream<T> stream();
|
||||
|
||||
/** Factory for {@link RegistryQuery}. */
|
||||
interface RegistryQueryFactory<T>
|
||||
extends SerializableFunction<JpaTransactionManager, RegistryQuery<T>> {}
|
||||
|
||||
// TODO(mmuller): Consider detached JpaQueryComposer that works with any JpaTransactionManager
|
||||
// instance, i.e., change composer.buildQuery() to composer.buildQuery(JpaTransactionManager).
|
||||
// This way QueryComposer becomes reusable and serializable (at least with Hibernate), and this
|
||||
// interface would no longer be necessary.
|
||||
interface QueryComposerFactory<T>
|
||||
extends SerializableFunction<JpaTransactionManager, QueryComposer<T>> {}
|
||||
interface CriteriaQuerySupplier<T> extends Supplier<CriteriaQuery<T>>, Serializable {}
|
||||
|
||||
/**
|
||||
* Returns a {@link RegistryQueryFactory} that creates a JPQL query from constant text.
|
||||
* Returns a {@link RegistryQuery} that creates a string query from constant text.
|
||||
*
|
||||
* @param nativeQuery whether the given string is to be interpreted as a native query or JPQL.
|
||||
* @param parameters parameters to be substituted in the query.
|
||||
* @param <T> Type of each row in the result set, {@link Object} in single-select queries, and
|
||||
* {@code Object[]} in multi-select queries.
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // query.getResultStream: jpa api uses raw type
|
||||
static <T> RegistryQueryFactory<T> createQueryFactory(String jpql) {
|
||||
return (JpaTransactionManager jpa) ->
|
||||
() -> {
|
||||
EntityManager entityManager = jpa.getEntityManager();
|
||||
Query query = entityManager.createQuery(jpql);
|
||||
return query.getResultStream().map(e -> detach(entityManager, e));
|
||||
};
|
||||
static <T> RegistryQuery<T> createQuery(
|
||||
String sql, @Nullable Map<String, Object> parameters, boolean nativeQuery) {
|
||||
return () -> {
|
||||
EntityManager entityManager = jpaTm().getEntityManager();
|
||||
Query query =
|
||||
nativeQuery ? entityManager.createNativeQuery(sql) : entityManager.createQuery(sql);
|
||||
if (parameters != null) {
|
||||
parameters.forEach(query::setParameter);
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
Stream<T> resultStream = query.getResultStream();
|
||||
return nativeQuery ? resultStream : resultStream.map(e -> detach(entityManager, e));
|
||||
};
|
||||
}
|
||||
|
||||
static <T> RegistryQueryFactory<T> createQueryFactory(
|
||||
QueryComposerFactory<T> queryComposerFactory) {
|
||||
return (JpaTransactionManager jpa) ->
|
||||
() -> queryComposerFactory.apply(jpa).withAutoDetachOnLoad(true).stream();
|
||||
/**
|
||||
* Returns a {@link RegistryQuery} that creates a typed JPQL query from constant text.
|
||||
*
|
||||
* @param parameters parameters to be substituted in the query.
|
||||
* @param <T> Type of each row in the result set.
|
||||
*/
|
||||
static <T> RegistryQuery<T> createQuery(
|
||||
String jpql, @Nullable Map<String, Object> parameters, Class<T> clazz) {
|
||||
return () -> {
|
||||
TypedQuery<T> query = jpaTm().query(jpql, clazz);
|
||||
if (parameters != null) {
|
||||
parameters.forEach(query::setParameter);
|
||||
}
|
||||
return query.getResultStream();
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link RegistryQuery} from a {@link CriteriaQuery} supplier.
|
||||
*
|
||||
* <p>A serializable supplier is needed in because {@link CriteriaQuery} itself must be created
|
||||
* within a transaction, and we are not in a transaction yet when this function is called to set
|
||||
* up the pipeline.
|
||||
*
|
||||
* @param <T> Type of each row in the result set.
|
||||
*/
|
||||
static <T> RegistryQuery<T> createQuery(CriteriaQuerySupplier<T> criteriaQuery) {
|
||||
return () -> jpaTm().query(criteriaQuery.get()).getResultStream();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -116,6 +116,7 @@ public class Spec11Pipeline implements Serializable {
|
|||
"select d, r.emailAddress from Domain d join Registrar r on"
|
||||
+ " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL'"
|
||||
+ " and d.deletionTime > now()",
|
||||
false,
|
||||
Spec11Pipeline::parseRow);
|
||||
|
||||
return pipeline.apply("Read active domains from Cloud SQL", read);
|
||||
|
|
|
@ -93,16 +93,6 @@ public abstract class QueryComposer<T> {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies if JPA entities should be automatically detached from the persistence context after
|
||||
* loading. The default behavior is auto-detach.
|
||||
*
|
||||
* <p>This configuration has no effect on Datastore queries.
|
||||
*/
|
||||
public QueryComposer<T> withAutoDetachOnLoad(boolean autoDetachOnLoad) {
|
||||
return this;
|
||||
}
|
||||
|
||||
/** Returns the first result of the query or an empty optional if there is none. */
|
||||
public abstract Optional<T> first();
|
||||
|
||||
|
|
|
@ -69,7 +69,6 @@ final class GenerateLordnCommand implements CommandWithRemoteApi {
|
|||
.createQueryComposer(DomainBase.class)
|
||||
.where("tld", Comparator.EQ, tld)
|
||||
.orderBy("repoId")
|
||||
.withAutoDetachOnLoad(false)
|
||||
.stream()
|
||||
.forEach(domain -> processDomain(claimsCsv, sunriseCsv, domain)));
|
||||
ImmutableList<String> claimsRows = claimsCsv.build();
|
||||
|
|
|
@ -21,6 +21,7 @@ import static google.registry.util.DateTimeUtils.END_OF_TIME;
|
|||
import static google.registry.util.DateTimeUtils.START_OF_TIME;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import google.registry.beam.TestPipelineExtension;
|
||||
import google.registry.beam.common.RegistryJpaIO.Read;
|
||||
|
@ -37,9 +38,9 @@ import google.registry.model.eppcommon.StatusValue;
|
|||
import google.registry.model.registrar.Registrar;
|
||||
import google.registry.model.registry.Registry;
|
||||
import google.registry.model.transfer.ContactTransferData;
|
||||
import google.registry.persistence.transaction.CriteriaQueryBuilder;
|
||||
import google.registry.persistence.transaction.JpaTestRules;
|
||||
import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension;
|
||||
import google.registry.persistence.transaction.JpaTransactionManager;
|
||||
import google.registry.testing.AppEngineExtension;
|
||||
import google.registry.testing.DatabaseHelper;
|
||||
import google.registry.testing.DatastoreEntityExtension;
|
||||
|
@ -98,10 +99,10 @@ public class RegistryJpaReadTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
void readWithQueryComposer() {
|
||||
void readWithCriteriaQuery() {
|
||||
Read<ContactResource, String> read =
|
||||
RegistryJpaIO.read(
|
||||
(JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),
|
||||
() -> CriteriaQueryBuilder.create(ContactResource.class).build(),
|
||||
ContactBase::getContactId);
|
||||
PCollection<String> repoIds = testPipeline.apply(read);
|
||||
|
||||
|
@ -115,19 +116,57 @@ public class RegistryJpaReadTest {
|
|||
Read<Object[], String> read =
|
||||
RegistryJpaIO.read(
|
||||
"select d, r.emailAddress from Domain d join Registrar r on"
|
||||
+ " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL'"
|
||||
+ " d.currentSponsorClientId = r.clientIdentifier where r.type = :type"
|
||||
+ " and d.deletionTime > now()",
|
||||
RegistryJpaReadTest::parseRow);
|
||||
ImmutableMap.of("type", Registrar.Type.REAL),
|
||||
false,
|
||||
(Object[] row) -> {
|
||||
DomainBase domainBase = (DomainBase) row[0];
|
||||
String emailAddress = (String) row[1];
|
||||
return domainBase.getRepoId() + "-" + emailAddress;
|
||||
});
|
||||
PCollection<String> joinedStrings = testPipeline.apply(read);
|
||||
|
||||
PAssert.that(joinedStrings).containsInAnyOrder("4-COM-me@google.com");
|
||||
testPipeline.run();
|
||||
}
|
||||
|
||||
private static String parseRow(Object[] row) {
|
||||
DomainBase domainBase = (DomainBase) row[0];
|
||||
String emailAddress = (String) row[1];
|
||||
return domainBase.getRepoId() + "-" + emailAddress;
|
||||
@Test
|
||||
void readWithStringNativeQuery() {
|
||||
setupForJoinQuery();
|
||||
Read<Object[], String> read =
|
||||
RegistryJpaIO.read(
|
||||
"select d.repo_id, r.email_address from \"Domain\" d join \"Registrar\" r on"
|
||||
+ " d.current_sponsor_registrar_id = r.registrar_id where r.type = :type"
|
||||
+ " and d.deletion_time > now()",
|
||||
ImmutableMap.of("type", "REAL"),
|
||||
true,
|
||||
(Object[] row) -> {
|
||||
String repoId = (String) row[0];
|
||||
String emailAddress = (String) row[1];
|
||||
return repoId + "-" + emailAddress;
|
||||
});
|
||||
PCollection<String> joinedStrings = testPipeline.apply(read);
|
||||
|
||||
PAssert.that(joinedStrings).containsInAnyOrder("4-COM-me@google.com");
|
||||
testPipeline.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
void readWithStringTypedQuery() {
|
||||
setupForJoinQuery();
|
||||
Read<DomainBase, String> read =
|
||||
RegistryJpaIO.read(
|
||||
"select d from Domain d join Registrar r on"
|
||||
+ " d.currentSponsorClientId = r.clientIdentifier where r.type = :type"
|
||||
+ " and d.deletionTime > now()",
|
||||
ImmutableMap.of("type", Registrar.Type.REAL),
|
||||
DomainBase.class,
|
||||
DomainBase::getRepoId);
|
||||
PCollection<String> repoIds = testPipeline.apply(read);
|
||||
|
||||
PAssert.that(repoIds).containsInAnyOrder("4-COM");
|
||||
testPipeline.run();
|
||||
}
|
||||
|
||||
private void setupForJoinQuery() {
|
||||
|
|
Loading…
Add table
Reference in a new issue