From 917e8d5a1ed80a4181c40af00a7930f9a21acd97 Mon Sep 17 00:00:00 2001 From: Lai Jiang Date: Thu, 17 Mar 2022 16:32:07 -0400 Subject: [PATCH] Fix a subtle issue in BRDA copy caused by Cloud Tasks (#1556) * Fix a subtle issue in BRDA copy caused by Cloud Tasks After the Cloud Tasks migration and #1508, the BRDA copy job now routinely fails on the first try because the revision update is not committed by the time the Cloud Tasks job enqueued in the same transaction runs for the first time. This is because the enqueueing is a side effect and not part of the transaction. The job eventually succeeds because of retries. This PR attempts to mitigate the initial failure by adding a delay to the enqueued job, and checking the cursor in the job itself to prevent it from running before the transaction is committed. --- .../java/google/registry/beam/rde/RdeIO.java | 20 +++++++++---- .../google/registry/rde/BrdaCopyAction.java | 17 +++++++++++ .../registry/rde/RdeStagingReducer.java | 29 ++++++++++++++----- .../google/registry/rde/RdeUploadAction.java | 2 +- .../registry/rde/BrdaCopyActionTest.java | 22 ++++++++++++++ .../registry/rde/RdeUploadActionTest.java | 9 +++--- 6 files changed, 80 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/google/registry/beam/rde/RdeIO.java b/core/src/main/java/google/registry/beam/rde/RdeIO.java index 41823125f..067846386 100644 --- a/core/src/main/java/google/registry/beam/rde/RdeIO.java +++ b/core/src/main/java/google/registry/beam/rde/RdeIO.java @@ -68,6 +68,7 @@ import org.apache.beam.sdk.values.PDone; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.openpgp.PGPPublicKey; import org.joda.time.DateTime; +import org.joda.time.Duration; public class RdeIO { @@ -261,6 +262,7 @@ public class RdeIO { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final long serialVersionUID = 5822176227753327224L; + private static final Duration ENQUEUE_DELAY = 
Duration.standardMinutes(1); private final CloudTasksUtils cloudTasksUtils; @@ -271,6 +273,7 @@ public class RdeIO { @ProcessElement public void processElement( @Element KV input, PipelineOptions options) { + tm().transact( () -> { PendingDeposit key = input.getKey(); @@ -296,26 +299,30 @@ public class RdeIO { tm().put(Cursor.create(key.cursor(), newPosition, registry)); logger.atInfo().log( "Rolled forward %s on %s cursor to %s.", key.cursor(), key.tld(), newPosition); - RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision); + RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), input.getValue()); // Enqueueing a task is a side effect that is not undone if the transaction rolls // back. So this may result in multiple copies of the same task being processed. // This is fine because the RdeUploadAction is guarded by a lock and tracks progress - // by cursor. The BrdaCopyAction writes a file to GCS, which is an atomic action. + // by cursor. The BrdaCopyAction writes a file to GCS, which is an atomic action. It + // is also guarded by a cursor to not run before the cursor is updated. We also + // include a delay to minimize the chance that the enqueued job executes before the + // transaction is committed, which triggers a retry. 
if (key.mode() == RdeMode.FULL) { cloudTasksUtils.enqueue( RDE_UPLOAD_QUEUE, - cloudTasksUtils.createPostTask( + cloudTasksUtils.createPostTaskWithDelay( RdeUploadAction.PATH, Service.BACKEND.getServiceId(), ImmutableMultimap.of( RequestParameters.PARAM_TLD, key.tld(), RdeModule.PARAM_PREFIX, - options.getJobName() + '/'))); + options.getJobName() + '/'), + ENQUEUE_DELAY)); } else { cloudTasksUtils.enqueue( BRDA_QUEUE, - cloudTasksUtils.createPostTask( + cloudTasksUtils.createPostTaskWithDelay( BrdaCopyAction.PATH, Service.BACKEND.getServiceId(), ImmutableMultimap.of( @@ -324,7 +331,8 @@ public class RdeIO { RdeModule.PARAM_WATERMARK, key.watermark().toString(), RdeModule.PARAM_PREFIX, - options.getJobName() + '/'))); + options.getJobName() + '/'), + ENQUEUE_DELAY)); } }); } diff --git a/core/src/main/java/google/registry/rde/BrdaCopyAction.java b/core/src/main/java/google/registry/rde/BrdaCopyAction.java index cd3164d55..5d2df6f9c 100644 --- a/core/src/main/java/google/registry/rde/BrdaCopyAction.java +++ b/core/src/main/java/google/registry/rde/BrdaCopyAction.java @@ -14,8 +14,13 @@ package google.registry.rde; +import static google.registry.model.common.Cursor.CursorType.BRDA; +import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime; import static google.registry.model.rde.RdeMode.THIN; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm; import static google.registry.request.Action.Method.POST; +import static google.registry.util.DateTimeUtils.isBeforeOrAt; import com.google.cloud.storage.BlobId; import com.google.common.flogger.FluentLogger; @@ -23,9 +28,11 @@ import com.google.common.io.ByteStreams; import google.registry.config.RegistryConfig.Config; import google.registry.gcs.GcsUtils; import google.registry.keyring.api.KeyModule.Key; +import google.registry.model.common.Cursor; import 
google.registry.model.rde.RdeNamingUtils; import google.registry.model.rde.RdeRevision; import google.registry.request.Action; +import google.registry.request.HttpException.NoContentException; import google.registry.request.Parameter; import google.registry.request.RequestParameters; import google.registry.request.auth.Auth; @@ -89,6 +96,16 @@ public final class BrdaCopyAction implements Runnable { private void copyAsRyde() throws IOException { // TODO(b/217772483): consider guarding this action with a lock and check if there is work. // Not urgent since file writes on GCS are atomic. + Optional cursor = + transactIfJpaTm(() -> tm().loadByKeyIfPresent(Cursor.createVKey(BRDA, tld))); + DateTime brdaCursorTime = getCursorTimeOrStartOfTime(cursor); + if (isBeforeOrAt(brdaCursorTime, watermark)) { + throw new NoContentException( + String.format( + "Waiting on RdeStagingAction for TLD %s to copy BRDA deposit for %s to GCS; " + + "last BRDA staging completion was before %s", + tld, watermark, brdaCursorTime)); + } int revision = RdeRevision.getCurrentRevision(tld, watermark, THIN) .orElseThrow( diff --git a/core/src/main/java/google/registry/rde/RdeStagingReducer.java b/core/src/main/java/google/registry/rde/RdeStagingReducer.java index 67e96a303..b959a5677 100644 --- a/core/src/main/java/google/registry/rde/RdeStagingReducer.java +++ b/core/src/main/java/google/registry/rde/RdeStagingReducer.java @@ -64,6 +64,7 @@ public final class RdeStagingReducer extends Reducer RdeRevision.saveRevision(tld, watermark, mode, revision)); tm().transact( () -> { Registry registry = Registry.get(tld); @@ -225,29 +234,33 @@ public final class RdeStagingReducer extends Reducer { RdeRevision.saveRevision("lol", DateTime.parse("2010-10-17TZ"), RdeMode.THIN, 0); }); + persistResource(Cursor.create(BRDA, action.watermark.plusDays(1), Registry.get("lol"))); + } + + @ParameterizedTest + @ValueSource(strings = {"", "job-name/"}) + void testRun_stagingNotFinished_throws204(String prefix) throws 
Exception { + persistResource(Cursor.create(BRDA, action.watermark, Registry.get("lol"))); + NoContentException thrown = assertThrows(NoContentException.class, () -> runAction(prefix)); + assertThat(thrown) + .hasMessageThat() + .isEqualTo( + "Waiting on RdeStagingAction for TLD lol to copy BRDA deposit for" + + " 2010-10-17T00:00:00.000Z to GCS; last BRDA staging completion was before" + + " 2010-10-17T00:00:00.000Z"); } @ParameterizedTest diff --git a/core/src/test/java/google/registry/rde/RdeUploadActionTest.java b/core/src/test/java/google/registry/rde/RdeUploadActionTest.java index bcb40a0c9..83d4d834f 100644 --- a/core/src/test/java/google/registry/rde/RdeUploadActionTest.java +++ b/core/src/test/java/google/registry/rde/RdeUploadActionTest.java @@ -403,7 +403,7 @@ public class RdeUploadActionTest { .hasMessageThat() .isEqualTo( "Waiting on RdeStagingAction for TLD tld to send 2010-10-17T00:00:00.000Z upload; last" - + " RDE staging completion was at 1970-01-01T00:00:00.000Z"); + + " RDE staging completion was before 1970-01-01T00:00:00.000Z"); cloudTasksHelper.assertNoTasksEnqueued("rde-upload"); assertThat(folder.list()).isEmpty(); } @@ -420,7 +420,7 @@ public class RdeUploadActionTest { .hasMessageThat() .isEqualTo( "Waiting on RdeStagingAction for TLD tld to send 2010-10-17T00:00:00.000Z upload; " - + "last RDE staging completion was at 2010-10-17T00:00:00.000Z"); + + "last RDE staging completion was before 2010-10-17T00:00:00.000Z"); } @TestOfyAndSql @@ -437,8 +437,9 @@ public class RdeUploadActionTest { assertThat(thrown) .hasMessageThat() .isEqualTo( - "Waiting on 120 minute SFTP cooldown for TLD tld to send 2010-10-17T00:00:00.000Z " - + "upload; last upload attempt was at 2010-10-16T22:23:00.000Z (97 minutes ago)"); + "Waiting on 120 minute SFTP cooldown for TLD tld to send 2010-10-17T00:00:00.000Z" + + " upload; last upload attempt was at 2010-10-16T22:23:00.000Z (97 minutes" + + " ago)"); } private String slurp(InputStream is) throws IOException {