mirror of
https://github.com/google/nomulus.git
synced 2025-04-29 11:37:51 +02:00
Find the most recent prefix for RdeReportAction (#2043)
When RdeReportAction is invoked without a prefix parameter (as in the case when it is kicked off by cron jobs for potential catch ups), we need to used the same heuristics that's employed in RdeUploadAction to find the most recent prefix for the given watermark, otherwise the job will not find any deposits to upload. Also renamed RdeUtil to RdeUtils, to be consistent with our naming conventions.
This commit is contained in:
parent
2df1fbc418
commit
04d72dabdf
9 changed files with 102 additions and 42 deletions
|
@ -43,7 +43,7 @@ import google.registry.rde.RdeMarshaller;
|
|||
import google.registry.rde.RdeModule;
|
||||
import google.registry.rde.RdeResourceType;
|
||||
import google.registry.rde.RdeUploadAction;
|
||||
import google.registry.rde.RdeUtil;
|
||||
import google.registry.rde.RdeUtils;
|
||||
import google.registry.request.Action.Service;
|
||||
import google.registry.request.RequestParameters;
|
||||
import google.registry.tldconfig.idn.IdnTableEnum;
|
||||
|
@ -166,7 +166,7 @@ public class RdeIO {
|
|||
final int revision =
|
||||
Optional.ofNullable(key.revision())
|
||||
.orElseGet(() -> RdeRevision.getNextRevision(tld, watermark, mode));
|
||||
String id = RdeUtil.timestampToId(watermark);
|
||||
String id = RdeUtils.timestampToId(watermark);
|
||||
String prefix =
|
||||
options.getJobName()
|
||||
+ '/'
|
||||
|
|
|
@ -112,8 +112,8 @@ final class ContactToXjcConverter {
|
|||
private static XjcRdeContactTransferDataType convertTransferData(TransferData model) {
|
||||
XjcRdeContactTransferDataType bean = new XjcRdeContactTransferDataType();
|
||||
bean.setTrStatus(XjcEppcomTrStatusType.fromValue(model.getTransferStatus().getXmlName()));
|
||||
bean.setReRr(RdeUtil.makeXjcRdeRrType(model.getGainingRegistrarId()));
|
||||
bean.setAcRr(RdeUtil.makeXjcRdeRrType(model.getLosingRegistrarId()));
|
||||
bean.setReRr(RdeUtils.makeXjcRdeRrType(model.getGainingRegistrarId()));
|
||||
bean.setAcRr(RdeUtils.makeXjcRdeRrType(model.getLosingRegistrarId()));
|
||||
bean.setReDate(model.getTransferRequestTime());
|
||||
bean.setAcDate(model.getPendingTransferExpirationTime());
|
||||
return bean;
|
||||
|
|
|
@ -262,8 +262,8 @@ final class DomainToXjcConverter {
|
|||
XjcRdeDomainTransferDataType bean = new XjcRdeDomainTransferDataType();
|
||||
bean.setTrStatus(
|
||||
XjcEppcomTrStatusType.fromValue(model.getTransferStatus().getXmlName()));
|
||||
bean.setReRr(RdeUtil.makeXjcRdeRrType(model.getGainingRegistrarId()));
|
||||
bean.setAcRr(RdeUtil.makeXjcRdeRrType(model.getLosingRegistrarId()));
|
||||
bean.setReRr(RdeUtils.makeXjcRdeRrType(model.getGainingRegistrarId()));
|
||||
bean.setAcRr(RdeUtils.makeXjcRdeRrType(model.getLosingRegistrarId()));
|
||||
bean.setReDate(model.getTransferRequestTime());
|
||||
bean.setAcDate(model.getPendingTransferExpirationTime());
|
||||
bean.setExDate(model.getTransferredRegistrationExpirationTime());
|
||||
|
|
|
@ -19,6 +19,7 @@ import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8;
|
|||
import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
|
||||
import static google.registry.model.rde.RdeMode.FULL;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||
import static google.registry.rde.RdeUtils.findMostRecentPrefixForWatermark;
|
||||
import static google.registry.request.Action.Method.POST;
|
||||
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
|
||||
|
||||
|
@ -98,8 +99,10 @@ public final class RdeReportAction implements Runnable, EscrowTask {
|
|||
RdeRevision.getCurrentRevision(tld, watermark, FULL)
|
||||
.orElseThrow(
|
||||
() -> new IllegalStateException("RdeRevision was not set on generated deposit"));
|
||||
String name =
|
||||
prefix.orElse("") + RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision);
|
||||
if (!prefix.isPresent()) {
|
||||
prefix = Optional.of(findMostRecentPrefixForWatermark(watermark, bucket, tld, gcsUtils));
|
||||
}
|
||||
String name = prefix.get() + RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision);
|
||||
BlobId reportFilename = BlobId.of(bucket, name + "-report.xml.ghostryde");
|
||||
verify(gcsUtils.existsAndNotEmpty(reportFilename), "Missing file: %s", reportFilename);
|
||||
reporter.send(readReportFromGcs(reportFilename));
|
||||
|
|
|
@ -23,6 +23,7 @@ import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
|
|||
import static google.registry.model.rde.RdeMode.FULL;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||
import static google.registry.rde.RdeModule.RDE_REPORT_QUEUE;
|
||||
import static google.registry.rde.RdeUtils.findMostRecentPrefixForWatermark;
|
||||
import static google.registry.request.Action.Method.POST;
|
||||
import static google.registry.util.DateTimeUtils.START_OF_TIME;
|
||||
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
|
||||
|
@ -31,7 +32,6 @@ import static java.util.Arrays.asList;
|
|||
import com.google.cloud.storage.BlobId;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.flogger.FluentLogger;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import com.jcraft.jsch.JSch;
|
||||
|
@ -136,26 +136,10 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
|
|||
|
||||
@Override
|
||||
public void runWithLock(final DateTime watermark) throws Exception {
|
||||
// If a prefix is not provided, but we are in SQL mode, try to determine the prefix. This should
|
||||
// only happen when the RDE upload cron job runs to catch up any un-retried (i. e. expected)
|
||||
// RDE failures.
|
||||
// If a prefix is not provided,try to determine the prefix. This should only happen when the RDE
|
||||
// upload cron job runs to catch up any un-retried (i. e. expected) RDE failures.
|
||||
if (!prefix.isPresent()) {
|
||||
// The prefix is always in the format of: rde-2022-02-21t00-00-00z-2022-02-21t00-07-33z, where
|
||||
// the first datetime is the watermark and the second one is the time when the RDE beam job
|
||||
// launched. We search for the latest folder that starts with "rde-[watermark]".
|
||||
String partialPrefix =
|
||||
String.format("rde-%s", watermark.toString("yyyy-MM-dd't'HH-mm-ss'z'"));
|
||||
String latestFilenameSuffix =
|
||||
gcsUtils.listFolderObjects(bucket, partialPrefix).stream()
|
||||
.max(Ordering.natural())
|
||||
.orElse(null);
|
||||
if (latestFilenameSuffix == null) {
|
||||
throw new NoContentException(
|
||||
String.format("RDE deposit for TLD %s on %s does not exist", tld, watermark));
|
||||
}
|
||||
int firstSlashPosition = latestFilenameSuffix.indexOf('/');
|
||||
prefix =
|
||||
Optional.of(partialPrefix + latestFilenameSuffix.substring(0, firstSlashPosition + 1));
|
||||
prefix = Optional.of(findMostRecentPrefixForWatermark(watermark, bucket, tld, gcsUtils));
|
||||
}
|
||||
logger.atInfo().log("Verifying readiness to upload the RDE deposit.");
|
||||
Optional<Cursor> cursor =
|
||||
|
@ -193,7 +177,7 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
|
|||
() -> new IllegalStateException("RdeRevision was not set on generated deposit"));
|
||||
final String nameWithoutPrefix =
|
||||
RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision);
|
||||
final String name = prefix.orElse("") + nameWithoutPrefix;
|
||||
final String name = prefix.get() + nameWithoutPrefix;
|
||||
final BlobId xmlFilename = BlobId.of(bucket, name + ".xml.ghostryde");
|
||||
final BlobId xmlLengthFilename = BlobId.of(bucket, name + ".xml.length");
|
||||
BlobId reportFilename = BlobId.of(bucket, name + "-report.xml.ghostryde");
|
||||
|
|
|
@ -17,9 +17,12 @@ package google.registry.rde;
|
|||
import static google.registry.util.HexDumper.dumpHex;
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.io.BaseEncoding;
|
||||
import com.google.re2j.Matcher;
|
||||
import com.google.re2j.Pattern;
|
||||
import google.registry.gcs.GcsUtils;
|
||||
import google.registry.request.HttpException.NoContentException;
|
||||
import google.registry.xjc.rde.XjcRdeRrType;
|
||||
import google.registry.xml.XmlException;
|
||||
import java.io.BufferedInputStream;
|
||||
|
@ -31,7 +34,7 @@ import org.joda.time.format.DateTimeFormatter;
|
|||
import org.joda.time.format.ISODateTimeFormat;
|
||||
|
||||
/** Helper methods for RDE. */
|
||||
public final class RdeUtil {
|
||||
public final class RdeUtils {
|
||||
|
||||
/** Number of bytes in head of XML deposit that will contain the information we want. */
|
||||
private static final int PEEK_SIZE = 2048;
|
||||
|
@ -70,6 +73,32 @@ public final class RdeUtil {
|
|||
return DATETIME_FORMATTER.parseDateTime(watermarkMatcher.group(1));
|
||||
}
|
||||
|
||||
/** Find the most recent folder in the given GCS bucket for the given watermark. */
|
||||
public static String findMostRecentPrefixForWatermark(
|
||||
DateTime watermark, String bucket, String tld, GcsUtils gcsUtils) throws NoContentException {
|
||||
// The prefix is always in the format of: rde-2022-02-21t00-00-00z-2022-02-21t00-07-33z, where
|
||||
// the first datetime is the watermark and the second one is the time when the RDE beam job
|
||||
// launched. We search for the latest folder that starts with "rde-[watermark]".
|
||||
String partialPrefix = String.format("rde-%s", watermark.toString("yyyy-MM-dd't'HH-mm-ss'z'"));
|
||||
String latestFilenameSuffix = null;
|
||||
try {
|
||||
latestFilenameSuffix =
|
||||
gcsUtils.listFolderObjects(bucket, partialPrefix).stream()
|
||||
.max(Ordering.natural())
|
||||
.orElse(null);
|
||||
} catch (IOException e) {
|
||||
throw new NoContentException(
|
||||
String.format(
|
||||
"Error reading folders starting with %s in bucket %s", partialPrefix, bucket));
|
||||
}
|
||||
if (latestFilenameSuffix == null) {
|
||||
throw new NoContentException(
|
||||
String.format("RDE deposit for TLD %s on %s does not exist", tld, watermark));
|
||||
}
|
||||
int firstSlashPosition = latestFilenameSuffix.indexOf('/');
|
||||
return partialPrefix + latestFilenameSuffix.substring(0, firstSlashPosition + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates an ID matching the regex {@code \w{1,13} } from a millisecond
|
||||
* timestamp.
|
||||
|
@ -89,5 +118,5 @@ public final class RdeUtil {
|
|||
return bean;
|
||||
}
|
||||
|
||||
private RdeUtil() {}
|
||||
private RdeUtils() {}
|
||||
}
|
|
@ -20,7 +20,7 @@ import com.google.common.io.ByteStreams;
|
|||
import google.registry.keyring.api.KeyModule.Key;
|
||||
import google.registry.model.rde.RdeMode;
|
||||
import google.registry.model.rde.RdeNamingUtils;
|
||||
import google.registry.rde.RdeUtil;
|
||||
import google.registry.rde.RdeUtils;
|
||||
import google.registry.rde.RydeEncoder;
|
||||
import google.registry.xml.XmlException;
|
||||
import java.io.BufferedInputStream;
|
||||
|
@ -59,7 +59,7 @@ final class EscrowDepositEncryptor {
|
|||
throws IOException, XmlException {
|
||||
try (InputStream xmlFileInput = Files.newInputStream(xmlFile);
|
||||
BufferedInputStream xmlInput = new BufferedInputStream(xmlFileInput, PEEK_BUFFER_SIZE)) {
|
||||
DateTime watermark = RdeUtil.peekWatermark(xmlInput);
|
||||
DateTime watermark = RdeUtils.peekWatermark(xmlInput);
|
||||
String name = RdeNamingUtils.makeRydeFilename(tld, watermark, mode, 1, revision);
|
||||
Path rydePath = outdir.resolve(name + ".ryde");
|
||||
Path sigPath = outdir.resolve(name + ".sig");
|
||||
|
|
|
@ -63,6 +63,7 @@ import google.registry.xjc.rdereport.XjcRdeReportReport;
|
|||
import google.registry.xml.XmlException;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import org.bouncycastle.openpgp.PGPPublicKey;
|
||||
|
@ -95,7 +96,7 @@ public class RdeReportActionTest {
|
|||
new FakeKeyringModule().get().getRdeStagingEncryptionKey();
|
||||
private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
|
||||
private final BlobId reportFile =
|
||||
BlobId.of("tub", "test_2006-06-06_full_S1_R0-report.xml.ghostryde");
|
||||
BlobId.of("tub", "job-name/test_2006-06-06_full_S1_R0-report.xml.ghostryde");
|
||||
private Tld registry;
|
||||
|
||||
private RdeReportAction createAction() {
|
||||
|
@ -114,7 +115,7 @@ public class RdeReportActionTest {
|
|||
action.timeout = standardSeconds(30);
|
||||
action.stagingDecryptionKey = new FakeKeyringModule().get().getRdeStagingDecryptionKey();
|
||||
action.runner = runner;
|
||||
action.prefix = Optional.empty();
|
||||
action.prefix = Optional.of("job-name/");
|
||||
return action;
|
||||
}
|
||||
|
||||
|
@ -170,11 +171,54 @@ public class RdeReportActionTest {
|
|||
when(httpResponse.getContent()).thenReturn(IIRDEA_GOOD_XML.read());
|
||||
when(urlFetchService.fetch(request.capture())).thenReturn(httpResponse);
|
||||
RdeReportAction action = createAction();
|
||||
action.prefix = Optional.of("job-name/");
|
||||
action.runWithLock(loadRdeReportCursor());
|
||||
assertThat(response.getStatus()).isEqualTo(200);
|
||||
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
|
||||
assertThat(response.getPayload()).isEqualTo("OK test 2006-06-06T00:00:00.000Z\n");
|
||||
|
||||
// Verify the HTTP request was correct.
|
||||
assertThat(request.getValue().getMethod()).isSameInstanceAs(PUT);
|
||||
assertThat(request.getValue().getURL().getProtocol()).isEqualTo("https");
|
||||
assertThat(request.getValue().getURL().getPath()).endsWith("/test/20101017001");
|
||||
Map<String, String> headers = mapifyHeaders(request.getValue().getHeaders());
|
||||
assertThat(headers).containsEntry("CONTENT_TYPE", "text/xml");
|
||||
assertThat(headers).containsEntry("AUTHORIZATION", "Basic dGVzdF9yeTpmb28=");
|
||||
|
||||
// Verify the payload XML was the same as what's in testdata/report.xml.
|
||||
XjcRdeReportReport report = parseReport(request.getValue().getPayload());
|
||||
assertThat(report.getId()).isEqualTo("20101017001");
|
||||
assertThat(report.getCrDate()).isEqualTo(DateTime.parse("2010-10-17T00:15:00.0Z"));
|
||||
assertThat(report.getWatermark()).isEqualTo(DateTime.parse("2010-10-17T00:00:00Z"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRunWithLock_withoutPrefix_noPrefixFound() throws Exception {
|
||||
RdeReportAction action = createAction();
|
||||
action.prefix = Optional.empty();
|
||||
assertThrows(NoContentException.class, () -> action.runWithLock(loadRdeReportCursor()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRunWithLock_withoutPrefix() throws Exception {
|
||||
when(httpResponse.getResponseCode()).thenReturn(SC_OK);
|
||||
when(httpResponse.getContent()).thenReturn(IIRDEA_GOOD_XML.read());
|
||||
when(urlFetchService.fetch(request.capture())).thenReturn(httpResponse);
|
||||
RdeReportAction action = createAction();
|
||||
action.prefix = Optional.empty();
|
||||
gcsUtils.delete(reportFile);
|
||||
BlobId otherReportFile1 =
|
||||
BlobId.of(
|
||||
"tub", "rde-2006-06-06t00-00-00z-1/test_2006-06-06_full_S1_R0-report.xml.ghostryde");
|
||||
BlobId otherReportFile2 =
|
||||
BlobId.of(
|
||||
"tub", "rde-2006-06-06t00-00-00z-2/test_2006-06-06_full_S1_R1-report.xml.ghostryde");
|
||||
// This file's content is not correct, if it is read, the action should throw.
|
||||
gcsUtils.createFromBytes(
|
||||
BlobId.of("tub", "job-name/test_2006-06-06_full_S1_R0-report.xml.ghostryde"),
|
||||
Ghostryde.encode(REPORT_XML.read(), encryptKey));
|
||||
otherReportFile1,
|
||||
Ghostryde.encode(
|
||||
ByteSource.wrap("BAD DATA".getBytes(StandardCharsets.UTF_8)).read(), encryptKey));
|
||||
gcsUtils.createFromBytes(otherReportFile2, Ghostryde.encode(REPORT_XML.read(), encryptKey));
|
||||
tm().transact(() -> RdeRevision.saveRevision("test", DateTime.parse("2006-06-06TZ"), FULL, 1));
|
||||
action.runWithLock(loadRdeReportCursor());
|
||||
assertThat(response.getStatus()).isEqualTo(200);
|
||||
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
|
||||
|
@ -198,7 +242,7 @@ public class RdeReportActionTest {
|
|||
@Test
|
||||
void testRunWithLock_regeneratedReport() throws Exception {
|
||||
gcsUtils.delete(reportFile);
|
||||
BlobId newReport = BlobId.of("tub", "test_2006-06-06_full_S1_R1-report.xml.ghostryde");
|
||||
BlobId newReport = BlobId.of("tub", "job-name/test_2006-06-06_full_S1_R1-report.xml.ghostryde");
|
||||
PGPPublicKey encryptKey = new FakeKeyringModule().get().getRdeStagingEncryptionKey();
|
||||
gcsUtils.createFromBytes(newReport, Ghostryde.encode(REPORT_XML.read(), encryptKey));
|
||||
tm().transact(() -> RdeRevision.saveRevision("test", DateTime.parse("2006-06-06TZ"), FULL, 1));
|
||||
|
|
|
@ -343,7 +343,7 @@ public class RdeUploadActionTest {
|
|||
gcsUtils.delete(GHOSTRYDE_FILE);
|
||||
gcsUtils.delete(LENGTH_FILE);
|
||||
gcsUtils.delete(REPORT_FILE);
|
||||
// Add a folder that is alphabetically before the desired folder and fill it will nonsense data.
|
||||
// Add a folder that is alphabetically before the desired folder and fill it with nonsense data.
|
||||
// It should NOT be picked up.
|
||||
BlobId ghostrydeFileWithPrefixBefore =
|
||||
BlobId.of("bucket", JOB_PREFIX + "-job-nama/tld_2010-10-17_full_S1_R0.xml.ghostryde");
|
||||
|
@ -464,7 +464,7 @@ public class RdeUploadActionTest {
|
|||
action.sftpCooldown = standardHours(2);
|
||||
DateTime stagingCursor = DateTime.parse("2010-10-18TZ");
|
||||
DateTime uploadCursor = DateTime.parse("2010-10-17TZ");
|
||||
DateTime sftpCursor = uploadCursor.minusMinutes(97); // Within the 2 hour cooldown period.
|
||||
DateTime sftpCursor = uploadCursor.minusMinutes(97); // Within the 2-hour cooldown period.
|
||||
persistResource(Cursor.createScoped(RDE_STAGING, stagingCursor, Tld.get("tld")));
|
||||
persistResource(Cursor.createScoped(RDE_UPLOAD_SFTP, sftpCursor, Tld.get("tld")));
|
||||
NoContentException thrown =
|
||||
|
@ -477,7 +477,7 @@ public class RdeUploadActionTest {
|
|||
+ " ago)");
|
||||
}
|
||||
|
||||
private String slurp(InputStream is) throws IOException {
|
||||
private static String slurp(InputStream is) throws IOException {
|
||||
return CharStreams.toString(new InputStreamReader(is, UTF_8));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue