Cut RegistryCursor over to global cursors

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=125797729
This commit is contained in:
ctingue 2016-06-24 11:12:46 -07:00 committed by Ben McIlwain
parent 6fa1c2d91c
commit 14522eac0c
18 changed files with 195 additions and 152 deletions

View file

@ -18,9 +18,9 @@ import static google.registry.model.ofy.ObjectifyService.ofy;
import com.googlecode.objectify.VoidWork;
import google.registry.model.common.Cursor;
import google.registry.model.common.Cursor.CursorType;
import google.registry.model.registry.Registry;
import google.registry.model.registry.RegistryCursor;
import google.registry.model.registry.RegistryCursor.CursorType;
import google.registry.model.server.Lock;
import google.registry.request.HttpException.NoContentException;
import google.registry.request.HttpException.ServiceUnavailableException;
@ -50,13 +50,13 @@ import javax.inject.Inject;
* {@link NoContentException} is thrown to cancel the task.
*
* <p>The specific date for which the deposit is generated depends on the current position of the
* {@link RegistryCursor}. If the cursor is set to tomorrow, we do nothing and return 204 No
* Content. If the cursor is set to today, then we create a deposit for today and advance the
* cursor. If the cursor is set to yesterday or earlier, then we create a deposit for that date,
* advance the cursor, but we <i>do not</i> make any attempt to catch the cursor up to the current
* time. Therefore <b>you must</b> set the cron interval to something less than the desired
* interval, so the cursor can catch up. For example, if the task is supposed to run daily, you
* should configure cron to execute it every twelve hours, or possibly less.
* {@link Cursor}. If the cursor is set to tomorrow, we do nothing and return 204 No Content. If the
* cursor is set to today, then we create a deposit for today and advance the cursor. If the cursor
* is set to yesterday or earlier, then we create a deposit for that date, advance the cursor, but
* we <i>do not</i> make any attempt to catch the cursor up to the current time. Therefore <b>you
* must</b> set the cron interval to something less than the desired interval, so the cursor can
* catch up. For example, if the task is supposed to run daily, you should configure cron to execute
* it every twelve hours, or possibly less.
*/
class EscrowTaskRunner {
@ -97,7 +97,8 @@ class EscrowTaskRunner {
public Void call() throws Exception {
logger.info("tld=" + registry.getTld());
DateTime startOfToday = clock.nowUtc().withTimeAtStartOfDay();
final DateTime nextRequiredRun = RegistryCursor.load(registry, cursorType).or(startOfToday);
Cursor cursor = ofy().load().key(Cursor.createKey(cursorType, registry)).now();
final DateTime nextRequiredRun = (cursor == null ? startOfToday : cursor.getCursorTime());
if (nextRequiredRun.isAfter(startOfToday)) {
throw new NoContentException("Already completed");
}
@ -106,7 +107,8 @@ class EscrowTaskRunner {
ofy().transact(new VoidWork() {
@Override
public void vrun() {
RegistryCursor.save(registry, cursorType, nextRequiredRun.plus(interval));
ofy().save().entity(
Cursor.create(cursorType, nextRequiredRun.plus(interval), registry));
}});
return null;
}};

View file

@ -16,8 +16,8 @@ package google.registry.rde;
import com.google.auto.value.AutoValue;
import google.registry.model.common.Cursor.CursorType;
import google.registry.model.rde.RdeMode;
import google.registry.model.registry.RegistryCursor.CursorType;
import org.joda.time.DateTime;
import org.joda.time.Duration;

View file

@ -18,18 +18,17 @@ import static com.google.common.base.Preconditions.checkArgument;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSetMultimap;
import com.googlecode.objectify.Work;
import google.registry.config.ConfigModule.Config;
import google.registry.model.common.Cursor;
import google.registry.model.common.Cursor.CursorType;
import google.registry.model.rde.RdeMode;
import google.registry.model.registry.Registries;
import google.registry.model.registry.Registry;
import google.registry.model.registry.Registry.TldType;
import google.registry.model.registry.RegistryCursor;
import google.registry.model.registry.RegistryCursor.CursorType;
import google.registry.util.Clock;
import org.joda.time.DateTime;
@ -83,7 +82,7 @@ public final class PendingDepositChecker {
}
private ImmutableSetMultimap<String, PendingDeposit> getTldsAndWatermarksPendingDeposit(
RdeMode mode, CursorType cursor, Duration interval, DateTime startingPoint) {
RdeMode mode, CursorType cursorType, Duration interval, DateTime startingPoint) {
checkArgument(interval.isLongerThan(Duration.ZERO));
ImmutableSetMultimap.Builder<String, PendingDeposit> builder =
new ImmutableSetMultimap.Builder<>();
@ -94,16 +93,14 @@ public final class PendingDepositChecker {
continue;
}
// Avoid creating a transaction unless absolutely necessary.
Optional<DateTime> cursorValue = RegistryCursor.load(registry, cursor);
if (isBeforeOrAt(cursorValue.or(startingPoint), now)) {
DateTime watermark;
if (cursorValue.isPresent()) {
watermark = cursorValue.get();
} else {
watermark = transactionallyInitializeCursor(registry, cursor, startingPoint);
}
Cursor cursor = ofy().load().key(Cursor.createKey(cursorType, registry)).now();
DateTime cursorValue = (cursor != null ? cursor.getCursorTime() : startingPoint);
if (isBeforeOrAt(cursorValue, now)) {
DateTime watermark = (cursor != null
? cursor.getCursorTime()
: transactionallyInitializeCursor(registry, cursorType, startingPoint));
if (isBeforeOrAt(watermark, now)) {
builder.put(tld, PendingDeposit.create(tld, watermark, mode, cursor, interval));
builder.put(tld, PendingDeposit.create(tld, watermark, mode, cursorType, interval));
}
}
}
@ -112,15 +109,16 @@ public final class PendingDepositChecker {
private DateTime transactionallyInitializeCursor(
final Registry registry,
final CursorType cursor,
final CursorType cursorType,
final DateTime initialValue) {
return ofy().transact(new Work<DateTime>() {
@Override
public DateTime run() {
for (DateTime value : RegistryCursor.load(registry, cursor).asSet()) {
return value;
Cursor cursor = ofy().load().key(Cursor.createKey(cursorType, registry)).now();
if (cursor != null) {
return cursor.getCursorTime();
}
RegistryCursor.save(registry, cursor, initialValue);
ofy().save().entity(Cursor.create(cursorType, initialValue, registry));
return initialValue;
}});
}

View file

@ -16,9 +16,10 @@ package google.registry.rde;
import static com.google.common.base.Verify.verify;
import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8;
import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.model.rde.RdeMode.FULL;
import static google.registry.request.Action.Method.POST;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import com.google.appengine.tools.cloudstorage.GcsFilename;
import com.google.common.io.ByteStreams;
@ -26,10 +27,10 @@ import com.google.common.io.ByteStreams;
import google.registry.config.ConfigModule.Config;
import google.registry.gcs.GcsUtils;
import google.registry.keyring.api.KeyModule.Key;
import google.registry.model.common.Cursor;
import google.registry.model.common.Cursor.CursorType;
import google.registry.model.rde.RdeNamingUtils;
import google.registry.model.registry.Registry;
import google.registry.model.registry.RegistryCursor;
import google.registry.model.registry.RegistryCursor.CursorType;
import google.registry.rde.EscrowTaskRunner.EscrowTask;
import google.registry.request.Action;
import google.registry.request.HttpException.NoContentException;
@ -77,10 +78,11 @@ public final class RdeReportAction implements Runnable, EscrowTask {
@Override
public void runWithLock(DateTime watermark) throws Exception {
DateTime stagingCursor =
RegistryCursor.load(Registry.get(tld), CursorType.RDE_UPLOAD).or(START_OF_TIME);
if (!stagingCursor.isAfter(watermark)) {
logger.infofmt("tld=%s reportCursor=%s uploadCursor=%s", tld, watermark, stagingCursor);
DateTime cursorTime =
getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(CursorType.RDE_UPLOAD, Registry.get(tld))).now());
if (!cursorTime.isAfter(watermark)) {
logger.infofmt("tld=%s reportCursor=%s uploadCursor=%s", tld, watermark, cursorTime);
throw new NoContentException("Waiting for RdeUploadAction to complete");
}
String prefix = RdeNamingUtils.makeRydeFilename(tld, watermark, FULL, 1, 0);

View file

@ -27,13 +27,13 @@ import google.registry.mapreduce.MapreduceRunner;
import google.registry.mapreduce.inputs.EppResourceInputs;
import google.registry.mapreduce.inputs.NullInput;
import google.registry.model.EppResource;
import google.registry.model.common.Cursor;
import google.registry.model.common.Cursor.CursorType;
import google.registry.model.contact.ContactResource;
import google.registry.model.host.HostResource;
import google.registry.model.index.EppResourceIndex;
import google.registry.model.rde.RdeMode;
import google.registry.model.registrar.Registrar;
import google.registry.model.registry.RegistryCursor;
import google.registry.model.registry.RegistryCursor.CursorType;
import google.registry.request.Action;
import google.registry.request.Response;
import google.registry.util.Clock;
@ -89,8 +89,8 @@ import javax.inject.Inject;
* key and shows you its representation in lenient XML.
*
* <p>Failed deposits will be retried indefinitely. This is because RDE and BRDA each have a
* {@link RegistryCursor} for each TLD. Even if the cursor lags for days, it'll catch up gradually
* on its own, once the data becomes valid.
* {@link Cursor} for each TLD. Even if the cursor lags for days, it'll catch up gradually on its
* own, once the data becomes valid.
*
* <p>The third-party escrow provider will validate each deposit we send them. They do both schema
* validation and reference checking.

View file

@ -18,6 +18,7 @@ import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static com.google.appengine.tools.cloudstorage.GcsServiceFactory.createGcsService;
import static com.google.common.base.Verify.verify;
import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.nio.charset.StandardCharsets.UTF_8;
@ -33,11 +34,11 @@ import google.registry.config.ConfigModule.Config;
import google.registry.gcs.GcsUtils;
import google.registry.keyring.api.KeyModule;
import google.registry.keyring.api.PgpHelper;
import google.registry.model.common.Cursor;
import google.registry.model.rde.RdeMode;
import google.registry.model.rde.RdeNamingUtils;
import google.registry.model.rde.RdeRevision;
import google.registry.model.registry.Registry;
import google.registry.model.registry.RegistryCursor;
import google.registry.model.server.Lock;
import google.registry.request.RequestParameters;
import google.registry.tldconfig.idn.IdnTableEnum;
@ -201,7 +202,8 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
@Override
public void vrun() {
Registry registry = Registry.get(tld);
DateTime position = RegistryCursor.load(registry, key.cursor()).get();
DateTime position = getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(key.cursor(), registry)).now());
DateTime newPosition = key.watermark().plus(key.interval());
if (!position.isBefore(newPosition)) {
logger.warning("Cursor has already been rolled forward.");
@ -209,7 +211,7 @@ public final class RdeStagingReducer extends Reducer<PendingDeposit, DepositFrag
}
verify(position.equals(key.watermark()),
"Partial ordering of RDE deposits broken: %s %s", position, key);
RegistryCursor.save(registry, key.cursor(), newPosition);
ofy().save().entity(Cursor.create(key.cursor(), newPosition, registry)).now();
logger.infofmt("Rolled forward %s on %s cursor to %s", key.cursor(), tld, newPosition);
RdeRevision.saveRevision(tld, watermark, mode, revision);
if (mode == RdeMode.FULL) {

View file

@ -18,10 +18,10 @@ import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static com.google.common.base.Verify.verify;
import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8;
import static com.jcraft.jsch.ChannelSftp.OVERWRITE;
import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.model.rde.RdeMode.FULL;
import static google.registry.request.Action.Method.POST;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
@ -35,11 +35,11 @@ import com.jcraft.jsch.JSch;
import google.registry.config.ConfigModule.Config;
import google.registry.gcs.GcsUtils;
import google.registry.keyring.api.KeyModule.Key;
import google.registry.model.common.Cursor;
import google.registry.model.common.Cursor.CursorType;
import google.registry.model.rde.RdeNamingUtils;
import google.registry.model.rde.RdeRevision;
import google.registry.model.registry.Registry;
import google.registry.model.registry.RegistryCursor;
import google.registry.model.registry.RegistryCursor.CursorType;
import google.registry.rde.EscrowTaskRunner.EscrowTask;
import google.registry.rde.JSchSshSession.JSchSshSessionFactory;
import google.registry.request.Action;
@ -119,17 +119,17 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
@Override
public void runWithLock(DateTime watermark) throws Exception {
DateTime stagingCursor =
RegistryCursor.load(Registry.get(tld), CursorType.RDE_STAGING).or(START_OF_TIME);
if (!stagingCursor.isAfter(watermark)) {
logger.infofmt("tld=%s uploadCursor=%s stagingCursor=%s", tld, watermark, stagingCursor);
DateTime stagingCursorTime = getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(CursorType.RDE_STAGING, Registry.get(tld))).now());
if (!stagingCursorTime.isAfter(watermark)) {
logger.infofmt("tld=%s uploadCursor=%s stagingCursor=%s", tld, watermark, stagingCursorTime);
throw new ServiceUnavailableException("Waiting for RdeStagingAction to complete");
}
DateTime sftpCursor =
RegistryCursor.load(Registry.get(tld), CursorType.RDE_UPLOAD_SFTP).or(START_OF_TIME);
if (sftpCursor.plus(sftpCooldown).isAfter(clock.nowUtc())) {
DateTime sftpCursorTime = getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(CursorType.RDE_UPLOAD_SFTP, Registry.get(tld))).now());
if (sftpCursorTime.plus(sftpCooldown).isAfter(clock.nowUtc())) {
// Fail the task good and hard so it retries until the cooldown passes.
logger.infofmt("tld=%s cursor=%s sftpCursor=%s", tld, watermark, sftpCursor);
logger.infofmt("tld=%s cursor=%s sftpCursor=%s", tld, watermark, sftpCursorTime);
throw new ServiceUnavailableException("SFTP cooldown has not yet passed");
}
int revision = RdeRevision.getNextRevision(tld, watermark, FULL) - 1;
@ -145,8 +145,10 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
ofy().transact(new VoidWork() {
@Override
public void vrun() {
RegistryCursor.save(
Registry.get(tld), CursorType.RDE_UPLOAD_SFTP, ofy().getTransactionTime());
Cursor cursor =
Cursor.create(
CursorType.RDE_UPLOAD_SFTP, ofy().getTransactionTime(), Registry.get(tld));
ofy().save().entity(cursor).now();
}
});
response.setContentType(PLAIN_TEXT_UTF_8);