Batch NORDN pull queue task deletions

They were failing because the maximum App Engine task batch size is 1,000, and
we currently have more than 4,000 tasks in the pull queue. We keep re-uploading
those to NORDN because we're unable to delete the tasks after successful upload,
so the leases expire and they get processed again.

Also renames TaskEnqueuer to TaskQueueUtils to reflect its newly expanded role.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=197060903
This commit is contained in:
mcilwain 2018-05-17 15:22:34 -07:00 committed by jianglai
parent 1248a7722b
commit c989911526
21 changed files with 129 additions and 73 deletions

View file

@ -44,7 +44,7 @@ import google.registry.request.RequestParameters;
import google.registry.request.lock.LockHandler;
import google.registry.tldconfig.idn.IdnTableEnum;
import google.registry.util.FormattingLogger;
import google.registry.util.TaskEnqueuer;
import google.registry.util.TaskQueueUtils;
import google.registry.xjc.rdeheader.XjcRdeHeader;
import google.registry.xjc.rdeheader.XjcRdeHeaderElement;
import google.registry.xml.XmlException;
@ -70,7 +70,7 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
private final TaskEnqueuer taskEnqueuer;
private final TaskQueueUtils taskQueueUtils;
private final LockHandler lockHandler;
private final int gcsBufferSize;
private final String bucket;
@ -81,7 +81,7 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
@Inject
RdeStagingReducer(
TaskEnqueuer taskEnqueuer,
TaskQueueUtils taskQueueUtils,
LockHandler lockHandler,
@Config("gcsBufferSize") int gcsBufferSize,
@Config("rdeBucket") String bucket,
@ -89,7 +89,7 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
@Config("rdeStagingLockTimeout") Duration lockTimeout,
@KeyModule.Key("rdeStagingEncryptionKey") byte[] stagingKeyBytes,
@Parameter(RdeModule.PARAM_LENIENT) boolean lenient) {
this.taskEnqueuer = taskEnqueuer;
this.taskQueueUtils = taskQueueUtils;
this.lockHandler = lockHandler;
this.gcsBufferSize = gcsBufferSize;
this.bucket = bucket;
@ -248,11 +248,11 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
"Rolled forward %s on %s cursor to %s", key.cursor(), tld, newPosition);
RdeRevision.saveRevision(tld, watermark, mode, revision);
if (mode == RdeMode.FULL) {
taskEnqueuer.enqueue(
taskQueueUtils.enqueue(
getQueue("rde-upload"),
withUrl(RdeUploadAction.PATH).param(RequestParameters.PARAM_TLD, tld));
} else {
taskEnqueuer.enqueue(
taskQueueUtils.enqueue(
getQueue("brda"),
withUrl(BrdaCopyAction.PATH)
.param(RequestParameters.PARAM_TLD, tld)

View file

@ -52,7 +52,7 @@ import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.FormattingLogger;
import google.registry.util.Retrier;
import google.registry.util.TaskEnqueuer;
import google.registry.util.TaskQueueUtils;
import google.registry.util.TeeOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -108,7 +108,7 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
@Inject RydePgpFileOutputStreamFactory pgpFileFactory;
@Inject RydePgpSigningOutputStreamFactory pgpSigningFactory;
@Inject RydeTarOutputStreamFactory tarFactory;
@Inject TaskEnqueuer taskEnqueuer;
@Inject TaskQueueUtils taskQueueUtils;
@Inject Retrier retrier;
@Inject @Parameter(RequestParameters.PARAM_TLD) String tld;
@Inject @Config("rdeBucket") String bucket;
@ -125,7 +125,7 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
@Override
public void run() {
runner.lockRunAndRollForward(this, Registry.get(tld), timeout, CursorType.RDE_UPLOAD, interval);
taskEnqueuer.enqueue(
taskQueueUtils.enqueue(
reportQueue,
withUrl(RdeReportAction.PATH).param(RequestParameters.PARAM_TLD, tld));
}