mirror of
https://github.com/google/nomulus.git
synced 2025-07-25 12:08:36 +02:00
Fix a subtle issue in BRDA copy caused by Cloud Tasks (#1556)
* Fix a subtle issue in BRDA copy caused by Cloud Tasks. After the Cloud Tasks migration and #1508, the BRDA copy job now routinely fails on the first try because the revision update is not committed by the time the Cloud Tasks job enqueued in the same transaction runs for the first time. This is because the enqueueing is a side effect and not part of the transaction. The job eventually succeeds because of retries. This PR attempts to mitigate the initial failure by adding a delay to the enqueued job, and checking the cursor in the job itself to prevent it from running before the transaction is committed.
This commit is contained in:
parent
2a2652c7f5
commit
917e8d5a1e
6 changed files with 80 additions and 19 deletions
|
@ -68,6 +68,7 @@ import org.apache.beam.sdk.values.PDone;
|
|||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
import org.bouncycastle.openpgp.PGPPublicKey;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
|
||||
public class RdeIO {
|
||||
|
||||
|
@ -261,6 +262,7 @@ public class RdeIO {
|
|||
|
||||
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||
private static final long serialVersionUID = 5822176227753327224L;
|
||||
private static final Duration ENQUEUE_DELAY = Duration.standardMinutes(1);
|
||||
|
||||
private final CloudTasksUtils cloudTasksUtils;
|
||||
|
||||
|
@ -271,6 +273,7 @@ public class RdeIO {
|
|||
@ProcessElement
|
||||
public void processElement(
|
||||
@Element KV<PendingDeposit, Integer> input, PipelineOptions options) {
|
||||
|
||||
tm().transact(
|
||||
() -> {
|
||||
PendingDeposit key = input.getKey();
|
||||
|
@ -296,26 +299,30 @@ public class RdeIO {
|
|||
tm().put(Cursor.create(key.cursor(), newPosition, registry));
|
||||
logger.atInfo().log(
|
||||
"Rolled forward %s on %s cursor to %s.", key.cursor(), key.tld(), newPosition);
|
||||
RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), revision);
|
||||
RdeRevision.saveRevision(key.tld(), key.watermark(), key.mode(), input.getValue());
|
||||
// Enqueueing a task is a side effect that is not undone if the transaction rolls
|
||||
// back. So this may result in multiple copies of the same task being processed.
|
||||
// This is fine because the RdeUploadAction is guarded by a lock and tracks progress
|
||||
// by cursor. The BrdaCopyAction writes a file to GCS, which is an atomic action.
|
||||
// by cursor. The BrdaCopyAction writes a file to GCS, which is an atomic action. It
|
||||
// is also guarded by a cursor to not run before the cursor is updated. We also
|
||||
// include a delay to minimize the chance that the enqueued job executes before the
|
||||
// transaction is committed, which triggers a retry.
|
||||
if (key.mode() == RdeMode.FULL) {
|
||||
cloudTasksUtils.enqueue(
|
||||
RDE_UPLOAD_QUEUE,
|
||||
cloudTasksUtils.createPostTask(
|
||||
cloudTasksUtils.createPostTaskWithDelay(
|
||||
RdeUploadAction.PATH,
|
||||
Service.BACKEND.getServiceId(),
|
||||
ImmutableMultimap.of(
|
||||
RequestParameters.PARAM_TLD,
|
||||
key.tld(),
|
||||
RdeModule.PARAM_PREFIX,
|
||||
options.getJobName() + '/')));
|
||||
options.getJobName() + '/'),
|
||||
ENQUEUE_DELAY));
|
||||
} else {
|
||||
cloudTasksUtils.enqueue(
|
||||
BRDA_QUEUE,
|
||||
cloudTasksUtils.createPostTask(
|
||||
cloudTasksUtils.createPostTaskWithDelay(
|
||||
BrdaCopyAction.PATH,
|
||||
Service.BACKEND.getServiceId(),
|
||||
ImmutableMultimap.of(
|
||||
|
@ -324,7 +331,8 @@ public class RdeIO {
|
|||
RdeModule.PARAM_WATERMARK,
|
||||
key.watermark().toString(),
|
||||
RdeModule.PARAM_PREFIX,
|
||||
options.getJobName() + '/')));
|
||||
options.getJobName() + '/'),
|
||||
ENQUEUE_DELAY));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -14,8 +14,13 @@
|
|||
|
||||
package google.registry.rde;
|
||||
|
||||
import static google.registry.model.common.Cursor.CursorType.BRDA;
|
||||
import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
|
||||
import static google.registry.model.rde.RdeMode.THIN;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||
import static google.registry.persistence.transaction.TransactionManagerUtil.transactIfJpaTm;
|
||||
import static google.registry.request.Action.Method.POST;
|
||||
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
|
||||
|
||||
import com.google.cloud.storage.BlobId;
|
||||
import com.google.common.flogger.FluentLogger;
|
||||
|
@ -23,9 +28,11 @@ import com.google.common.io.ByteStreams;
|
|||
import google.registry.config.RegistryConfig.Config;
|
||||
import google.registry.gcs.GcsUtils;
|
||||
import google.registry.keyring.api.KeyModule.Key;
|
||||
import google.registry.model.common.Cursor;
|
||||
import google.registry.model.rde.RdeNamingUtils;
|
||||
import google.registry.model.rde.RdeRevision;
|
||||
import google.registry.request.Action;
|
||||
import google.registry.request.HttpException.NoContentException;
|
||||
import google.registry.request.Parameter;
|
||||
import google.registry.request.RequestParameters;
|
||||
import google.registry.request.auth.Auth;
|
||||
|
@ -89,6 +96,16 @@ public final class BrdaCopyAction implements Runnable {
|
|||
private void copyAsRyde() throws IOException {
|
||||
// TODO(b/217772483): consider guarding this action with a lock and check if there is work.
|
||||
// Not urgent since file writes on GCS are atomic.
|
||||
Optional<Cursor> cursor =
|
||||
transactIfJpaTm(() -> tm().loadByKeyIfPresent(Cursor.createVKey(BRDA, tld)));
|
||||
DateTime brdaCursorTime = getCursorTimeOrStartOfTime(cursor);
|
||||
if (isBeforeOrAt(brdaCursorTime, watermark)) {
|
||||
throw new NoContentException(
|
||||
String.format(
|
||||
"Waiting on RdeStagingAction for TLD %s to copy BRDA deposit for %s to GCS; "
|
||||
+ "last BRDA staging completion was before %s",
|
||||
tld, watermark, brdaCursorTime));
|
||||
}
|
||||
int revision =
|
||||
RdeRevision.getCurrentRevision(tld, watermark, THIN)
|
||||
.orElseThrow(
|
||||
|
|
|
@ -64,6 +64,7 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
|
|||
private static final long serialVersionUID = 60326234579091203L;
|
||||
|
||||
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||
private static final Duration ENQUEUE_DELAY = Duration.standardMinutes(1);
|
||||
|
||||
private final CloudTasksUtils cloudTasksUtils;
|
||||
private final LockHandler lockHandler;
|
||||
|
@ -202,6 +203,14 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
|
|||
logger.atInfo().log("Manual operation; not advancing cursor or enqueuing upload task.");
|
||||
return;
|
||||
}
|
||||
// We need to save the revision in a separate transaction because the subsequent upload/copy
|
||||
// action reads the most current revision from the database. If it is done in the same
|
||||
// transaction with the enqueueing, the action might start running before the transaction is
|
||||
// committed, due to Cloud Tasks not being transaction aware, unlike Task Queue. The downside
|
||||
// is that if for some reason the second transaction is rolled back, the revision update is not
|
||||
// undone. But this should be fine since the next run will just increment the revision and start
|
||||
// over.
|
||||
tm().transact(() -> RdeRevision.saveRevision(tld, watermark, mode, revision));
|
||||
tm().transact(
|
||||
() -> {
|
||||
Registry registry = Registry.get(tld);
|
||||
|
@ -225,29 +234,33 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
|
|||
tm().put(Cursor.create(key.cursor(), newPosition, registry));
|
||||
logger.atInfo().log(
|
||||
"Rolled forward %s on %s cursor to %s.", key.cursor(), tld, newPosition);
|
||||
RdeRevision.saveRevision(tld, watermark, mode, revision);
|
||||
// Enqueueing a task is a side effect that is not undone if the transaction rolls
|
||||
// back. So this may result in multiple copies of the same task being processed. This
|
||||
// is fine because the RdeUploadAction is guarded by a lock and tracks progress by
|
||||
// cursor. The BrdaCopyAction writes a file to GCS, which is an atomic action.
|
||||
// back. So this may result in multiple copies of the same task being processed.
|
||||
// This is fine because the RdeUploadAction is guarded by a lock and tracks progress
|
||||
// by cursor. The BrdaCopyAction writes a file to GCS, which is an atomic action. It
|
||||
// is also guarded by a cursor to not run before the cursor is updated. We also
|
||||
// include a delay to minimize the chance that the enqueued job executes before the
|
||||
// transaction is committed, which triggers a retry.
|
||||
if (mode == RdeMode.FULL) {
|
||||
cloudTasksUtils.enqueue(
|
||||
"rde-upload",
|
||||
cloudTasksUtils.createPostTask(
|
||||
cloudTasksUtils.createPostTaskWithDelay(
|
||||
RdeUploadAction.PATH,
|
||||
Service.BACKEND.toString(),
|
||||
ImmutableMultimap.of(RequestParameters.PARAM_TLD, tld)));
|
||||
ImmutableMultimap.of(RequestParameters.PARAM_TLD, tld),
|
||||
ENQUEUE_DELAY));
|
||||
} else {
|
||||
cloudTasksUtils.enqueue(
|
||||
"brda",
|
||||
cloudTasksUtils.createPostTask(
|
||||
cloudTasksUtils.createPostTaskWithDelay(
|
||||
BrdaCopyAction.PATH,
|
||||
Service.BACKEND.toString(),
|
||||
ImmutableMultimap.of(
|
||||
RequestParameters.PARAM_TLD,
|
||||
tld,
|
||||
RdeModule.PARAM_WATERMARK,
|
||||
watermark.toString())));
|
||||
watermark.toString()),
|
||||
ENQUEUE_DELAY));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -148,7 +148,7 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
|
|||
throw new NoContentException(
|
||||
String.format(
|
||||
"Waiting on RdeStagingAction for TLD %s to send %s upload; "
|
||||
+ "last RDE staging completion was at %s",
|
||||
+ "last RDE staging completion was before %s",
|
||||
tld, watermark, stagingCursorTime));
|
||||
}
|
||||
DateTime sftpCursorTime =
|
||||
|
|
|
@ -16,9 +16,13 @@ package google.registry.rde;
|
|||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static com.google.common.truth.Truth.assertWithMessage;
|
||||
import static google.registry.model.common.Cursor.CursorType.BRDA;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||
import static google.registry.testing.DatabaseHelper.createTld;
|
||||
import static google.registry.testing.DatabaseHelper.persistResource;
|
||||
import static google.registry.testing.SystemInfo.hasCommand;
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
||||
|
||||
import com.google.cloud.storage.BlobId;
|
||||
|
@ -28,8 +32,11 @@ import com.google.common.io.CharStreams;
|
|||
import com.google.common.io.Files;
|
||||
import google.registry.gcs.GcsUtils;
|
||||
import google.registry.keyring.api.Keyring;
|
||||
import google.registry.model.common.Cursor;
|
||||
import google.registry.model.rde.RdeMode;
|
||||
import google.registry.model.rde.RdeRevision;
|
||||
import google.registry.model.tld.Registry;
|
||||
import google.registry.request.HttpException.NoContentException;
|
||||
import google.registry.testing.AppEngineExtension;
|
||||
import google.registry.testing.BouncyCastleProviderExtension;
|
||||
import google.registry.testing.FakeKeyringModule;
|
||||
|
@ -104,6 +111,7 @@ public class BrdaCopyActionTest {
|
|||
|
||||
@BeforeEach
|
||||
void beforeEach() throws Exception {
|
||||
createTld("lol");
|
||||
action.gcsUtils = gcsUtils;
|
||||
action.tld = "lol";
|
||||
action.watermark = DateTime.parse("2010-10-17TZ");
|
||||
|
@ -116,6 +124,20 @@ public class BrdaCopyActionTest {
|
|||
() -> {
|
||||
RdeRevision.saveRevision("lol", DateTime.parse("2010-10-17TZ"), RdeMode.THIN, 0);
|
||||
});
|
||||
persistResource(Cursor.create(BRDA, action.watermark.plusDays(1), Registry.get("lol")));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(strings = {"", "job-name/"})
|
||||
void testRun_stagingNotFinished_throws204(String prefix) throws Exception {
|
||||
persistResource(Cursor.create(BRDA, action.watermark, Registry.get("lol")));
|
||||
NoContentException thrown = assertThrows(NoContentException.class, () -> runAction(prefix));
|
||||
assertThat(thrown)
|
||||
.hasMessageThat()
|
||||
.isEqualTo(
|
||||
"Waiting on RdeStagingAction for TLD lol to copy BRDA deposit for"
|
||||
+ " 2010-10-17T00:00:00.000Z to GCS; last BRDA staging completion was before"
|
||||
+ " 2010-10-17T00:00:00.000Z");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
|
|
@ -403,7 +403,7 @@ public class RdeUploadActionTest {
|
|||
.hasMessageThat()
|
||||
.isEqualTo(
|
||||
"Waiting on RdeStagingAction for TLD tld to send 2010-10-17T00:00:00.000Z upload; last"
|
||||
+ " RDE staging completion was at 1970-01-01T00:00:00.000Z");
|
||||
+ " RDE staging completion was before 1970-01-01T00:00:00.000Z");
|
||||
cloudTasksHelper.assertNoTasksEnqueued("rde-upload");
|
||||
assertThat(folder.list()).isEmpty();
|
||||
}
|
||||
|
@ -420,7 +420,7 @@ public class RdeUploadActionTest {
|
|||
.hasMessageThat()
|
||||
.isEqualTo(
|
||||
"Waiting on RdeStagingAction for TLD tld to send 2010-10-17T00:00:00.000Z upload; "
|
||||
+ "last RDE staging completion was at 2010-10-17T00:00:00.000Z");
|
||||
+ "last RDE staging completion was before 2010-10-17T00:00:00.000Z");
|
||||
}
|
||||
|
||||
@TestOfyAndSql
|
||||
|
@ -437,8 +437,9 @@ public class RdeUploadActionTest {
|
|||
assertThat(thrown)
|
||||
.hasMessageThat()
|
||||
.isEqualTo(
|
||||
"Waiting on 120 minute SFTP cooldown for TLD tld to send 2010-10-17T00:00:00.000Z "
|
||||
+ "upload; last upload attempt was at 2010-10-16T22:23:00.000Z (97 minutes ago)");
|
||||
"Waiting on 120 minute SFTP cooldown for TLD tld to send 2010-10-17T00:00:00.000Z"
|
||||
+ " upload; last upload attempt was at 2010-10-16T22:23:00.000Z (97 minutes"
|
||||
+ " ago)");
|
||||
}
|
||||
|
||||
private String slurp(InputStream is) throws IOException {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue