mirror of
https://github.com/google/nomulus.git
synced 2025-07-02 17:23:32 +02:00
Add a a delay to task in CommitLogCheckpointAction (#1554)
* Add delay to task
This commit is contained in:
parent
d418008d27
commit
74b7ec052f
2 changed files with 25 additions and 8 deletions
|
@ -28,11 +28,11 @@ import google.registry.model.ofy.CommitLogCheckpointRoot;
|
||||||
import google.registry.request.Action;
|
import google.registry.request.Action;
|
||||||
import google.registry.request.Action.Service;
|
import google.registry.request.Action.Service;
|
||||||
import google.registry.request.auth.Auth;
|
import google.registry.request.auth.Auth;
|
||||||
import google.registry.util.Clock;
|
|
||||||
import google.registry.util.CloudTasksUtils;
|
import google.registry.util.CloudTasksUtils;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
import org.joda.time.Duration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Action that saves commit log checkpoints to Datastore and kicks off a diff export task.
|
* Action that saves commit log checkpoints to Datastore and kicks off a diff export task.
|
||||||
|
@ -57,7 +57,22 @@ public final class CommitLogCheckpointAction implements Runnable {
|
||||||
|
|
||||||
private static final String QUEUE_NAME = "export-commits";
|
private static final String QUEUE_NAME = "export-commits";
|
||||||
|
|
||||||
@Inject Clock clock;
|
/**
|
||||||
|
* The amount of time enqueueing should be delayed.
|
||||||
|
*
|
||||||
|
* <p>The {@link ExportCommitLogDiffAction} is enqueued in {@link CommitLogCheckpointAction},
|
||||||
|
* which is inside a Datastore transaction that persists the checkpoint to be exported. After the
|
||||||
|
* switch to CloudTasks API, the task may be invoked before the Datastore transaction commits.
|
||||||
|
* When this happens, the checkpoint is not found which leads to {@link
|
||||||
|
* com.google.common.base.VerifyException}.
|
||||||
|
*
|
||||||
|
* <p>In order to invoke the task after the transaction commits, a reasonable delay should be
|
||||||
|
* added to each task. The latency of the request is mostly in the range of 4-6 seconds; Choosing
|
||||||
|
* a value 30% greater than the upper bound should solve the issue invoking a task before the
|
||||||
|
* transaction commits.
|
||||||
|
*/
|
||||||
|
static final Duration ENQUEUE_DELAY_SECONDS = Duration.standardSeconds(8);
|
||||||
|
|
||||||
@Inject CommitLogCheckpointStrategy strategy;
|
@Inject CommitLogCheckpointStrategy strategy;
|
||||||
@Inject CloudTasksUtils cloudTasksUtils;
|
@Inject CloudTasksUtils cloudTasksUtils;
|
||||||
|
|
||||||
|
@ -96,14 +111,15 @@ public final class CommitLogCheckpointAction implements Runnable {
|
||||||
// Enqueue a diff task between previous and current checkpoints.
|
// Enqueue a diff task between previous and current checkpoints.
|
||||||
cloudTasksUtils.enqueue(
|
cloudTasksUtils.enqueue(
|
||||||
QUEUE_NAME,
|
QUEUE_NAME,
|
||||||
cloudTasksUtils.createPostTask(
|
cloudTasksUtils.createPostTaskWithDelay(
|
||||||
ExportCommitLogDiffAction.PATH,
|
ExportCommitLogDiffAction.PATH,
|
||||||
Service.BACKEND.toString(),
|
Service.BACKEND.toString(),
|
||||||
ImmutableMultimap.of(
|
ImmutableMultimap.of(
|
||||||
LOWER_CHECKPOINT_TIME_PARAM,
|
LOWER_CHECKPOINT_TIME_PARAM,
|
||||||
lastWrittenTime.toString(),
|
lastWrittenTime.toString(),
|
||||||
UPPER_CHECKPOINT_TIME_PARAM,
|
UPPER_CHECKPOINT_TIME_PARAM,
|
||||||
checkpoint.getCheckpointTime().toString())));
|
checkpoint.getCheckpointTime().toString()),
|
||||||
|
ENQUEUE_DELAY_SECONDS));
|
||||||
return true;
|
return true;
|
||||||
});
|
});
|
||||||
return isCheckPointPersisted ? Optional.of(checkpoint) : Optional.empty();
|
return isCheckPointPersisted ? Optional.of(checkpoint) : Optional.empty();
|
||||||
|
|
|
@ -47,11 +47,10 @@ public class CommitLogCheckpointActionTest {
|
||||||
|
|
||||||
private DateTime now = DateTime.now(UTC);
|
private DateTime now = DateTime.now(UTC);
|
||||||
private CommitLogCheckpointAction task = new CommitLogCheckpointAction();
|
private CommitLogCheckpointAction task = new CommitLogCheckpointAction();
|
||||||
private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper();
|
private final CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(new FakeClock(now));
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void beforeEach() {
|
void beforeEach() {
|
||||||
task.clock = new FakeClock(now);
|
|
||||||
task.strategy = strategy;
|
task.strategy = strategy;
|
||||||
task.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
|
task.cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
|
||||||
when(strategy.computeCheckpoint())
|
when(strategy.computeCheckpoint())
|
||||||
|
@ -68,7 +67,8 @@ public class CommitLogCheckpointActionTest {
|
||||||
new TaskMatcher()
|
new TaskMatcher()
|
||||||
.url(ExportCommitLogDiffAction.PATH)
|
.url(ExportCommitLogDiffAction.PATH)
|
||||||
.param(ExportCommitLogDiffAction.LOWER_CHECKPOINT_TIME_PARAM, START_OF_TIME.toString())
|
.param(ExportCommitLogDiffAction.LOWER_CHECKPOINT_TIME_PARAM, START_OF_TIME.toString())
|
||||||
.param(ExportCommitLogDiffAction.UPPER_CHECKPOINT_TIME_PARAM, now.toString()));
|
.param(ExportCommitLogDiffAction.UPPER_CHECKPOINT_TIME_PARAM, now.toString())
|
||||||
|
.scheduleTime(now.plus(CommitLogCheckpointAction.ENQUEUE_DELAY_SECONDS)));
|
||||||
assertThat(loadRoot().getLastWrittenTime()).isEqualTo(now);
|
assertThat(loadRoot().getLastWrittenTime()).isEqualTo(now);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,7 +82,8 @@ public class CommitLogCheckpointActionTest {
|
||||||
new TaskMatcher()
|
new TaskMatcher()
|
||||||
.url(ExportCommitLogDiffAction.PATH)
|
.url(ExportCommitLogDiffAction.PATH)
|
||||||
.param(ExportCommitLogDiffAction.LOWER_CHECKPOINT_TIME_PARAM, oneMinuteAgo.toString())
|
.param(ExportCommitLogDiffAction.LOWER_CHECKPOINT_TIME_PARAM, oneMinuteAgo.toString())
|
||||||
.param(ExportCommitLogDiffAction.UPPER_CHECKPOINT_TIME_PARAM, now.toString()));
|
.param(ExportCommitLogDiffAction.UPPER_CHECKPOINT_TIME_PARAM, now.toString())
|
||||||
|
.scheduleTime(now.plus(CommitLogCheckpointAction.ENQUEUE_DELAY_SECONDS)));
|
||||||
assertThat(loadRoot().getLastWrittenTime()).isEqualTo(now);
|
assertThat(loadRoot().getLastWrittenTime()).isEqualTo(now);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue