diff --git a/core/src/main/java/google/registry/beam/rde/RdeIO.java b/core/src/main/java/google/registry/beam/rde/RdeIO.java index 41823125f..067846386 100644 --- a/core/src/main/java/google/registry/beam/rde/RdeIO.java +++ b/core/src/main/java/google/registry/beam/rde/RdeIO.java @@ -68,6 +68,7 @@ import org.apache.beam.sdk.values.PDone; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.openpgp.PGPPublicKey; import org.joda.time.DateTime; +import org.joda.time.Duration; public class RdeIO { @@ -261,6 +262,7 @@ public class RdeIO { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final long serialVersionUID = 5822176227753327224L; + private static final Duration ENQUEUE_DELAY = Duration.standardMinutes(1); private final CloudTasksUtils cloudTasksUtils; @@ -271,6 +273,7 @@ public class RdeIO { @ProcessElement public void processElement( @Element KV input, PipelineOptions options) { + tm().transact( () -> { PendingDeposit key = input.getKey(); @@ -296,26 +299,30 @@ public class RdeIO { tm().put(Cursor.create(key.cursor(), newPosition, registry)); logger.atInfo().log( "Rolled forward %s on %s cursor to %s.", key.cursor(), key.tld(), newPosition); - RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision); + RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), input.getValue()); // Enqueueing a task is a side effect that is not undone if the transaction rolls // back. So this may result in multiple copies of the same task being processed. // This is fine because the RdeUploadAction is guarded by a lock and tracks progress - // by cursor. The BrdaCopyAction writes a file to GCS, which is an atomic action. + // by cursor. The BrdaCopyAction writes a file to GCS, which is an atomic action. It + // is also guarded by a cursor to not run before the cursor is updated. We also + // include a delay to minimize the chance that the enqueued job executes before the + // transaction is committed, which triggers a retry. if (key.mode() == RdeMode.FULL) { cloudTasksUtils.enqueue( RDE_UPLOAD_QUEUE, - cloudTasksUtils.createPostTask( + cloudTasksUtils.createPostTaskWithDelay( RdeUploadAction.PATH, Service.BACKEND.getServiceId(), ImmutableMultimap.of( RequestParameters.PARAM_TLD, key.tld(), RdeModule.PARAM_PREFIX, - options.getJobName() + '/'))); + options.getJobName() + '/'), + ENQUEUE_DELAY)); } else { cloudTasksUtils.enqueue( BRDA_QUEUE, - cloudTasksUtils.createPostTask( + cloudTasksUtils.createPostTaskWithDelay( BrdaCopyAction.PATH, Service.BACKEND.getServiceId(), ImmutableMultimap.of( @@ -324,7 +331,8 @@ public class RdeIO { RdeModule.PARAM_WATERMARK, key.watermark().toString(), RdeModule.PARAM_PREFIX, - options.getJobName() + '/'))); + options.getJobName() + '/'), + ENQUEUE_DELAY)); } }); } diff --git a/core/src/main/java/google/registry/rde/BrdaCopyAction.java b/core/src/main/java/google/registry/rde/BrdaCopyAction.java index cd3164d55..5d2df6f9c 100644 --- a/core/src/main/java/google/registry/rde/BrdaCopyAction.java +++ b/core/src/main/java/google/registry/rde/BrdaCopyAction.java @@ -14,8 +14,13 @@ package google.registry.rde; +import static google.registry.model.common.Cursor.CursorType.BRDA; +import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime; import static google.registry.model.rde.RdeMode.THIN; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm; import static google.registry.request.Action.Method.POST; +import static google.registry.util.DateTimeUtils.isBeforeOrAt; import com.google.cloud.storage.BlobId; import com.google.common.flogger.FluentLogger; @@ -23,9 +28,11 @@ import com.google.common.io.ByteStreams; import google.registry.config.RegistryConfig.Config; import google.registry.gcs.GcsUtils; import google.registry.keyring.api.KeyModule.Key; +import google.registry.model.common.Cursor; import google.registry.model.rde.RdeNamingUtils; import google.registry.model.rde.RdeRevision; import google.registry.request.Action; +import google.registry.request.HttpException.NoContentException; import google.registry.request.Parameter; import google.registry.request.RequestParameters; import google.registry.request.auth.Auth; @@ -89,6 +96,16 @@ public final class BrdaCopyAction implements Runnable { private void copyAsRyde() throws IOException { // TODO(b/217772483): consider guarding this action with a lock and check if there is work. // Not urgent since file writes on GCS are atomic. + Optional cursor = + transactIfJpaTm(() -> tm().loadByKeyIfPresent(Cursor.createVKey(BRDA, tld))); + DateTime brdaCursorTime = getCursorTimeOrStartOfTime(cursor); + if (isBeforeOrAt(brdaCursorTime, watermark)) { + throw new NoContentException( + String.format( + "Waiting on RdeStagingAction for TLD %s to copy BRDA deposit for %s to GCS; " + + "last BRDA staging completion was before %s", + tld, watermark, brdaCursorTime)); + } int revision = RdeRevision.getCurrentRevision(tld, watermark, THIN) .orElseThrow( diff --git a/core/src/main/java/google/registry/rde/RdeStagingReducer.java b/core/src/main/java/google/registry/rde/RdeStagingReducer.java index 67e96a303..b959a5677 100644 --- a/core/src/main/java/google/registry/rde/RdeStagingReducer.java +++ b/core/src/main/java/google/registry/rde/RdeStagingReducer.java @@ -64,6 +64,7 @@ public final class RdeStagingReducer extends Reducer RdeRevision.saveRevision(tld, watermark, mode, revision)); tm().transact( () -> { Registry registry = Registry.get(tld); @@ -225,29 +234,33 @@ public final class RdeStagingReducer extends Reducer { RdeRevision.saveRevision("lol", DateTime.parse("2010-10-17TZ"), RdeMode.THIN, 0); }); + persistResource(Cursor.create(BRDA, action.watermark.plusDays(1), Registry.get("lol"))); + } + + @ParameterizedTest + @ValueSource(strings = {"", "job-name/"}) + void testRun_stagingNotFinished_throws204(String prefix) throws Exception { + persistResource(Cursor.create(BRDA, action.watermark, Registry.get("lol"))); + NoContentException thrown = assertThrows(NoContentException.class, () -> runAction(prefix)); + assertThat(thrown) + .hasMessageThat() + .isEqualTo( + "Waiting on RdeStagingAction for TLD lol to copy BRDA deposit for" + + " 2010-10-17T00:00:00.000Z to GCS; last BRDA staging completion was before" + + " 2010-10-17T00:00:00.000Z"); } @ParameterizedTest diff --git a/core/src/test/java/google/registry/rde/RdeUploadActionTest.java b/core/src/test/java/google/registry/rde/RdeUploadActionTest.java index bcb40a0c9..83d4d834f 100644 --- a/core/src/test/java/google/registry/rde/RdeUploadActionTest.java +++ b/core/src/test/java/google/registry/rde/RdeUploadActionTest.java @@ -403,7 +403,7 @@ public class RdeUploadActionTest { .hasMessageThat() .isEqualTo( "Waiting on RdeStagingAction for TLD tld to send 2010-10-17T00:00:00.000Z upload; last" - + " RDE staging completion was at 1970-01-01T00:00:00.000Z"); + + " RDE staging completion was before 1970-01-01T00:00:00.000Z"); cloudTasksHelper.assertNoTasksEnqueued("rde-upload"); assertThat(folder.list()).isEmpty(); } @@ -420,7 +420,7 @@ public class RdeUploadActionTest { .hasMessageThat() .isEqualTo( "Waiting on RdeStagingAction for TLD tld to send 2010-10-17T00:00:00.000Z upload; " - + "last RDE staging completion was at 2010-10-17T00:00:00.000Z"); + + "last RDE staging completion was before 2010-10-17T00:00:00.000Z"); } @TestOfyAndSql @@ -437,8 +437,9 @@ public class RdeUploadActionTest { assertThat(thrown) .hasMessageThat() .isEqualTo( - "Waiting on 120 minute SFTP cooldown for TLD tld to send 2010-10-17T00:00:00.000Z " - + "upload; last upload attempt was at 2010-10-16T22:23:00.000Z (97 minutes ago)"); + "Waiting on 120 minute SFTP cooldown for TLD tld to send 2010-10-17T00:00:00.000Z" + + " upload; last upload attempt was at 2010-10-16T22:23:00.000Z (97 minutes" + + " ago)"); } private String slurp(InputStream is) throws IOException {