Flatten and inject the poll flows

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=133302791
This commit is contained in:
cgoldfeder 2016-09-15 13:46:12 -07:00 committed by Ben McIlwain
parent 939112318b
commit 4a723576d5
6 changed files with 107 additions and 131 deletions

View file

@ -15,17 +15,22 @@
package google.registry.flows.poll;
import static com.google.common.base.Preconditions.checkState;
import static google.registry.flows.poll.PollFlowUtils.getPollMessagesQuery;
import static google.registry.model.eppoutput.Result.Code.Success;
import static google.registry.model.eppoutput.Result.Code.SuccessWithNoMessages;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.Work;
import google.registry.flows.EppException;
import google.registry.flows.EppException.AuthorizationErrorException;
import google.registry.flows.EppException.ObjectDoesNotExistException;
import google.registry.flows.EppException.ParameterValueSyntaxErrorException;
import google.registry.flows.EppException.RequiredParameterMissingException;
import google.registry.flows.FlowModule.ClientId;
import google.registry.flows.FlowModule.PollMessageId;
import google.registry.flows.LoggedInFlow;
import google.registry.flows.TransactionalFlow;
import google.registry.model.eppoutput.EppOutput;
import google.registry.model.poll.MessageQueueInfo;
@ -42,34 +47,36 @@ import org.joda.time.DateTime;
* @error {@link PollAckFlow.MissingMessageIdException}
* @error {@link PollAckFlow.NotAuthorizedToAckMessageException}
*/
public class PollAckFlow extends PollFlow implements TransactionalFlow {
public class PollAckFlow extends LoggedInFlow implements TransactionalFlow {
@Inject @ClientId String clientId;
@Inject @PollMessageId String messageId;
@Inject PollAckFlow() {}
@Override
public final EppOutput run() throws EppException {
if (command.getMessageId() == null) {
if (messageId.isEmpty()) {
throw new MissingMessageIdException();
}
Key<PollMessage> pollMessageKey;
// Try parsing the messageId, and throw an exception if it's invalid.
try {
pollMessageKey = PollMessage.EXTERNAL_KEY_CONVERTER.reverse().convert(command.getMessageId());
pollMessageKey = PollMessage.EXTERNAL_KEY_CONVERTER.reverse().convert(messageId);
} catch (PollMessageExternalKeyParseException e) {
throw new InvalidMessageIdException(command.getMessageId());
throw new InvalidMessageIdException(messageId);
}
// Load the message to be acked. If a message is queued to be delivered in the future, we treat
// it as if it doesn't exist yet.
PollMessage pollMessage = ofy().load().key(pollMessageKey).now();
if (pollMessage == null || !isBeforeOrAt(pollMessage.getEventTime(), now)) {
throw new MessageDoesNotExistException(command.getMessageId());
throw new MessageDoesNotExistException(messageId);
}
// Make sure this client is authorized to ack this message. It could be that the message is
// supposed to go to a different registrar.
if (!getClientId().equals(pollMessage.getClientId())) {
if (!clientId.equals(pollMessage.getClientId())) {
throw new NotAuthorizedToAckMessageException();
}
@ -100,23 +107,23 @@ public class PollAckFlow extends PollFlow implements TransactionalFlow {
// We need to return the new queue length. If this was the last message in the queue being
// acked, then we return a special status code indicating that. Note that the query will
// include the message being acked.
int messageCount = getMessageQueueLength();
int messageCount = ofy().doTransactionless(new Work<Integer>() {
@Override
public Integer run() {
return getPollMessagesQuery(clientId, now).count();
}});
if (!includeAckedMessageInCount) {
messageCount--;
}
if (messageCount <= 0) {
return createOutput(SuccessWithNoMessages);
}
return createOutput(
Success,
MessageQueueInfo.create(
null, // eventTime
null, // msg
messageCount,
command.getMessageId()),
null, // responseData
null); // extensions
return createOutput(Success, null, null, MessageQueueInfo.create(
null, // eventTime
null, // msg
messageCount,
messageId));
}
/** Registrar is not authorized to ack this message. */