Make a best effort guess on the RDE folder name (prefix) when not provided. (#1574)

We have a cron job that runs the RDE upload action every 4 hours for all
TLDs. Normally this should be a no-op because an RDE upload is scheduled
after RDE staging is completed, and when it fails with a non-2XX status it
will retry. However, if for some reason it fails with a 20X status (like
waiting for the SFTP cursor), it will not retry but rely on the cron job to
catch up.

With the BEAM RDE pipeline, every staging job saves all its deposits in a
uniquely named folder to avoid the need to use a lock, which is not
practical in BEAM. However, the cron job has no way of knowing what the
prefixes are for each TLD, so it will fail in SQL mode.

In this PR we implement logic to guess what the prefix should be and
use it when we are in SQL mode and a prefix is not provided.

<!-- Reviewable:start -->
---
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/google/nomulus/1574)
<!-- Reviewable:end -->
This commit is contained in:
Lai Jiang 2022-03-30 11:36:24 -04:00 committed by GitHub
parent 33db0d360c
commit cc244fb4df
2 changed files with 118 additions and 18 deletions

View file

@ -32,6 +32,7 @@ import static java.util.Arrays.asList;
import com.google.cloud.storage.BlobId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Ordering;
import com.google.common.flogger.FluentLogger;
import com.google.common.io.ByteStreams;
import com.jcraft.jsch.JSch;
@ -122,7 +123,6 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
@Inject @Key("rdeSigningKey") PGPKeyPair signingKey;
@Inject @Key("rdeStagingDecryptionKey") PGPPrivateKey stagingDecryptionKey;
@Inject RdeUploadAction() {}
@Override
public void run() {
logger.atInfo().log("Attempting to acquire RDE upload lock for TLD '%s'.", tld);
@ -140,6 +140,27 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
@Override
public void runWithLock(final DateTime watermark) throws Exception {
// If a prefix is not provided, but we are in SQL mode, try to determine the prefix. This should
// only happen when the RDE upload cron job runs to catch up any un-retried (i.e. expected)
// RDE failures.
if (prefix.isEmpty() && !tm().isOfy()) {
// The prefix is always in the format of: rde-2022-02-21t00-00-00z-2022-02-21t00-07-33z, where
// the first datetime is the watermark and the second one is the time when the RDE beam job
// launched. We search for the latest folder that starts with "rde-[watermark]".
String partialPrefix =
String.format("rde-%s", watermark.toString("yyyy-MM-dd't'HH-mm-ss'z'"));
String latestFilenameSuffix =
gcsUtils.listFolderObjects(bucket, partialPrefix).stream()
.max(Ordering.natural())
.orElse(null);
if (latestFilenameSuffix == null) {
throw new NoContentException(
String.format("RDE deposit for TLD %s on %s does not exist", tld, watermark));
}
int firstSlashPosition = latestFilenameSuffix.indexOf('/');
prefix =
Optional.of(partialPrefix + latestFilenameSuffix.substring(0, firstSlashPosition + 1));
}
logger.atInfo().log("Verifying readiness to upload the RDE deposit.");
Optional<Cursor> cursor =
transactIfJpaTm(() -> tm().loadByKeyIfPresent(Cursor.createVKey(RDE_STAGING, tld)));
@ -241,9 +262,9 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
.setSignatureOutput(sigOut, signingKey)
.setFileMetadata(nameWithoutPrefix, xmlLength, watermark)
.build()) {
long bytesCopied = ByteStreams.copy(ghostrydeDecoder, rydeEncoder);
long bytesCopied = ByteStreams.copy(ghostrydeDecoder, rydeEncoder);
logger.atInfo().log("Uploaded %,d bytes to path '%s'.", bytesCopied, rydeFilename);
}
}
String sigFilename = nameWithoutPrefix + ".sig";
BlobId sigGcsFilename = BlobId.of(bucket, name + ".sig");
byte[] signature = sigOut.toByteArray();

View file

