mirror of
https://github.com/google/nomulus.git
synced 2025-07-02 01:03:33 +02:00
Fix flaky RdeStagingActionDatastoreTest (#1514)
* Fix flaky RdeStagingActionDatastoreTest Fixed the most common cause that makes one method flaky (Clock and timestamp problem). Added a TODO to rethink test case. Also added notes on tasks potentially enqueued multiple times.
This commit is contained in:
parent
98ba687005
commit
201d2ef95b
4 changed files with 21 additions and 1 deletions
|
@ -297,6 +297,10 @@ public class RdeIO {
|
||||||
logger.atInfo().log(
|
logger.atInfo().log(
|
||||||
"Rolled forward %s on %s cursor to %s.", key.cursor(), key.tld(), newPosition);
|
"Rolled forward %s on %s cursor to %s.", key.cursor(), key.tld(), newPosition);
|
||||||
RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision);
|
RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision);
|
||||||
|
// Enqueueing a task is a side effect that is not undone if the transaction rolls
|
||||||
|
// back. So this may result in multiple copies of the same task being processed.
|
||||||
|
// This is fine because the RdeUploadAction is guarded by a lock and tracks progress
|
||||||
|
// by cursor. The BrdaCopyAction writes a file to GCS, which is an atomic action.
|
||||||
if (key.mode() == RdeMode.FULL) {
|
if (key.mode() == RdeMode.FULL) {
|
||||||
cloudTasksUtils.enqueue(
|
cloudTasksUtils.enqueue(
|
||||||
RDE_UPLOAD_QUEUE,
|
RDE_UPLOAD_QUEUE,
|
||||||
|
|
|
@ -87,6 +87,8 @@ public final class BrdaCopyAction implements Runnable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void copyAsRyde() throws IOException {
|
private void copyAsRyde() throws IOException {
|
||||||
|
// TODO(b/217772483): consider guarding this action with a lock and check if there is work.
|
||||||
|
// Not urgent since file writes on GCS are atomic.
|
||||||
int revision =
|
int revision =
|
||||||
RdeRevision.getCurrentRevision(tld, watermark, THIN)
|
RdeRevision.getCurrentRevision(tld, watermark, THIN)
|
||||||
.orElseThrow(
|
.orElseThrow(
|
||||||
|
|
|
@ -226,6 +226,10 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
|
||||||
logger.atInfo().log(
|
logger.atInfo().log(
|
||||||
"Rolled forward %s on %s cursor to %s.", key.cursor(), tld, newPosition);
|
"Rolled forward %s on %s cursor to %s.", key.cursor(), tld, newPosition);
|
||||||
RdeRevision.saveRevision(tld, watermark, mode, revision);
|
RdeRevision.saveRevision(tld, watermark, mode, revision);
|
||||||
|
// Enqueueing a task is a side effect that is not undone if the transaction rolls
|
||||||
|
// back. So this may result in multiple copies of the same task being processed. This
|
||||||
|
// is fine because the RdeUploadAction is guarded by a lock and tracks progress by
|
||||||
|
// cursor. The BrdaCopyAction writes a file to GCS, which is an atomic action.
|
||||||
if (mode == RdeMode.FULL) {
|
if (mode == RdeMode.FULL) {
|
||||||
cloudTasksUtils.enqueue(
|
cloudTasksUtils.enqueue(
|
||||||
"rde-upload",
|
"rde-upload",
|
||||||
|
|
|
@ -97,7 +97,15 @@ public class RdeStagingActionDatastoreTest extends MapreduceTestCase<RdeStagingA
|
||||||
|
|
||||||
@RegisterExtension public final InjectExtension inject = new InjectExtension();
|
@RegisterExtension public final InjectExtension inject = new InjectExtension();
|
||||||
|
|
||||||
private final FakeClock clock = new FakeClock();
|
/**
|
||||||
|
* Without autoIncrement mode, the fake clock won't advance between Mapper and Reducer
|
||||||
|
* transactions when action is invoked, resulting in rolled back reducer transaction (due to
|
||||||
|
* TimestampInversionException if both transactions are mapped to the same CommitLog bucket) and
|
||||||
|
* multiple RdeUplaod/BrdaCopy tasks being enqueued (due to transaction retries, since Cloud Tasks
|
||||||
|
* enqueuing is not transactional with Datastore transactions).
|
||||||
|
*/
|
||||||
|
private final FakeClock clock = new FakeClock().setAutoIncrementByOneMilli();
|
||||||
|
|
||||||
private final FakeResponse response = new FakeResponse();
|
private final FakeResponse response = new FakeResponse();
|
||||||
private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
|
private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
|
||||||
private final List<? super XjcRdeContentType> alreadyExtracted = new ArrayList<>();
|
private final List<? super XjcRdeContentType> alreadyExtracted = new ArrayList<>();
|
||||||
|
@ -465,6 +473,8 @@ public class RdeStagingActionDatastoreTest extends MapreduceTestCase<RdeStagingA
|
||||||
clock.setTo(DateTime.parse("2000-01-04TZ")); // Tuesday
|
clock.setTo(DateTime.parse("2000-01-04TZ")); // Tuesday
|
||||||
action.run();
|
action.run();
|
||||||
executeTasksUntilEmpty("mapreduce", clock);
|
executeTasksUntilEmpty("mapreduce", clock);
|
||||||
|
// TODO(b/217773051): duplicate tasks are possible though unlikely. Consider if below calls are
|
||||||
|
// appropriate since they don't allow duplicates.
|
||||||
cloudTasksHelper.assertTasksEnqueued(
|
cloudTasksHelper.assertTasksEnqueued(
|
||||||
"rde-upload",
|
"rde-upload",
|
||||||
new TaskMatcher().url(RdeUploadAction.PATH).param(RequestParameters.PARAM_TLD, "lol"));
|
new TaskMatcher().url(RdeUploadAction.PATH).param(RequestParameters.PARAM_TLD, "lol"));
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue