Automated g4 rollback of changelist 199643208.

*** Reason for rollback ***

We suspect that this is breaking RDE more, so we're going to rollout a cherrypick of this reversion.

*** Original change description ***

Upload to GCS before uploading to FTP

Currently we encode and upload the deposite to GCS and the FTP server at the
same time. This makes debugging harder as there are many possible points of
failure, some of which are external and some internal.

In this CL we start by encoding + uploading the deposit to GCS, and once
that's done we copy the data from GCS to the FTP server. This will (hopefully)
allow us to distinguish between errors on the FTP server and errors with the
GCS connection.

***

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=201005260
This commit is contained in:
mcilwain 2018-06-18 09:12:38 -07:00 committed by Ben McIlwain
parent 4098487b80
commit 03f8090886
5 changed files with 181 additions and 43 deletions

View file

@ -24,6 +24,7 @@ import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.model.rde.RdeMode.FULL;
import static google.registry.request.Action.Method.POST;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.tools.cloudstorage.GcsFilename;
@ -52,6 +53,7 @@ import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.Retrier;
import google.registry.util.TaskQueueUtils;
import google.registry.util.TeeOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -130,8 +132,7 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
@Override
public void runWithLock(final DateTime watermark) throws Exception {
logger.atInfo().log(
"Verifying readiness to upload the RDE deposit for tld=%s, watermark=%s.", tld, watermark);
logger.atInfo().log("Verifying readiness to upload the RDE deposit.");
DateTime stagingCursorTime = getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(CursorType.RDE_STAGING, Registry.get(tld))).now());
if (!stagingCursorTime.isAfter(watermark)) {
@ -204,46 +205,34 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
protected void upload(
GcsFilename xmlFile, long xmlLength, DateTime watermark, String name) throws Exception {
logger.atInfo().log("Uploading XML file '%s' to remote path '%s'.", xmlFile, uploadUrl);
byte[] signature;
String rydeFilename = name + ".ryde";
String sigFilename = name + ".sig";
GcsFilename rydeGcsFilename = new GcsFilename(bucket, rydeFilename);
// Encode and save the files to GCS
try (InputStream gcsInput = gcsUtils.openInputStream(xmlFile);
Ghostryde.Decryptor decryptor = ghostryde.openDecryptor(gcsInput, stagingDecryptionKey);
Ghostryde.Decompressor decompressor = ghostryde.openDecompressor(decryptor);
Ghostryde.Input xmlInput = ghostryde.openInput(decompressor)) {
try (OutputStream gcsOutput = gcsUtils.openOutputStream(rydeGcsFilename);
RydePgpSigningOutputStream signer = pgpSigningFactory.create(gcsOutput, signingKey)) {
try (OutputStream encryptLayer = pgpEncryptionFactory.create(signer, receiverKey);
OutputStream kompressor = pgpCompressionFactory.create(encryptLayer);
OutputStream fileLayer = pgpFileFactory.create(kompressor, watermark, name + ".tar");
OutputStream tarLayer =
tarFactory.create(fileLayer, xmlLength, watermark, name + ".xml")) {
ByteStreams.copy(xmlInput, tarLayer);
try (JSchSshSession session = jschSshSessionFactory.create(lazyJsch.get(), uploadUrl);
JSchSftpChannel ftpChan = session.openSftpChannel()) {
byte[] signature;
String rydeFilename = name + ".ryde";
GcsFilename rydeGcsFilename = new GcsFilename(bucket, rydeFilename);
try (OutputStream ftpOutput = ftpChan.get().put(rydeFilename, OVERWRITE);
OutputStream gcsOutput = gcsUtils.openOutputStream(rydeGcsFilename);
TeeOutputStream teeOutput = new TeeOutputStream(asList(ftpOutput, gcsOutput));
RydePgpSigningOutputStream signer = pgpSigningFactory.create(teeOutput, signingKey)) {
try (OutputStream encryptLayer = pgpEncryptionFactory.create(signer, receiverKey);
OutputStream kompressor = pgpCompressionFactory.create(encryptLayer);
OutputStream fileLayer = pgpFileFactory.create(kompressor, watermark, name + ".tar");
OutputStream tarLayer =
tarFactory.create(fileLayer, xmlLength, watermark, name + ".xml")) {
ByteStreams.copy(xmlInput, tarLayer);
}
signature = signer.getSignature();
logger.atInfo().log("uploaded %,d bytes: %s.ryde", signer.getBytesWritten(), name);
}
signature = signer.getSignature();
logger.atInfo().log(
"uploaded %,d bytes to gcs bucket %s, file %s",
signer.getBytesWritten(), bucket, rydeFilename);
String sigFilename = name + ".sig";
gcsUtils.createFromBytes(new GcsFilename(bucket, sigFilename), signature);
ftpChan.get().put(new ByteArrayInputStream(signature), sigFilename);
logger.atInfo().log("uploaded %,d bytes: %s.sig", signature.length, name);
}
gcsUtils.createFromBytes(new GcsFilename(bucket, sigFilename), signature);
logger.atInfo().log(
"uploaded %,d bytes to gcs bucket %s, file %s", signature.length, bucket, sigFilename);
}
// Copy the file from GCS to the FTP server
try (JSchSshSession session = jschSshSessionFactory.create(lazyJsch.get(), uploadUrl);
JSchSftpChannel ftpChan = session.openSftpChannel()) {
try (OutputStream ftpOutput = ftpChan.get().put(rydeFilename, OVERWRITE);
InputStream gcsInput = gcsUtils.openInputStream(rydeGcsFilename)) {
long bytesCopied = ByteStreams.copy(gcsInput, ftpOutput);
logger.atInfo().log(
"uploaded %,d bytes to ftp path %s, file %s", bytesCopied, uploadUrl, rydeFilename);
}
ftpChan.get().put(new ByteArrayInputStream(signature), sigFilename);
logger.atInfo().log(
"uploaded %,d bytes to ftp path %s, file %s", signature.length, uploadUrl, sigFilename);
}
}