From 201d2ef95b2be414ff8bc8e7bf7c68d214b06778 Mon Sep 17 00:00:00 2001 From: Weimin Yu Date: Fri, 4 Feb 2022 10:40:52 -0500 Subject: [PATCH] Fix flaky RdeStagingActionDatastoreTest (#1514) * Fix flaky RdeStagingActionDatastoreTest Fixed the most common cause that makes one method flaky (Clock and timestamp problem). Added a TODO to rethink test case. Also added notes on tasks potentially enqueued multiple times. --- .../main/java/google/registry/beam/rde/RdeIO.java | 4 ++++ .../java/google/registry/rde/BrdaCopyAction.java | 2 ++ .../java/google/registry/rde/RdeStagingReducer.java | 4 ++++ .../registry/rde/RdeStagingActionDatastoreTest.java | 12 +++++++++++- 4 files changed, 21 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/google/registry/beam/rde/RdeIO.java b/core/src/main/java/google/registry/beam/rde/RdeIO.java index 240ddd88d..2a6e616f6 100644 --- a/core/src/main/java/google/registry/beam/rde/RdeIO.java +++ b/core/src/main/java/google/registry/beam/rde/RdeIO.java @@ -297,6 +297,10 @@ public class RdeIO { logger.atInfo().log( "Rolled forward %s on %s cursor to %s.", key.cursor(), key.tld(), newPosition); RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision); + // Enqueueing a task is a side effect that is not undone if the transaction rolls + // back. So this may result in multiple copies of the same task being processed. + // This is fine because the RdeUploadAction is guarded by a lock and tracks progress + // by cursor. The BrdaCopyAction writes a file to GCS, which is an atomic action. if (key.mode() == RdeMode.FULL) { cloudTasksUtils.enqueue( RDE_UPLOAD_QUEUE, diff --git a/core/src/main/java/google/registry/rde/BrdaCopyAction.java b/core/src/main/java/google/registry/rde/BrdaCopyAction.java index 8ec3495d0..cd3164d55 100644 --- a/core/src/main/java/google/registry/rde/BrdaCopyAction.java +++ b/core/src/main/java/google/registry/rde/BrdaCopyAction.java @@ -87,6 +87,8 @@ public final class BrdaCopyAction implements Runnable { } private void copyAsRyde() throws IOException { + // TODO(b/217772483): consider guarding this action with a lock and check if there is work. + // Not urgent since file writes on GCS are atomic. int revision = RdeRevision.getCurrentRevision(tld, watermark, THIN) .orElseThrow( diff --git a/core/src/main/java/google/registry/rde/RdeStagingReducer.java b/core/src/main/java/google/registry/rde/RdeStagingReducer.java index c36cfbb83..96a8acba1 100644 --- a/core/src/main/java/google/registry/rde/RdeStagingReducer.java +++ b/core/src/main/java/google/registry/rde/RdeStagingReducer.java @@ -226,6 +226,10 @@ public final class RdeStagingReducer extends Reducer alreadyExtracted = new ArrayList<>(); @@ -465,6 +473,8 @@ public class RdeStagingActionDatastoreTest extends MapreduceTestCase