Add retry logic to RDE upload

We experience intermittent exceptions when connecting to Iron Mountain. Just retrying is probably the easiest solution to the problem.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=141221179
This commit is contained in:
mountford 2016-12-06 13:49:50 -08:00 committed by Ben McIlwain
parent c7cc97d62b
commit 50aad72383
3 changed files with 60 additions and 8 deletions

View file

@ -27,9 +27,11 @@ import static java.util.Arrays.asList;
import com.google.appengine.api.taskqueue.Queue; import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.tools.cloudstorage.GcsFilename; import com.google.appengine.tools.cloudstorage.GcsFilename;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.googlecode.objectify.VoidWork; import com.googlecode.objectify.VoidWork;
import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import google.registry.config.ConfigModule.Config; import google.registry.config.ConfigModule.Config;
import google.registry.gcs.GcsUtils; import google.registry.gcs.GcsUtils;
import google.registry.keyring.api.KeyModule.Key; import google.registry.keyring.api.KeyModule.Key;
@ -47,6 +49,7 @@ import google.registry.request.RequestParameters;
import google.registry.request.Response; import google.registry.request.Response;
import google.registry.util.Clock; import google.registry.util.Clock;
import google.registry.util.FormattingLogger; import google.registry.util.FormattingLogger;
import google.registry.util.Retrier;
import google.registry.util.TaskEnqueuer; import google.registry.util.TaskEnqueuer;
import google.registry.util.TeeOutputStream; import google.registry.util.TeeOutputStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -54,6 +57,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import java.util.concurrent.Callable;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Named; import javax.inject.Named;
import org.bouncycastle.openpgp.PGPKeyPair; import org.bouncycastle.openpgp.PGPKeyPair;
@ -92,6 +96,7 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
@Inject RydePgpSigningOutputStreamFactory pgpSigningFactory; @Inject RydePgpSigningOutputStreamFactory pgpSigningFactory;
@Inject RydeTarOutputStreamFactory tarFactory; @Inject RydeTarOutputStreamFactory tarFactory;
@Inject TaskEnqueuer taskEnqueuer; @Inject TaskEnqueuer taskEnqueuer;
@Inject Retrier retrier;
@Inject @Parameter(RequestParameters.PARAM_TLD) String tld; @Inject @Parameter(RequestParameters.PARAM_TLD) String tld;
@Inject @Config("rdeBucket") String bucket; @Inject @Config("rdeBucket") String bucket;
@Inject @Config("rdeInterval") Duration interval; @Inject @Config("rdeInterval") Duration interval;
@ -113,7 +118,7 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
} }
@Override @Override
public void runWithLock(DateTime watermark) throws Exception { public void runWithLock(final DateTime watermark) throws Exception {
DateTime stagingCursorTime = getCursorTimeOrStartOfTime( DateTime stagingCursorTime = getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(CursorType.RDE_STAGING, Registry.get(tld))).now()); ofy().load().key(Cursor.createKey(CursorType.RDE_STAGING, Registry.get(tld))).now());
if (!stagingCursorTime.isAfter(watermark)) { if (!stagingCursorTime.isAfter(watermark)) {
@ -129,14 +134,22 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
} }
int revision = RdeRevision.getNextRevision(tld, watermark, FULL) - 1; int revision = RdeRevision.getNextRevision(tld, watermark, FULL) - 1;
verify(revision >= 0, "RdeRevision was not set on generated deposit"); verify(revision >= 0, "RdeRevision was not set on generated deposit");
String name = RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision); final String name = RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, revision);
GcsFilename xmlFilename = new GcsFilename(bucket, name + ".xml.ghostryde"); final GcsFilename xmlFilename = new GcsFilename(bucket, name + ".xml.ghostryde");
GcsFilename xmlLengthFilename = new GcsFilename(bucket, name + ".xml.length"); final GcsFilename xmlLengthFilename = new GcsFilename(bucket, name + ".xml.length");
GcsFilename reportFilename = new GcsFilename(bucket, name + "-report.xml.ghostryde"); GcsFilename reportFilename = new GcsFilename(bucket, name + "-report.xml.ghostryde");
verifyFileExists(xmlFilename); verifyFileExists(xmlFilename);
verifyFileExists(xmlLengthFilename); verifyFileExists(xmlLengthFilename);
verifyFileExists(reportFilename); verifyFileExists(reportFilename);
upload(xmlFilename, readXmlLength(xmlLengthFilename), watermark, name); final long xmlLength = readXmlLength(xmlLengthFilename);
retrier.callWithRetry(
new Callable<Void>() {
@Override
public Void call() throws Exception {
upload(xmlFilename, xmlLength, watermark, name);
return null;
}},
JSchException.class);
ofy().transact(new VoidWork() { ofy().transact(new VoidWork() {
@Override @Override
public void vrun() { public void vrun() {
@ -172,7 +185,8 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
* && cat /tmp/sig > gs://bucket/$rydeFilename.sig # Save a copy of signature to GCS. * && cat /tmp/sig > gs://bucket/$rydeFilename.sig # Save a copy of signature to GCS.
* }</pre> * }</pre>
*/ */
private void upload( @VisibleForTesting
protected void upload(
GcsFilename xmlFile, long xmlLength, DateTime watermark, String name) throws Exception { GcsFilename xmlFile, long xmlLength, DateTime watermark, String name) throws Exception {
logger.infofmt("Uploading %s to %s", xmlFile, uploadUrl); logger.infofmt("Uploading %s to %s", xmlFile, uploadUrl);
try (InputStream gcsInput = gcsUtils.openInputStream(xmlFile); try (InputStream gcsInput = gcsUtils.openInputStream(xmlFile);

View file

@ -40,6 +40,7 @@ java_library(
"//third_party/java/hamcrest", "//third_party/java/hamcrest",
"//third_party/java/joda_money", "//third_party/java/joda_money",
"//third_party/java/joda_time", "//third_party/java/joda_time",
"//third_party/java/jsch/v0_1_44_google",
"//third_party/java/jsr305_annotations", "//third_party/java/jsr305_annotations",
"//third_party/java/jsr330_inject", "//third_party/java/jsr330_inject",
"//third_party/java/junit", "//third_party/java/junit",

View file

@ -31,9 +31,13 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.joda.time.Duration.standardDays; import static org.joda.time.Duration.standardDays;
import static org.joda.time.Duration.standardSeconds; import static org.joda.time.Duration.standardSeconds;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.appengine.api.taskqueue.QueueFactory; import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.tools.cloudstorage.GcsFilename; import com.google.appengine.tools.cloudstorage.GcsFilename;
@ -43,6 +47,9 @@ import com.google.common.io.ByteSource;
import com.google.common.io.CharStreams; import com.google.common.io.CharStreams;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.googlecode.objectify.VoidWork; import com.googlecode.objectify.VoidWork;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import google.registry.gcs.GcsUtils; import google.registry.gcs.GcsUtils;
import google.registry.keyring.api.Keyring; import google.registry.keyring.api.Keyring;
import google.registry.model.common.Cursor; import google.registry.model.common.Cursor;
@ -57,6 +64,7 @@ import google.registry.testing.BouncyCastleProviderRule;
import google.registry.testing.ExceptionRule; import google.registry.testing.ExceptionRule;
import google.registry.testing.FakeClock; import google.registry.testing.FakeClock;
import google.registry.testing.FakeResponse; import google.registry.testing.FakeResponse;
import google.registry.testing.FakeSleeper;
import google.registry.testing.GpgSystemCommandRule; import google.registry.testing.GpgSystemCommandRule;
import google.registry.testing.IoSpyRule; import google.registry.testing.IoSpyRule;
import google.registry.testing.Providers; import google.registry.testing.Providers;
@ -81,6 +89,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder; import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.OngoingStubbing;
/** Unit tests for {@link RdeUploadAction}. */ /** Unit tests for {@link RdeUploadAction}. */
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
@ -202,10 +211,22 @@ public class RdeUploadActionTest {
action.reportQueue = QueueFactory.getQueue("rde-report"); action.reportQueue = QueueFactory.getQueue("rde-report");
action.runner = runner; action.runner = runner;
action.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1)); action.taskEnqueuer = new TaskEnqueuer(new Retrier(null, 1));
action.retrier = new Retrier(new FakeSleeper(clock), 3);
return action; return action;
} }
} }
private static JSch createThrowingJSchSpy(JSch jsch, int numTimesToThrow) throws JSchException {
JSch jschSpy = spy(jsch);
OngoingStubbing<Session> stubbing =
when(jschSpy.getSession(anyString(), anyString(), anyInt()));
for (int i = 0; i < numTimesToThrow; i++) {
stubbing = stubbing.thenThrow(new JSchException("The crow flies in square circles."));
}
stubbing.thenCallRealMethod();
return jschSpy;
}
@Before @Before
public void before() throws Exception { public void before() throws Exception {
createTld("tld"); createTld("tld");
@ -253,7 +274,7 @@ public class RdeUploadActionTest {
} }
@Test @Test
public void testRunWithLock() throws Exception { public void testRunWithLock_succeedsOnThirdTry() throws Exception {
// XXX: For any port other than 22, JSch will reformat the hostname IPv6 style which causes // XXX: For any port other than 22, JSch will reformat the hostname IPv6 style which causes
// known host matching to fail. // known host matching to fail.
int port = sftpd.serve("user", "password", folder.getRoot()); int port = sftpd.serve("user", "password", folder.getRoot());
@ -262,7 +283,9 @@ public class RdeUploadActionTest {
DateTime uploadCursor = DateTime.parse("2010-10-17TZ"); DateTime uploadCursor = DateTime.parse("2010-10-17TZ");
persistResource( persistResource(
Cursor.create(CursorType.RDE_STAGING, stagingCursor, Registry.get("tld"))); Cursor.create(CursorType.RDE_STAGING, stagingCursor, Registry.get("tld")));
createAction(uploadUrl).runWithLock(uploadCursor); RdeUploadAction action = createAction(uploadUrl);
action.jsch = createThrowingJSchSpy(action.jsch, 2);
action.runWithLock(uploadCursor);
assertThat(response.getStatus()).isEqualTo(200); assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("OK tld 2010-10-17T00:00:00.000Z\n"); assertThat(response.getPayload()).isEqualTo("OK tld 2010-10-17T00:00:00.000Z\n");
@ -273,6 +296,20 @@ public class RdeUploadActionTest {
"tld_2010-10-17_full_S1_R0.sig"); "tld_2010-10-17_full_S1_R0.sig");
} }
@Test
public void testRunWithLock_failsAfterThreeAttempts() throws Exception {
int port = sftpd.serve("user", "password", folder.getRoot());
URI uploadUrl = URI.create(String.format("sftp://user:password@127.0.0.1:%d/", port));
DateTime stagingCursor = DateTime.parse("2010-10-18TZ");
DateTime uploadCursor = DateTime.parse("2010-10-17TZ");
persistResource(
Cursor.create(CursorType.RDE_STAGING, stagingCursor, Registry.get("tld")));
RdeUploadAction action = createAction(uploadUrl);
action.jsch = createThrowingJSchSpy(action.jsch, 3);
thrown.expect(RuntimeException.class, "The crow flies in square circles.");
action.runWithLock(uploadCursor);
}
@Test @Test
public void testRunWithLock_copiesOnGcs() throws Exception { public void testRunWithLock_copiesOnGcs() throws Exception {
int port = sftpd.serve("user", "password", folder.getRoot()); int port = sftpd.serve("user", "password", folder.getRoot());