diff --git a/java/google/registry/backup/BUILD b/java/google/registry/backup/BUILD index 8310802f3..b3334d755 100644 --- a/java/google/registry/backup/BUILD +++ b/java/google/registry/backup/BUILD @@ -10,6 +10,8 @@ java_library( deps = [ "//java/google/registry/config", "//java/google/registry/cron", + "//java/google/registry/mapreduce", + "//java/google/registry/mapreduce/inputs", "//java/google/registry/model", "//java/google/registry/request", "//java/google/registry/request/auth", @@ -17,6 +19,7 @@ java_library( "//third_party/java/objectify:objectify-v4_1", "@com_google_appengine_api_1_0_sdk", "@com_google_appengine_tools_appengine_gcs_client", + "@com_google_appengine_tools_appengine_mapreduce", "@com_google_code_findbugs_jsr305", "@com_google_dagger", "@com_google_guava", diff --git a/java/google/registry/backup/DeleteOldCommitLogsAction.java b/java/google/registry/backup/DeleteOldCommitLogsAction.java index fd61bcf85..33dcb6f7f 100644 --- a/java/google/registry/backup/DeleteOldCommitLogsAction.java +++ b/java/google/registry/backup/DeleteOldCommitLogsAction.java @@ -14,27 +14,31 @@ package google.registry.backup; -import static com.google.common.collect.ImmutableList.copyOf; -import static com.google.common.collect.Iterables.concat; -import static com.google.common.collect.Iterables.transform; -import static google.registry.model.ofy.CommitLogBucket.getBucketKey; -import static google.registry.request.Action.Method.POST; +import static google.registry.model.ofy.ObjectifyService.ofy; +import static google.registry.util.FormattingLogger.getLoggerForCallerClass; +import static google.registry.util.PipelineUtils.createJobPath; -import com.google.common.base.Function; +import com.google.appengine.tools.mapreduce.Mapper; +import com.google.appengine.tools.mapreduce.Reducer; +import com.google.appengine.tools.mapreduce.ReducerInput; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import 
com.google.common.collect.Iterators; import com.googlecode.objectify.Key; import com.googlecode.objectify.Work; -import com.googlecode.objectify.cmd.Loader; -import com.googlecode.objectify.cmd.Query; import google.registry.config.RegistryConfig.Config; -import google.registry.model.ofy.CommitLogBucket; +import google.registry.mapreduce.MapreduceRunner; +import google.registry.mapreduce.inputs.CommitLogManifestInput; +import google.registry.mapreduce.inputs.EppResourceInputs; +import google.registry.model.EppResource; +import google.registry.model.ImmutableObject; import google.registry.model.ofy.CommitLogManifest; import google.registry.model.ofy.CommitLogMutation; -import google.registry.model.ofy.Ofy; +import google.registry.model.translators.CommitLogRevisionsTranslatorFactory; import google.registry.request.Action; -import google.registry.request.Parameter; +import google.registry.request.Response; import google.registry.util.Clock; import google.registry.util.FormattingLogger; -import java.util.List; import javax.inject.Inject; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -43,114 +47,124 @@ import org.joda.time.Duration; * Task that garbage collects old {@link CommitLogManifest} entities. * *

Once commit logs have been written to GCS, we don't really need them in Datastore anymore, - * except to reconstruct point-in-time snapshots of the database. But that functionality is not - * useful after a certain amount of time, e.g. thirty days. So this task runs periodically to delete - * the old data. + * except to reconstruct point-in-time snapshots of the database. To make that possible, {@link + * EppResource}s have a {@link EppResource#getRevisions} method that returns the commit logs for + * older points in time. But that functionality is not useful after a certain amount of time, e.g. + * thirty days, so unneeded revisions are deleted + * (see {@link CommitLogRevisionsTranslatorFactory}). This leaves commit logs in the system that are + * unneeded (have no revisions pointing to them). So this task runs periodically to delete the + * "orphan" commit logs. * - *

This task should be invoked in a fanout style for each {@link CommitLogBucket} ID. It then - * queries {@code CommitLogManifest} entities older than the threshold, using an ancestor query - * operating under the assumption under the assumption that the ID is the transaction timestamp in - * milliseconds since the UNIX epoch. It then deletes them inside a transaction, along with their - * associated {@link CommitLogMutation} entities. + *

This action runs a mapreduce that goes over all existing {@link EppResource} and all {@link + * CommitLogManifest} older than commitLogDatastoreRetention, and erases the commit logs that aren't + * referenced by an EppResource. * - *

If additional data is leftover, we show a warning at the INFO level, because it's not - * actionable. If anything, it just shows that the system was under high load thirty days ago, and - * therefore serves little use as an early warning to increase the number of buckets. - * - *

Before running, this task will perform an eventually consistent count query outside of a - * transaction to see how much data actually exists to delete. If it's less than a tenth of {@link - * #maxDeletes}, then we don't bother running the task. This is to minimize contention on the bucket - * and avoid wasting resources. - * - *

Dimensioning

- * - *

This entire operation operates on a single entity group, within a single transaction. Since - * there's a 10mB upper bound on transaction size and a four minute time limit, we can only delete - * so many commit logs at once. So given the above constraints, five hundred would make a safe - * default value for {@code maxDeletes}. See {@linkplain - * google.registry.config.RegistryConfig.ConfigModule#provideCommitLogMaxDeletes() - * commitLogMaxDeletes} for further documentation on this matter. - * - *

Finally, we need to pick an appropriate cron interval time for this task. Since a bucket - * represents a single Datastore entity group, it's only guaranteed to have one transaction per - * second. So we just need to divide {@code maxDeletes} by sixty to get an appropriate minute - * interval. Assuming {@code maxDeletes} is five hundred, this rounds up to ten minutes, which we'll - * double, since this task can always catch up in off-peak hours. - * - *

There's little harm in keeping the data around a little longer, since this task is engaged in - * a zero-sum resource struggle with the EPP transactions. Each transaction we perform here, is one - * less transaction that's available to EPP. Furthermore, a well-administered system should have - * enough buckets that we'll never brush up against the 1/s entity group transaction SLA. */ -@Action(path = "/_dr/task/deleteOldCommitLogs", method = POST, automaticallyPrintOk = true) +@Action(path = "/_dr/task/deleteOldCommitLogs") public final class DeleteOldCommitLogsAction implements Runnable { - private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass(); + private static final int NUM_REDUCE_SHARDS = 10; + private static final FormattingLogger logger = getLoggerForCallerClass(); + @Inject MapreduceRunner mrRunner; + @Inject Response response; @Inject Clock clock; - @Inject Ofy ofy; - @Inject @Parameter("bucket") int bucketNum; @Inject @Config("commitLogDatastoreRetention") Duration maxAge; - @Inject @Config("commitLogMaxDeletes") int maxDeletes; @Inject DeleteOldCommitLogsAction() {} @Override public void run() { - if (!doesEnoughDataExistThatThisTaskIsWorthRunning()) { - return; - } - Integer deleted = ofy.transact(new Work() { - @Override - public Integer run() { - // Load at most maxDeletes manifest keys of commit logs older than the deletion threshold. - List> manifestKeys = - queryManifests(ofy.load()) - .limit(maxDeletes) - .keys() - .list(); - // transform() is lazy so copyOf() ensures all the subqueries happen in parallel, because - // the queries are launched by iterable(), put into a list, and then the list of iterables - // is consumed and concatenated. 
- ofy.deleteWithoutBackup().keys(concat(copyOf(transform(manifestKeys, - new Function, Iterable>>() { - @Override - public Iterable> apply(Key manifestKey) { - return ofy.load() - .type(CommitLogMutation.class) - .ancestor(manifestKey) - .keys() - .iterable(); // launches the query asynchronously - }})))); - ofy.deleteWithoutBackup().keys(manifestKeys); - return manifestKeys.size(); - }}); - if (deleted == maxDeletes) { - logger.infofmt("Additional old commit logs might exist in bucket %d", bucketNum); + DateTime deletionThreshold = clock.nowUtc().minus(maxAge); + logger.infofmt( + "Processing asynchronous deletion of unreferenced CommitLogManifests older than %s", + deletionThreshold); + + response.sendJavaScriptRedirect(createJobPath(mrRunner + .setJobName("Delete old commit logs") + .setModuleName("backend") + .setDefaultReduceShards(NUM_REDUCE_SHARDS) + .runMapreduce( + new DeleteOldCommitLogsMapper(), + new DeleteOldCommitLogsReducer(), + ImmutableList.of( + new CommitLogManifestInput(Optional.of(deletionThreshold)), + EppResourceInputs.createEntityInput(EppResource.class))))); + } + + /** + * A mapper that iterates over all {@link EppResource} and {CommitLogManifest} entities. + * + *

It emits the target key and {@code false} for all revisions of each EppResource (meaning + * "don't delete this"), and {@code true} for all CommitLogManifests (meaning "delete this"). + * + *

The reducer will then delete all CommitLogRevisions that only have {@code true}. + */ + private static class DeleteOldCommitLogsMapper + extends Mapper, Boolean> { + + private static final long serialVersionUID = -1960845380164573834L; + + @Override + public void map(ImmutableObject object) { + if (object instanceof EppResource) { + getContext().incrementCounter("Epp resources found"); + EppResource eppResource = (EppResource) object; + for (Key manifestKey : eppResource.getRevisions().values()) { + emit(manifestKey, false); + } + getContext() + .incrementCounter("Epp resource revisions found", eppResource.getRevisions().size()); + } else if (object instanceof CommitLogManifest) { + getContext().incrementCounter("old commit log manifests found"); + emit(Key.create((CommitLogManifest) object), true); + } else { + throw new IllegalArgumentException( + String.format( + "Received object of type %s, expected either EppResource or CommitLogManifest", + object.getClass().getName())); + } } } - /** Returns the point in time at which commit logs older than that point will be deleted. */ - private DateTime getDeletionThreshold() { - return clock.nowUtc().minus(maxAge); - } + /** + * Reducer that deletes unreferenced {@link CommitLogManifest} + child {@link CommitLogMutation}. + * + *

It receives the manifestKey to possibly delete, and a list of boolean 'verdicts' from + * various sources (the "old manifests" source and the "still referenced" source) on whether it's + * OK to delete this manifestKey. If even one source returns "false" (meaning "it's not OK to + * delete this manifest") then it won't be deleted. + */ + private static class DeleteOldCommitLogsReducer + extends Reducer, Boolean, Void> { - private boolean doesEnoughDataExistThatThisTaskIsWorthRunning() { - int tenth = Math.max(1, maxDeletes / 10); - int count = queryManifests(ofy.load()) - .limit(tenth) - .count(); - if (0 < count && count < tenth) { - logger.infofmt("Not enough old commit logs to bother running: %d < %d", count, tenth); + private static final long serialVersionUID = 6652129589918923333L; + + @Override + public void reduce( + final Key manifestKey, + ReducerInput canDeleteVerdicts) { + if (Iterators.contains(canDeleteVerdicts, false)) { + getContext().incrementCounter("old commit log manifests still referenced"); + return; + } + Integer deletedCount = ofy().transact(new Work() { + @Override + public Integer run() { + Iterable> commitLogMutationKeys = ofy().load() + .type(CommitLogMutation.class) + .ancestor(manifestKey) + .keys() + .iterable(); + ImmutableList> keysToDelete = ImmutableList.>builder() + .addAll(commitLogMutationKeys) + .add(manifestKey) + .build(); + ofy().deleteWithoutBackup().keys(keysToDelete); + return keysToDelete.size(); + } + }); + getContext().incrementCounter("old commit log manifests deleted"); + getContext().incrementCounter("total entities deleted", deletedCount); } - return count >= tenth; - } - - private Query queryManifests(Loader loader) { - long thresholdMillis = getDeletionThreshold().getMillis(); - Key bucketKey = getBucketKey(bucketNum); - return loader - .type(CommitLogManifest.class) - .ancestor(bucketKey) - .filterKey("<", Key.create(bucketKey, CommitLogManifest.class, thresholdMillis)); } } diff --git 
a/java/google/registry/config/RegistryConfig.java b/java/google/registry/config/RegistryConfig.java index a40e91632..7b836c185 100644 --- a/java/google/registry/config/RegistryConfig.java +++ b/java/google/registry/config/RegistryConfig.java @@ -212,27 +212,6 @@ public final class RegistryConfig { return projectId + "-domain-lists"; } - /** - * Maximum number of commit logs to delete per transaction. - * - *

If we assume that the average key size is 256 bytes and that each manifest has six - * mutations, we can do about 5,000 deletes in a single transaction before hitting the 10mB - * limit. Therefore 500 should be a safe number, since it's an order of a magnitude less space - * than we need. - * - *

Transactions also have a four minute time limit. Since we have to perform N subqueries to - * fetch mutation keys, 500 would be a safe number if those queries were performed in serial, - * since each query would have about 500ms to complete, which is an order a magnitude more time - * than we need. However this does not apply, since the subqueries are performed asynchronously. - * - * @see google.registry.backup.DeleteOldCommitLogsAction - */ - @Provides - @Config("commitLogMaxDeletes") - public static int provideCommitLogMaxDeletes() { - return 500; - } - /** * Batch size for the number of transactions' worth of commit log data to process at once when * exporting a commit log diff. diff --git a/javatests/google/registry/backup/BUILD b/javatests/google/registry/backup/BUILD index c9417ca44..fb70d2e0e 100644 --- a/javatests/google/registry/backup/BUILD +++ b/javatests/google/registry/backup/BUILD @@ -13,10 +13,10 @@ java_library( resources = glob(["testdata/*"]), deps = [ "//java/google/registry/backup", - "//java/google/registry/config", "//java/google/registry/model", "//java/google/registry/util", "//javatests/google/registry/testing", + "//javatests/google/registry/testing/mapreduce", "//third_party/java/objectify:objectify-v4_1", "@com_google_appengine_api_1_0_sdk//:testonly", "@com_google_appengine_tools_appengine_gcs_client", diff --git a/javatests/google/registry/backup/DeleteOldCommitLogsActionTest.java b/javatests/google/registry/backup/DeleteOldCommitLogsActionTest.java index 1c495d474..99f26888f 100644 --- a/javatests/google/registry/backup/DeleteOldCommitLogsActionTest.java +++ b/javatests/google/registry/backup/DeleteOldCommitLogsActionTest.java @@ -15,20 +15,21 @@ package google.registry.backup; import static com.google.common.truth.Truth.assertThat; -import static google.registry.config.RegistryConfig.getCommitLogBucketCount; import static google.registry.model.ofy.ObjectifyService.ofy; -import static org.joda.time.Duration.millis; -import 
com.googlecode.objectify.VoidWork; +import com.google.common.collect.ImmutableList; +import google.registry.model.contact.ContactResource; import google.registry.model.ofy.CommitLogManifest; import google.registry.model.ofy.CommitLogMutation; import google.registry.model.ofy.Ofy; -import google.registry.model.registrar.Registrar; -import google.registry.testing.AppEngineRule; -import google.registry.testing.ExceptionRule; +import google.registry.testing.DatastoreHelper; import google.registry.testing.FakeClock; +import google.registry.testing.FakeResponse; +import google.registry.testing.InjectRule; +import google.registry.testing.mapreduce.MapreduceTestCase; import org.joda.time.DateTime; import org.joda.time.Duration; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -36,137 +37,103 @@ import org.junit.runners.JUnit4; /** Unit tests for {@link DeleteOldCommitLogsAction}. */ @RunWith(JUnit4.class) -public class DeleteOldCommitLogsActionTest { - - @Rule - public final AppEngineRule appEngine = AppEngineRule.builder() - .withDatastore() - .build(); - - @Rule - public final ExceptionRule thrown = new ExceptionRule(); +public class DeleteOldCommitLogsActionTest + extends MapreduceTestCase { private final FakeClock clock = new FakeClock(DateTime.parse("2000-01-01TZ")); - private final Ofy ofy = new Ofy(clock); + private final FakeResponse response = new FakeResponse(); + private ContactResource contact; - private void runInAllBuckets(int maxDeletes) { - for (int bucketNum = 1; bucketNum <= getCommitLogBucketCount(); bucketNum++) { - DeleteOldCommitLogsAction task = new DeleteOldCommitLogsAction(); - task.bucketNum = bucketNum; - task.clock = clock; - task.maxAge = Duration.millis(2); - task.maxDeletes = maxDeletes; - task.ofy = ofy; - task.run(); + @Rule + public final InjectRule inject = new InjectRule(); + + @Before + public void setup() throws Exception { + inject.setStaticField(Ofy.class, "clock", clock); + 
action = new DeleteOldCommitLogsAction(); + action.mrRunner = makeDefaultRunner(); + action.response = response; + action.clock = clock; + action.maxAge = Duration.standardDays(30); + + ContactResource contact = DatastoreHelper.persistActiveContact("TheRegistrar"); + clock.advanceBy(Duration.standardDays(1)); + DatastoreHelper.persistResourceWithCommitLog(contact); + + prepareData(); + } + + private void runMapreduce(Duration maxAge) throws Exception { + action.maxAge = maxAge; + action.run(); + executeTasksUntilEmpty("mapreduce"); + ofy().clearSessionCache(); + } + + private void mutateContact(String email) { + ofy().clearSessionCache(); + ContactResource contact = ofy().load() + .type(ContactResource.class) + .first() + .now() + .asBuilder() + .setEmailAddress(email) + .build(); + DatastoreHelper.persistResourceWithCommitLog(contact); + } + + private void prepareData() { + + for (int i = 0; i < 10; i++) { + clock.advanceBy(Duration.standardDays(7)); + String email = String.format("pumpkin_%d@cat.test", i); + mutateContact(email); } + ofy().clearSessionCache(); + + contact = ofy().load().type(ContactResource.class).first().now(); + + // The following value might change if {@link CommitLogRevisionsTranslatorFactory} changes. + assertThat(contact.getRevisions().size()).isEqualTo(6); + + // Before deleting the unneeded manifests - we have 11 of them (one for the first + // creation, and 10 more for the mutateContacts) + assertThat(ofy().load().type(CommitLogManifest.class).count()).isEqualTo(11); + // And each DatastoreHelper.persistResourceWithCommitLog creates 3 mutations + assertThat(ofy().load().type(CommitLogMutation.class).count()).isEqualTo(33); } + /** + * Check that with very short maxAge, only the referenced elements remain. 
+ */ @Test - public void testRun_noCommitLogs_doesNothing() throws Exception { - assertManifestAndMutationCounts(0, 0); - runInAllBuckets(4); - assertManifestAndMutationCounts(0, 0); + public void test_shortMaxAge() throws Exception { + runMapreduce(Duration.millis(1)); + + assertThat(ofy().load().type(CommitLogManifest.class).keys().iterable()) + .containsExactlyElementsIn(contact.getRevisions().values()); + + // And each DatastoreHelper.persistResourceWithCommitLog creates 3 mutations + assertThat(ofy().load().type(CommitLogMutation.class).keys().iterable()) + .hasSize(contact.getRevisions().size() * 3); } + /** + * Check that with very long maxAge, all the elements remain. + */ @Test - public void testRun_commitLogNewerThanThreshold_doesntGetDeleted() throws Exception { - createCommitLog(); - clock.advanceOneMilli(); - assertManifestAndMutationCounts(1, 2); - runInAllBuckets(4); - assertManifestAndMutationCounts(1, 2); - } + public void test_longMaxAge() throws Exception { - @Test - public void testRun_commitLogEqualToThreshold_doesntGetDeleted() throws Exception { - createCommitLog(); - clock.advanceBy(millis(2)); - runInAllBuckets(4); - assertManifestAndMutationCounts(1, 2); - } + ImmutableList initialManifests = + ImmutableList.copyOf(ofy().load().type(CommitLogManifest.class).iterable()); + ImmutableList initialMutations = + ImmutableList.copyOf(ofy().load().type(CommitLogMutation.class).iterable()); - @Test - public void testRun_commitLogOlderThanThreshold_getsDeleted() throws Exception { - createCommitLog(); - clock.advanceBy(millis(3)); - runInAllBuckets(4); - assertManifestAndMutationCounts(0, 0); - } + runMapreduce(Duration.standardDays(1000)); - @Test - public void testRun_oneOlderThanThresholdAndOneNewer_onlyOldOneIsDeleted() throws Exception { - createCommitLog(); - clock.advanceBy(millis(3)); - createCommitLog(); - assertManifestAndMutationCounts(2, 4); - runInAllBuckets(4); - assertManifestAndMutationCounts(1, 2); - } - - @Test - public void 
testRun_twoOlderThanThreshold_bothGetDeletedInSameTransaction() throws Exception { - createCommitLog(); - clock.advanceOneMilli(); - createCommitLog(); - clock.advanceBy(millis(3)); - assertManifestAndMutationCounts(2, 4); - runInAllBuckets(2); - assertManifestAndMutationCounts(0, 0); - } - - @Test - public void testRun_twoOlderThanThreshold_bothGetDeletedInTwoTransactions() throws Exception { - createCommitLog(); - clock.advanceOneMilli(); - createCommitLog(); - clock.advanceBy(millis(3)); - createCommitLog(); - assertManifestAndMutationCounts(3, 6); - runInAllBuckets(1); - runInAllBuckets(1); - assertManifestAndMutationCounts(1, 2); - } - - @Test - public void testRun_commitLogOlderButInADifferentBucket_doesntGetDeleted() throws Exception { - createCommitLog(); - clock.advanceBy(millis(31337)); - int usedBucketNum = ofy().load().type(CommitLogManifest.class).list().get(0).getBucketId(); - DeleteOldCommitLogsAction task = new DeleteOldCommitLogsAction(); - task.bucketNum = (usedBucketNum % getCommitLogBucketCount()) + 1; - task.clock = clock; - task.maxAge = Duration.millis(2); - task.maxDeletes = 20; - task.ofy = ofy; - task.run(); - assertManifestAndMutationCounts(1, 2); - } - - @Test - public void testRun_lessThanATenthOfOldData_doesntGetDeleted() throws Exception { - createCommitLog(); - clock.advanceBy(millis(2)); - runInAllBuckets(20); - assertManifestAndMutationCounts(1, 2); - } - - private void assertManifestAndMutationCounts(int manifestCount, int mutationCount) { - assertThat(ofy.load().type(CommitLogManifest.class).count()).isEqualTo(manifestCount); - assertThat(ofy.load().type(CommitLogMutation.class).count()).isEqualTo(mutationCount); - } - - private void createCommitLog() { - ofy.transact(new VoidWork() { - @Override - public void vrun() { - ofy.save().entity( - Registrar.loadByClientId("NewRegistrar").asBuilder() - .setEmailAddress("pumpkin@cat.test") - .build()); - ofy.save().entity( - Registrar.loadByClientId("TheRegistrar").asBuilder() - 
.setReferralUrl("http://justine.test") - .build()); - }}); + assertThat(ofy().load().type(CommitLogManifest.class).iterable()) + .containsExactlyElementsIn(initialManifests); + assertThat(ofy().load().type(CommitLogMutation.class).iterable()) + .containsExactlyElementsIn(initialMutations); } } diff --git a/javatests/google/registry/module/backend/testdata/backend_routing.txt b/javatests/google/registry/module/backend/testdata/backend_routing.txt index 1e9be530d..c02665e05 100644 --- a/javatests/google/registry/module/backend/testdata/backend_routing.txt +++ b/javatests/google/registry/module/backend/testdata/backend_routing.txt @@ -7,7 +7,7 @@ PATH CLASS METHOD /_dr/task/brdaCopy BrdaCopyAction POST y INTERNAL APP IGNORED /_dr/task/checkSnapshot CheckSnapshotAction POST,GET y INTERNAL APP IGNORED /_dr/task/deleteContactsAndHosts DeleteContactsAndHostsAction GET n INTERNAL APP IGNORED -/_dr/task/deleteOldCommitLogs DeleteOldCommitLogsAction POST y INTERNAL APP IGNORED +/_dr/task/deleteOldCommitLogs DeleteOldCommitLogsAction GET n INTERNAL APP IGNORED /_dr/task/deleteProberData DeleteProberDataAction POST n INTERNAL APP IGNORED /_dr/task/expandRecurringBillingEvents ExpandRecurringBillingEventsAction GET n INTERNAL APP IGNORED /_dr/task/exportCommitLogDiff ExportCommitLogDiffAction POST y INTERNAL APP IGNORED