Convert most poll message queries to QueryComposer (#1151)

* Convert most poll message queries to QueryComposer

* Add unit test and a better exception for datastore

* Remove datastorePollMessageQuery from PollFlowUtils

* Reformatted.

* Improved test equality checks

* Changes for review

* Converted concatenated string to String.format()
This commit is contained in:
Michael Muller 2021-05-19 15:58:20 -04:00 committed by GitHub
parent f713517197
commit ae45462f11
5 changed files with 118 additions and 74 deletions

View file

@ -15,57 +15,29 @@
package google.registry.flows.poll;
import static com.google.common.base.Preconditions.checkArgument;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.QueryComposer.Comparator.EQ;
import static google.registry.persistence.transaction.QueryComposer.Comparator.LTE;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm;
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
import com.googlecode.objectify.cmd.Query;
import google.registry.model.poll.PollMessage;
import google.registry.persistence.transaction.QueryComposer;
import java.util.Optional;
import org.joda.time.DateTime;
/** Static utility functions for poll flows. */
public final class PollFlowUtils {
public static final String SQL_POLL_MESSAGE_QUERY =
"FROM PollMessage WHERE clientId = :registrarId AND eventTime <= :now ORDER BY eventTime ASC";
private static final String SQL_POLL_MESSAGE_COUNT_QUERY =
"SELECT COUNT(*) FROM PollMessage WHERE clientId = :registrarId AND eventTime <= :now";
/** Returns the number of poll messages for the given registrar that are not in the future. */
public static int getPollMessageCount(String registrarId, DateTime now) {
if (tm().isOfy()) {
return datastorePollMessageQuery(registrarId, now).count();
} else {
return jpaTm()
.transact(
() ->
jpaTm()
.query(SQL_POLL_MESSAGE_COUNT_QUERY, Long.class)
.setParameter("registrarId", registrarId)
.setParameter("now", now)
.getSingleResult()
.intValue());
}
return transactIfJpaTm(() -> createPollMessageQuery(registrarId, now).count()).intValue();
}
/** Returns the first (by event time) poll message not in the future for this registrar. */
public static Optional<PollMessage> getFirstPollMessage(String registrarId, DateTime now) {
if (tm().isOfy()) {
return Optional.ofNullable(datastorePollMessageQuery(registrarId, now).first().now());
} else {
return jpaTm()
.transact(
() ->
jpaTm()
.query(SQL_POLL_MESSAGE_QUERY, PollMessage.class)
.setParameter("registrarId", registrarId)
.setParameter("now", now)
.setMaxResults(1)
.getResultStream()
.findFirst());
}
return transactIfJpaTm(
() -> createPollMessageQuery(registrarId, now).orderBy("eventTime").first());
}
/**
@ -106,14 +78,15 @@ public final class PollFlowUtils {
return includeAckedMessageInCount;
}
/** A Datastore query for poll messages from the given registrar that are not in the future. */
public static Query<PollMessage> datastorePollMessageQuery(String registrarId, DateTime now) {
return ofy()
.load()
.type(PollMessage.class)
.filter("clientId", registrarId)
.filter("eventTime <=", now.toDate())
.order("eventTime");
/**
* Returns the QueryComposer for poll messages from the given registrar that are not in the
* future.
*/
public static QueryComposer<PollMessage> createPollMessageQuery(
String registrarId, DateTime now) {
return tm().createQueryComposer(PollMessage.class)
.where("clientId", EQ, registrarId)
.where("eventTime", LTE, now);
}
private PollFlowUtils() {}

View file

@ -411,7 +411,12 @@ public class DatastoreTransactionManager implements TransactionManager {
checkOnlyOneInequalityField();
Query<T> result = auditedOfy().load().type(entityClass);
for (WhereClause pred : predicates) {
result = result.filter(pred.fieldName + pred.comparator.getDatastoreString(), pred.value);
String comparatorString = pred.comparator.getDatastoreString();
if (comparatorString == null) {
throw new UnsupportedOperationException(
String.format("The %s operation is not supported on Datastore.", pred.comparator));
}
result = result.filter(pred.fieldName + comparatorString, pred.value);
}
if (orderBy != null) {

View file

@ -152,6 +152,10 @@ public abstract class QueryComposer<T> {
return criteriaBuilder::greaterThan;
}
public static WhereOperator<String> like(CriteriaBuilder criteriaBuilder) {
return criteriaBuilder::like;
}
/**
* Enum used to specify comparison operations, e.g. {@code where("fieldName", Comparator.NE,
* "someval")'}.
@ -183,7 +187,14 @@ public abstract class QueryComposer<T> {
GTE(" >=", QueryComposer::greaterThanOrEqualTo),
/** Return only records whose field is greater than the value. */
GT(" >", QueryComposer::greaterThan);
GT(" >", QueryComposer::greaterThan),
/**
* Return only records whose field matches the pattern.
*
* <p>SQL ONLY.
*/
LIKE(null, QueryComposer::like);
private final String datastoreString;

View file

@ -15,10 +15,10 @@
package google.registry.tools;
import static com.google.common.base.Strings.isNullOrEmpty;
import static google.registry.flows.poll.PollFlowUtils.SQL_POLL_MESSAGE_QUERY;
import static google.registry.flows.poll.PollFlowUtils.datastorePollMessageQuery;
import static google.registry.flows.poll.PollFlowUtils.createPollMessageQuery;
import static google.registry.model.ofy.ObjectifyService.auditedOfy;
import static google.registry.model.poll.PollMessageExternalKeyConverter.makePollMessageExternalId;
import static google.registry.persistence.transaction.QueryComposer.Comparator.LIKE;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
@ -32,10 +32,10 @@ import google.registry.flows.poll.PollFlowUtils;
import google.registry.model.poll.PollMessage;
import google.registry.model.poll.PollMessage.Autorenew;
import google.registry.model.poll.PollMessage.OneTime;
import google.registry.persistence.transaction.QueryComposer;
import google.registry.util.Clock;
import java.util.List;
import javax.inject.Inject;
import javax.persistence.TypedQuery;
/**
* Command to acknowledge one-time poll messages for a registrar.
@ -61,10 +61,6 @@ import javax.persistence.TypedQuery;
@Parameters(separators = " =", commandDescription = "Acknowledge one-time poll messages.")
final class AckPollMessagesCommand implements CommandWithRemoteApi {
private static final String SQL_POLL_MESSAGE_QUERY_BY_MESSAGE =
"FROM PollMessage WHERE clientId = :registrarId AND eventTime <= :now AND msg LIKE :msg"
+ " ORDER BY eventTime ASC";
@Parameter(
names = {"-c", "--client"},
description = "Client identifier of the registrar whose poll messages should be ACKed",
@ -102,7 +98,14 @@ final class AckPollMessagesCommand implements CommandWithRemoteApi {
* the Datastore size limits.
*/
private void ackPollMessagesDatastore() {
QueryKeys<PollMessage> query = datastorePollMessageQuery(clientId, clock.nowUtc()).keys();
QueryKeys<PollMessage> query =
auditedOfy()
.load()
.type(PollMessage.class)
.filter("clientId", clientId)
.filter("eventTime <=", clock.nowUtc())
.order("eventTime")
.keys();
for (List<Key<PollMessage>> keys : Iterables.partition(query, BATCH_SIZE)) {
tm().transact(
() ->
@ -118,21 +121,15 @@ final class AckPollMessagesCommand implements CommandWithRemoteApi {
jpaTm()
.transact(
() -> {
TypedQuery<PollMessage> typedQuery;
if (isNullOrEmpty(message)) {
typedQuery = jpaTm().query(SQL_POLL_MESSAGE_QUERY, PollMessage.class);
} else {
typedQuery =
jpaTm()
.query(SQL_POLL_MESSAGE_QUERY_BY_MESSAGE, PollMessage.class)
.setParameter("msg", "%" + message + "%");
QueryComposer<PollMessage> query = createPollMessageQuery(clientId, clock.nowUtc());
if (!isNullOrEmpty(message)) {
query = query.where("msg", LIKE, "%" + message + "%");
}
typedQuery
.setParameter("registrarId", clientId)
.setParameter("now", clock.nowUtc())
.getResultStream()
query.stream()
// Detach it so that we can print out the old, non-acked version
// (for autorenews, acking changes the next event time)
// TODO(mmuller): remove after PR 1116 is merged.
.peek(jpaTm().getEntityManager()::detach)
.forEach(this::actOnPollMessage);
});

View file

@ -21,7 +21,6 @@ import static google.registry.persistence.transaction.TransactionManagerFactory.
import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm;
import static org.junit.Assert.assertThrows;
import com.google.common.collect.ImmutableList;
import com.googlecode.objectify.annotation.Entity;
import com.googlecode.objectify.annotation.Id;
import com.googlecode.objectify.annotation.Index;
@ -169,7 +168,7 @@ public class QueryComposerTest {
.where("name", Comparator.EQ, "alpha")
.stream()
.collect(toImmutableList())))
.isEqualTo(ImmutableList.of(alpha));
.containsExactly(alpha);
assertThat(
transactIfJpaTm(
() ->
@ -178,7 +177,7 @@ public class QueryComposerTest {
.where("name", Comparator.GT, "alpha")
.stream()
.collect(toImmutableList())))
.isEqualTo(ImmutableList.of(bravo, charlie));
.containsExactly(bravo, charlie);
assertThat(
transactIfJpaTm(
() ->
@ -187,7 +186,7 @@ public class QueryComposerTest {
.where("name", Comparator.GTE, "bravo")
.stream()
.collect(toImmutableList())))
.isEqualTo(ImmutableList.of(bravo, charlie));
.containsExactly(bravo, charlie);
assertThat(
transactIfJpaTm(
() ->
@ -196,7 +195,7 @@ public class QueryComposerTest {
.where("name", Comparator.LT, "charlie")
.stream()
.collect(toImmutableList())))
.isEqualTo(ImmutableList.of(alpha, bravo));
.containsExactly(alpha, bravo);
assertThat(
transactIfJpaTm(
() ->
@ -205,7 +204,7 @@ public class QueryComposerTest {
.where("name", Comparator.LTE, "bravo")
.stream()
.collect(toImmutableList())))
.isEqualTo(ImmutableList.of(alpha, bravo));
.containsExactly(alpha, bravo);
}
@TestOfyAndSql
@ -216,7 +215,7 @@ public class QueryComposerTest {
tm().createQueryComposer(TestEntity.class)
.where("name", Comparator.GT, "alpha")
.list()))
.isEqualTo(ImmutableList.of(bravo, charlie));
.containsExactly(bravo, charlie);
}
@TestOfyAndSql
@ -242,7 +241,7 @@ public class QueryComposerTest {
.orderBy("val")
.stream()
.collect(toImmutableList())))
.isEqualTo(ImmutableList.of(bravo, alpha));
.containsExactly(bravo, alpha);
}
@TestOfyAndSql
@ -262,7 +261,7 @@ public class QueryComposerTest {
.where("name", Comparator.GT, "foxtrot")
.stream()
.collect(toImmutableList())))
.isEqualTo(ImmutableList.of());
.isEmpty();
}
@TestOfyOnly
@ -292,6 +291,65 @@ public class QueryComposerTest {
.containsExactly(alpha);
}
@TestSqlOnly
public void testLikeQueries() {
assertThat(
transactIfJpaTm(
() ->
tm()
.createQueryComposer(TestEntity.class)
.where("name", Comparator.LIKE, "%harl%")
.stream()
.collect(toImmutableList())))
.containsExactly(charlie);
// Verify that full matches work.
assertThat(
transactIfJpaTm(
() ->
tm()
.createQueryComposer(TestEntity.class)
.where("name", Comparator.LIKE, "alpha")
.stream()
.collect(toImmutableList())))
.containsExactly(alpha);
// verify that we don't do partial matches.
assertThat(
transactIfJpaTm(
() ->
tm()
.createQueryComposer(TestEntity.class)
.where("name", Comparator.LIKE, "%harl")
.stream()
.collect(toImmutableList())))
.isEmpty();
assertThat(
transactIfJpaTm(
() ->
tm()
.createQueryComposer(TestEntity.class)
.where("name", Comparator.LIKE, "harl%")
.stream()
.collect(toImmutableList())))
.isEmpty();
}
@TestOfyOnly
public void testLikeQueries_failsOnOfy() {
UnsupportedOperationException thrown =
assertThrows(
UnsupportedOperationException.class,
() ->
tm()
.createQueryComposer(TestEntity.class)
.where("name", Comparator.LIKE, "%")
.stream());
assertThat(thrown)
.hasMessageThat()
.contains("The LIKE operation is not supported on Datastore.");
}
@javax.persistence.Entity
@Entity(name = "QueryComposerTestEntity")
private static class TestEntity extends ImmutableObject {