diff --git a/core/src/main/java/google/registry/beam/rde/RdeIO.java b/core/src/main/java/google/registry/beam/rde/RdeIO.java index 240ddd88d..2a6e616f6 100644 --- a/core/src/main/java/google/registry/beam/rde/RdeIO.java +++ b/core/src/main/java/google/registry/beam/rde/RdeIO.java @@ -297,6 +297,10 @@ public class RdeIO { logger.atInfo().log( "Rolled forward %s on %s cursor to %s.", key.cursor(), key.tld(), newPosition); RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision); + // Enqueueing a task is a side effect that is not undone if the transaction rolls + // back. So this may result in multiple copies of the same task being processed. + // This is fine because the RdeUploadAction is guarded by a lock and tracks progress + // by cursor. The BrdaCopyAction writes a file to GCS, which is an atomic action. if (key.mode() == RdeMode.FULL) { cloudTasksUtils.enqueue( RDE_UPLOAD_QUEUE, diff --git a/core/src/main/java/google/registry/rde/BrdaCopyAction.java b/core/src/main/java/google/registry/rde/BrdaCopyAction.java index 8ec3495d0..cd3164d55 100644 --- a/core/src/main/java/google/registry/rde/BrdaCopyAction.java +++ b/core/src/main/java/google/registry/rde/BrdaCopyAction.java @@ -87,6 +87,8 @@ public final class BrdaCopyAction implements Runnable { } private void copyAsRyde() throws IOException { + // TODO(b/217772483): consider guarding this action with a lock and check if there is work. + // Not urgent since file writes on GCS are atomic. int revision = RdeRevision.getCurrentRevision(tld, watermark, THIN) .orElseThrow( diff --git a/core/src/main/java/google/registry/rde/RdeStagingReducer.java b/core/src/main/java/google/registry/rde/RdeStagingReducer.java index c36cfbb83..96a8acba1 100644 --- a/core/src/main/java/google/registry/rde/RdeStagingReducer.java +++ b/core/src/main/java/google/registry/rde/RdeStagingReducer.java @@ -226,6 +226,10 @@ public final class RdeStagingReducer extends Reducer alreadyExtracted = new ArrayList<>(); @@ -465,6 +473,8 @@ public class RdeStagingActionDatastoreTest extends MapreduceTestCase