@ -69,6 +69,8 @@ import google.registry.testing.FakeSleeper;
import google.registry.testing.GpgSystemCommandExtension;
import google.registry.testing.Lazies;
import google.registry.testing.TestOfyAndSql;
import google.registry.testing.TestOfyOnly;
import google.registry.testing.TestSqlOnly;
import google.registry.testing.sftp.SftpServerExtension;
import google.registry.util.Retrier;
import java.io.File;
@ -91,6 +93,7 @@ public class RdeUploadActionTest {
private static final ByteSource REPORT_XML = RdeTestData.loadBytes("report.xml");
private static final ByteSource DEPOSIT_XML = RdeTestData.loadBytes("deposit_full.xml");
private static final String JOB_PREFIX = "rde-2010-10-17t00-00-00z";
private static final BlobId GHOSTRYDE_FILE =
BlobId.of("bucket", "tld_2010-10-17_full_S1_R0.xml.ghostryde");
@ -99,11 +102,11 @@ public class RdeUploadActionTest {
private static final BlobId REPORT_FILE =
BlobId.of("bucket", "tld_2010-10-17_full_S1_R0-report.xml.ghostryde");
private static final BlobId GHOSTRYDE_FILE_WITH_PREFIX =
BlobId.of("bucket", "job-name/tld_2010-10-17_full_S1_R0.xml.ghostryde");
BlobId.of("bucket", JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0.xml.ghostryde");
private static final BlobId LENGTH_FILE_WITH_PREFIX =
BlobId.of("bucket", "job-name/tld_2010-10-17_full_S1_R0.xml.length");
BlobId.of("bucket", JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0.xml.length");
private static final BlobId REPORT_FILE_WITH_PREFIX =
BlobId.of("bucket", "job-name/tld_2010-10-17_full_S1_R0-report.xml.ghostryde");
BlobId.of("bucket", JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0-report.xml.ghostryde");
private static final BlobId GHOSTRYDE_R1_FILE =
BlobId.of("bucket", "tld_2010-10-17_full_S1_R1.xml.ghostryde");
@ -181,7 +184,6 @@ public class RdeUploadActionTest {
return jschSpy;
}
@BeforeEach
void beforeEach() throws Exception {
// Force "development" mode so we don't try to really connect to GCS.
@ -194,6 +196,13 @@ public class RdeUploadActionTest {
gcsUtils.createFromBytes(LENGTH_R1_FILE, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8));
gcsUtils.createFromBytes(REPORT_FILE, Ghostryde.encode(REPORT_XML.read(), encryptKey));
gcsUtils.createFromBytes(REPORT_R1_FILE, Ghostryde.encode(REPORT_XML.read(), encryptKey));
gcsUtils.createFromBytes(
GHOSTRYDE_FILE_WITH_PREFIX, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey));
gcsUtils.createFromBytes(
LENGTH_FILE_WITH_PREFIX, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8));
gcsUtils.createFromBytes(
REPORT_FILE_WITH_PREFIX, Ghostryde.encode(REPORT_XML.read(), encryptKey));
tm().transact(
() -> {
RdeRevision.saveRevision("lol", DateTime.parse("2010-10-17TZ"), FULL, 0);
@ -284,13 +293,36 @@ public class RdeUploadActionTest {
assertThat(thrown).hasMessageThat().contains("The crow flies in square circles.");
}
@TestOfyAndSql
/**
 * Verifies that {@code runWithLock} throws a {@link NoContentException} when the three
 * prefixed deposit files have been deleted from GCS, and that nothing is enqueued or uploaded.
 *
 * <p>NOTE(review): this relies on the action built by {@code createAction} having no prefix set
 * and on no other object existing under the "rde-2010-10-17t00-00-00z" prefix; confirm against
 * {@code createAction} and the full {@code beforeEach}.
 */
@TestSqlOnly
void testRunWithLock_cannotGuessPrefix() throws Exception {
// Start the test SFTP server and point the upload URL at it.
int port = sftpd.serve("user", "password", folder);
URI uploadUrl = URI.create(String.format("sftp://user:password@localhost:%d/", port));
// Staging cursor is one day past the watermark passed to runWithLock below.
DateTime stagingCursor = DateTime.parse("2010-10-18TZ");
DateTime uploadCursor = DateTime.parse("2010-10-17TZ");
persistResource(Cursor.create(RDE_STAGING, stagingCursor, Registry.get("tld")));
// Remove the prefixed deposit files so that there is no folder to guess the prefix from.
gcsUtils.delete(GHOSTRYDE_FILE_WITH_PREFIX);
gcsUtils.delete(LENGTH_FILE_WITH_PREFIX);
gcsUtils.delete(REPORT_FILE_WITH_PREFIX);
RdeUploadAction action = createAction(uploadUrl);
NoContentException thrown =
assertThrows(NoContentException.class, () -> action.runWithLock(uploadCursor));
// The message must name both the TLD and the watermark of the missing deposit.
assertThat(thrown)
.hasMessageThat()
.isEqualTo("RDE deposit for TLD tld on 2010-10-17T00:00:00.000Z does not exist");
// No task was enqueued on the "rde-upload" queue and nothing was written to the SFTP folder.
cloudTasksHelper.assertNoTasksEnqueued("rde-upload");
assertThat(folder.list()).isEmpty();
}
@TestOfyOnly
void testRunWithLock_copiesOnGcs() throws Exception {
int port = sftpd.serve("user", "password", folder);
URI uploadUrl = URI.create(String.format("sftp://user:password@localhost:%d/", port));
DateTime stagingCursor = DateTime.parse("2010-10-18TZ");
DateTime uploadCursor = DateTime.parse("2010-10-17TZ");
persistResource(Cursor.create(RDE_STAGING, stagingCursor, Registry.get("tld")));
gcsUtils.delete(GHOSTRYDE_FILE_WITH_PREFIX);
gcsUtils.delete(LENGTH_FILE_WITH_PREFIX);
gcsUtils.delete(REPORT_FILE_WITH_PREFIX);
createAction(uploadUrl).runWithLock(uploadCursor);
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
@ -306,7 +338,7 @@ public class RdeUploadActionTest {
.isEqualTo(Files.toByteArray(new File(folder, sigFilename)));
}
@TestOfyAndSql
@TestSqlOnly
void testRunWithLock_copiesOnGcs_withPrefix() throws Exception {
int port = sftpd.serve("user", "password", folder);
URI uploadUrl = URI.create(String.format("sftp://user:password@localhost:%d/", port));
@ -314,16 +346,10 @@ public class RdeUploadActionTest {
DateTime uploadCursor = DateTime.parse("2010-10-17TZ");
persistResource(Cursor.create(RDE_STAGING, stagingCursor, Registry.get("tld")));
RdeUploadAction action = createAction(uploadUrl);
action.prefix = Optional.of("job-name/");
action.prefix = Optional.of(JOB_PREFIX + "-job-name/");
gcsUtils.delete(GHOSTRYDE_FILE);
gcsUtils.createFromBytes(
GHOSTRYDE_FILE_WITH_PREFIX, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey));
gcsUtils.delete(LENGTH_FILE);
gcsUtils.createFromBytes(
LENGTH_FILE_WITH_PREFIX, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8));
gcsUtils.delete(REPORT_FILE);
gcsUtils.createFromBytes(
REPORT_FILE_WITH_PREFIX, Ghostryde.encode(REPORT_XML.read(), encryptKey));
action.runWithLock(uploadCursor);
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
@ -331,9 +357,49 @@ public class RdeUploadActionTest {
cloudTasksHelper.assertNoTasksEnqueued("rde-upload");
// Assert that both files are written to SFTP and GCS, and that the contents are identical.
String rydeFilename = "tld_2010-10-17_full_S1_R0.ryde";
String rydeGcsFilename = "job-name/tld_2010-10-17_full_S1_R0.ryde";
String rydeGcsFilename = JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0.ryde";
String sigFilename = "tld_2010-10-17_full_S1_R0.sig";
String sigGcsFilename = "job-name/tld_2010-10-17_full_S1_R0.sig";
String sigGcsFilename = JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0.sig";
assertThat(folder.list()).asList().containsExactly(rydeFilename, sigFilename);
assertThat(gcsUtils.readBytesFrom(BlobId.of("bucket", rydeGcsFilename)))
.isEqualTo(Files.toByteArray(new File(folder, rydeFilename)));
assertThat(gcsUtils.readBytesFrom(BlobId.of("bucket", sigGcsFilename)))
.isEqualTo(Files.toByteArray(new File(folder, sigFilename)));
}
@TestSqlOnly
void testRunWithLock_copiesOnGcs_withoutPrefix() throws Exception {
int port = sftpd.serve("user", "password", folder);
URI uploadUrl = URI.create(String.format("sftp://user:password@localhost:%d/", port));
DateTime stagingCursor = DateTime.parse("2010-10-18TZ");
DateTime uploadCursor = DateTime.parse("2010-10-17TZ");
persistResource(Cursor.create(RDE_STAGING, stagingCursor, Registry.get("tld")));
RdeUploadAction action = createAction(uploadUrl);
gcsUtils.delete(GHOSTRYDE_FILE);
gcsUtils.delete(LENGTH_FILE);
gcsUtils.delete(REPORT_FILE);
// Add a folder that is alphabetically before the desired folder and fill it with nonsense data.
// It should NOT be picked up.
BlobId ghostrydeFileWithPrefixBefore =
BlobId.of("bucket", JOB_PREFIX + "-job-nama/tld_2010-10-17_full_S1_R0.xml.ghostryde");
BlobId lengthFileWithPrefixBefore =
BlobId.of("bucket", JOB_PREFIX + "-job-nama/tld_2010-10-17_full_S1_R0.xml.length");
BlobId reportFileWithPrefixBefore =
BlobId.of(
"bucket", JOB_PREFIX + "-job-nama/tld_2010-10-17_full_S1_R0-report.xml.ghostryde");
gcsUtils.createFromBytes(ghostrydeFileWithPrefixBefore, "foo".getBytes(UTF_8));
gcsUtils.createFromBytes(lengthFileWithPrefixBefore, "bar".getBytes(UTF_8));
gcsUtils.createFromBytes(reportFileWithPrefixBefore, "baz".getBytes(UTF_8));
action.runWithLock(uploadCursor);
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("OK tld 2010-10-17T00:00:00.000Z\n");
cloudTasksHelper.assertNoTasksEnqueued("rde-upload");
// Assert that both files are written to SFTP and GCS, and that the contents are identical.
String rydeFilename = "tld_2010-10-17_full_S1_R0.ryde";
String rydeGcsFilename = JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0.ryde";
String sigFilename = "tld_2010-10-17_full_S1_R0.sig";
String sigGcsFilename = JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R0.sig";
assertThat(folder.list()).asList().containsExactly(rydeFilename, sigFilename);
assertThat(gcsUtils.readBytesFrom(BlobId.of("bucket", rydeGcsFilename)))
.isEqualTo(Files.toByteArray(new File(folder, rydeFilename)));
@ -349,6 +415,19 @@ public class RdeUploadActionTest {
DateTime stagingCursor = DateTime.parse("2010-10-18TZ");
DateTime uploadCursor = DateTime.parse("2010-10-17TZ");
persistSimpleResource(Cursor.create(RDE_STAGING, stagingCursor, Registry.get("tld")));
BlobId ghostrydeR1FileWithPrefix =
BlobId.of("bucket", JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R1.xml.ghostryde");
BlobId lengthR1FileWithPrefix =
BlobId.of("bucket", JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R1.xml.length");
BlobId reportR1FileWithPrefix =
BlobId.of(
"bucket", JOB_PREFIX + "-job-name/tld_2010-10-17_full_S1_R1-report.xml.ghostryde");
gcsUtils.createFromBytes(
ghostrydeR1FileWithPrefix, Ghostryde.encode(DEPOSIT_XML.read(), encryptKey));
gcsUtils.createFromBytes(
lengthR1FileWithPrefix, Long.toString(DEPOSIT_XML.size()).getBytes(UTF_8));
gcsUtils.createFromBytes(
reportR1FileWithPrefix, Ghostryde.encode(REPORT_XML.read(), encryptKey));
createAction(uploadUrl).runWithLock(uploadCursor);
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);