diff --git a/core/src/main/java/google/registry/rde/RdeUploadAction.java b/core/src/main/java/google/registry/rde/RdeUploadAction.java index ddf255129..c2c585190 100644 --- a/core/src/main/java/google/registry/rde/RdeUploadAction.java +++ b/core/src/main/java/google/registry/rde/RdeUploadAction.java @@ -32,6 +32,7 @@ import static java.util.Arrays.asList; import com.google.cloud.storage.BlobId; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashMultimap; +import com.google.common.collect.Ordering; import com.google.common.flogger.FluentLogger; import com.google.common.io.ByteStreams; import com.jcraft.jsch.JSch; @@ -122,7 +123,6 @@ public final class RdeUploadAction implements Runnable, EscrowTask { @Inject @Key("rdeSigningKey") PGPKeyPair signingKey; @Inject @Key("rdeStagingDecryptionKey") PGPPrivateKey stagingDecryptionKey; @Inject RdeUploadAction() {} - @Override public void run() { logger.atInfo().log("Attempting to acquire RDE upload lock for TLD '%s'.", tld); @@ -140,6 +140,27 @@ public final class RdeUploadAction implements Runnable, EscrowTask { @Override public void runWithLock(final DateTime watermark) throws Exception { + // If a prefix is not provided, but we are in SQL mode, try to determine the prefix. This should + // only happen when the RDE upload cron job runs to catch up any un-retried (i. e. expected) + // RDE failures. + if (prefix.isEmpty() && !tm().isOfy()) { + // The prefix is always in the format of: rde-2022-02-21t00-00-00z-2022-02-21t00-07-33z, where + // the first datetime is the watermark and the second one is the time when the RDE beam job + // launched. We search for the latest folder that starts with "rde-[watermark]". + String partialPrefix = + String.format("rde-%s", watermark.toString("yyyy-MM-dd't'HH-mm-ss'z'")); + String latestFilenameSuffix = + gcsUtils.listFolderObjects(bucket, partialPrefix).stream() + .max(Ordering.natural()) + .orElse(null); + if (latestFilenameSuffix == null) { + throw new NoContentException( + String.format("RDE deposit for TLD %s on %s does not exist", tld, watermark)); + } + int firstSlashPosition = latestFilenameSuffix.indexOf('/'); + prefix = + Optional.of(partialPrefix + latestFilenameSuffix.substring(0, firstSlashPosition + 1)); + } logger.atInfo().log("Verifying readiness to upload the RDE deposit."); Optional cursor = transactIfJpaTm(() -> tm().loadByKeyIfPresent(Cursor.createVKey(RDE_STAGING, tld))); @@ -241,9 +262,9 @@ public final class RdeUploadAction implements Runnable, EscrowTask { .setSignatureOutput(sigOut, signingKey) .setFileMetadata(nameWithoutPrefix, xmlLength, watermark) .build()) { - long bytesCopied = ByteStreams.copy(ghostrydeDecoder, rydeEncoder); + long bytesCopied = ByteStreams.copy(ghostrydeDecoder, rydeEncoder); logger.atInfo().log("Uploaded %,d bytes to path '%s'.", bytesCopied, rydeFilename); - } + } String sigFilename = nameWithoutPrefix + ".sig"; BlobId sigGcsFilename = BlobId.of(bucket, name + ".sig"); byte[] signature = sigOut.toByteArray(); diff --git a/core/src/test/java/google/registry/rde/RdeUploadActionTest.java b/core/src/test/java/google/registry/rde/RdeUploadActionTest.java index 83638cef8..8d6dc520a 100644 --- a/core/src/test/java/google/registry/rde/RdeUploadActionTest.java +++ b/core/src/test/java/google/registry/rde/RdeUploadActionTest.java @@ -69,6 +69,8 @@ import google.registry.testing.FakeSleeper; import google.registry.testing.GpgSystemCommandExtension; import google.registry.testing.Lazies; import google.registry.testing.TestOfyAndSql; +import google.registry.testing.TestOfyOnly; +import google.registry.testing.TestSqlOnly; import google.registry.testing.sftp.SftpServerExtension; import google.registry.util.Retrier; import java.io.File; @@ -91,6 +93,7 @@ public class RdeUploadActionTest { private static final ByteSource REPORT_XML = RdeTestData.loadBytes("report.xml"); private static final ByteSource DEPOSIT_XML = RdeTestData.loadBytes("deposit_full.xml"); + private static final String JOB_PREFIX = "rde-2010-10-17t00-00-00z"; private static final BlobId GHOSTRYDE_FILE = BlobId.of("bucket", "tld_2010-10-17_full_S1_R0.xml.ghostryde"); @@ -99,11 +102,11 @@ public class RdeUploadActionTest { private static final BlobId REPORT_FILE = BlobId.of("bucket", "tld_2010-10-17_full_S1_R0-report.xml.ghostryde"); private static final BlobId GHOSTRYDE_FILE_WITH_PREFIX = - BlobId.of("bucket", "job-name/tld_2010-10-17_full_S1_R0.xml.ghostryde"); + BlobId.of("bucket", JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0.xml.ghostryde"); private static final BlobId LENGTH_FILE_WITH_PREFIX = - BlobId.of("bucket", "job-name/tld_2010-10-17_full_S1_R0.xml.length"); + BlobId.of("bucket", JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0.xml.length"); private static final BlobId REPORT_FILE_WITH_PREFIX = - BlobId.of("bucket", "job-name/tld_2010-10-17_full_S1_R0-report.xml.ghostryde"); + BlobId.of("bucket", JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0-report.xml.ghostryde"); private static final BlobId GHOSTRYDE_R1_FILE = BlobId.of("bucket", "tld_2010-10-17_full_S1_R1.xml.ghostryde"); @@ -181,7 +184,6 @@ public class RdeUploadActionTest { return jschSpy; } - @BeforeEach void beforeEach() throws Exception { // Force "development" mode so we don't try to really connect to GCS. @@ -194,6 +196,13 @@ public class RdeUploadActionTest { gcsUtils.createFromBytes(LENGTH_R1_FILE, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8)); gcsUtils.createFromBytes(REPORT_FILE, Ghostryde.encode(REPORT_XML.read(), encryptKey)); gcsUtils.createFromBytes(REPORT_R1_FILE, Ghostryde.encode(REPORT_XML.read(), encryptKey)); + gcsUtils.createFromBytes( + GHOSTRYDE_FILE_WITH_PREFIX, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey)); + gcsUtils.createFromBytes( + LENGTH_FILE_WITH_PREFIX, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8)); + gcsUtils.createFromBytes( + REPORT_FILE_WITH_PREFIX, Ghostryde.encode(REPORT_XML.read(), encryptKey)); + tm().transact( () -> { RdeRevision.saveRevision("lol", DateTime.parse("2010-10-17TZ"), FULL, 0); @@ -284,13 +293,36 @@ public class RdeUploadActionTest { assertThat(thrown).hasMessageThat().contains("The crow flies in square circles."); } - @TestOfyAndSql + @TestSqlOnly + void testRunWithLock_cannotGuessPrefix() throws Exception { + int port = sftpd.serve("user", "password", folder); + URI uploadUrl = URI.create(String.format("sftp://user:password@localhost:%d/", port)); + DateTime stagingCursor = DateTime.parse("2010-10-18TZ"); + DateTime uploadCursor = DateTime.parse("2010-10-17TZ"); + persistResource(Cursor.create(RDE_STAGING, stagingCursor, Registry.get("tld"))); + gcsUtils.delete(GHOSTRYDE_FILE_WITH_PREFIX); + gcsUtils.delete(LENGTH_FILE_WITH_PREFIX); + gcsUtils.delete(REPORT_FILE_WITH_PREFIX); + RdeUploadAction action = createAction(uploadUrl); + NoContentException thrown = + assertThrows(NoContentException.class, () -> action.runWithLock(uploadCursor)); + assertThat(thrown) + .hasMessageThat() + .isEqualTo("RDE deposit for TLD tld on 2010-10-17T00:00:00.000Z does not exist"); + cloudTasksHelper.assertNoTasksEnqueued("rde-upload"); + assertThat(folder.list()).isEmpty(); + } + + @TestOfyOnly void testRunWithLock_copiesOnGcs() throws Exception { int port = sftpd.serve("user", "password", folder); URI uploadUrl = URI.create(String.format("sftp://user:password@localhost:%d/", port)); DateTime stagingCursor = DateTime.parse("2010-10-18TZ"); DateTime uploadCursor = DateTime.parse("2010-10-17TZ"); persistResource(Cursor.create(RDE_STAGING, stagingCursor, Registry.get("tld"))); + gcsUtils.delete(GHOSTRYDE_FILE_WITH_PREFIX); + gcsUtils.delete(LENGTH_FILE_WITH_PREFIX); + gcsUtils.delete(REPORT_FILE_WITH_PREFIX); createAction(uploadUrl).runWithLock(uploadCursor); assertThat(response.getStatus()).isEqualTo(200); assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); @@ -306,7 +338,7 @@ public class RdeUploadActionTest { .isEqualTo(Files.toByteArray(new File(folder, sigFilename))); } - @TestOfyAndSql + @TestSqlOnly void testRunWithLock_copiesOnGcs_withPrefix() throws Exception { int port = sftpd.serve("user", "password", folder); URI uploadUrl = URI.create(String.format("sftp://user:password@localhost:%d/", port)); @@ -314,16 +346,10 @@ public class RdeUploadActionTest { DateTime uploadCursor = DateTime.parse("2010-10-17TZ"); persistResource(Cursor.create(RDE_STAGING, stagingCursor, Registry.get("tld"))); RdeUploadAction action = createAction(uploadUrl); - action.prefix = Optional.of("job-name/"); + action.prefix = Optional.of(JOB_PREFIX + "-job-name/"); gcsUtils.delete(GHOSTRYDE_FILE); - gcsUtils.createFromBytes( - GHOSTRYDE_FILE_WITH_PREFIX, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey)); gcsUtils.delete(LENGTH_FILE); - gcsUtils.createFromBytes( - LENGTH_FILE_WITH_PREFIX, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8)); gcsUtils.delete(REPORT_FILE); - gcsUtils.createFromBytes( - REPORT_FILE_WITH_PREFIX, Ghostryde.encode(REPORT_XML.read(), encryptKey)); action.runWithLock(uploadCursor); assertThat(response.getStatus()).isEqualTo(200); assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); @@ -331,9 +357,49 @@ public class RdeUploadActionTest { cloudTasksHelper.assertNoTasksEnqueued("rde-upload"); // Assert that both files are written to SFTP and GCS, and that the contents are identical. String rydeFilename = "tld_2010-10-17_full_S1_R0.ryde"; - String rydeGcsFilename = "job-name/tld_2010-10-17_full_S1_R0.ryde"; + String rydeGcsFilename = JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0.ryde"; String sigFilename = "tld_2010-10-17_full_S1_R0.sig"; - String sigGcsFilename = "job-name/tld_2010-10-17_full_S1_R0.sig"; + String sigGcsFilename = JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0.sig"; + assertThat(folder.list()).asList().containsExactly(rydeFilename, sigFilename); + assertThat(gcsUtils.readBytesFrom(BlobId.of("bucket", rydeGcsFilename))) + .isEqualTo(Files.toByteArray(new File(folder, rydeFilename))); + assertThat(gcsUtils.readBytesFrom(BlobId.of("bucket", sigGcsFilename))) + .isEqualTo(Files.toByteArray(new File(folder, sigFilename))); + } + + @TestSqlOnly + void testRunWithLock_copiesOnGcs_withoutPrefix() throws Exception { + int port = sftpd.serve("user", "password", folder); + URI uploadUrl = URI.create(String.format("sftp://user:password@localhost:%d/", port)); + DateTime stagingCursor = DateTime.parse("2010-10-18TZ"); + DateTime uploadCursor = DateTime.parse("2010-10-17TZ"); + persistResource(Cursor.create(RDE_STAGING, stagingCursor, Registry.get("tld"))); + RdeUploadAction action = createAction(uploadUrl); + gcsUtils.delete(GHOSTRYDE_FILE); + gcsUtils.delete(LENGTH_FILE); + gcsUtils.delete(REPORT_FILE); + // Add a folder that is alphabetically before the desired folder and fill it will nonsense data. + // It should NOT be picked up. + BlobId ghostrydeFileWithPrefixBefore = + BlobId.of("bucket", JOB_PREFIX + "-job-nama/tld_2010-10-17_full_S1_R0.xml.ghostryde"); + BlobId lengthFileWithPrefixBefore = + BlobId.of("bucket", JOB_PREFIX + "-job-nama/tld_2010-10-17_full_S1_R0.xml.length"); + BlobId reportFileWithPrefixBefore = + BlobId.of( + "bucket", JOB_PREFIX + "-job-nama/tld_2010-10-17_full_S1_R0-report.xml.ghostryde"); + gcsUtils.createFromBytes(ghostrydeFileWithPrefixBefore, "foo".getBytes(UTF_8)); + gcsUtils.createFromBytes(lengthFileWithPrefixBefore, "bar".getBytes(UTF_8)); + gcsUtils.createFromBytes(reportFileWithPrefixBefore, "baz".getBytes(UTF_8)); + action.runWithLock(uploadCursor); + assertThat(response.getStatus()).isEqualTo(200); + assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo("OK tld 2010-10-17T00:00:00.000Z\n"); + cloudTasksHelper.assertNoTasksEnqueued("rde-upload"); + // Assert that both files are written to SFTP and GCS, and that the contents are identical. + String rydeFilename = "tld_2010-10-17_full_S1_R0.ryde"; + String rydeGcsFilename = JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0.ryde"; + String sigFilename = "tld_2010-10-17_full_S1_R0.sig"; + String sigGcsFilename = JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0.sig"; assertThat(folder.list()).asList().containsExactly(rydeFilename, sigFilename); assertThat(gcsUtils.readBytesFrom(BlobId.of("bucket", rydeGcsFilename))) .isEqualTo(Files.toByteArray(new File(folder, rydeFilename))); @@ -349,6 +415,19 @@ public class RdeUploadActionTest { DateTime stagingCursor = DateTime.parse("2010-10-18TZ"); DateTime uploadCursor = DateTime.parse("2010-10-17TZ"); persistSimpleResource(Cursor.create(RDE_STAGING, stagingCursor, Registry.get("tld"))); + BlobId ghostrydeR1FileWithPrefix = + BlobId.of("bucket", JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R1.xml.ghostryde"); + BlobId lengthR1FileWithPrefix = + BlobId.of("bucket", JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R1.xml.length"); + BlobId reportR1FileWithPrefix = + BlobId.of( + "bucket", JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R1-report.xml.ghostryde"); + gcsUtils.createFromBytes( + ghostrydeR1FileWithPrefix, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey)); + gcsUtils.createFromBytes( + lengthR1FileWithPrefix, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8)); + gcsUtils.createFromBytes( + reportR1FileWithPrefix, Ghostryde.encode(REPORT_XML.read(), encryptKey)); createAction(uploadUrl).runWithLock(uploadCursor); assertThat(response.getStatus()).isEqualTo(200); assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);