// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.backup;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static google.registry.mapreduce.MapreduceRunner.PARAM_DRY_RUN;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.util.FormattingLogger.getLoggerForCallerClass;
import static google.registry.util.PipelineUtils.createJobPath;
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import com.google.appengine.tools.mapreduce.Mapper;
import com.google.appengine.tools.mapreduce.Reducer;
import com.google.appengine.tools.mapreduce.ReducerInput;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultiset;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.Work;
import google.registry.config.RegistryConfig.Config;
import google.registry.mapreduce.MapreduceRunner;
import google.registry.mapreduce.inputs.CommitLogManifestInput;
import google.registry.mapreduce.inputs.EppResourceInputs;
import google.registry.model.EppResource;
import google.registry.model.ofy.CommitLogManifest;
import google.registry.model.ofy.CommitLogMutation;
import google.registry.model.translators.CommitLogRevisionsTranslatorFactory;
import google.registry.request.Action;
import google.registry.request.Parameter;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.FormattingLogger;
import javax.inject.Inject;
import org.joda.time.DateTime;
import org.joda.time.Duration;
/**
* Task that garbage collects old {@link CommitLogManifest} entities.
*
*
Once commit logs have been written to GCS, we don't really need them in Datastore anymore,
* except to reconstruct point-in-time snapshots of the database. To make that possible, {@link
* EppResource}s have a {@link EppResource#getRevisions} method that returns the commit logs for
* older points in time. But that functionality is not useful after a certain amount of time, e.g.
* thirty days, so unneeded revisions are deleted
* (see {@link CommitLogRevisionsTranslatorFactory}). This leaves commit logs in the system that are
* unneeded (have no revisions pointing to them). So this task runs periodically to delete the
* "orphan" commit logs.
*
*
This action runs a mapreduce that goes over all existing {@link EppResource} and all {@link
* CommitLogManifest} older than commitLogDatastreRetention, and erases the commit logs aren't in an
* EppResource.
*
*/
@Action(
path = "/_dr/task/deleteOldCommitLogs",
auth = Auth.AUTH_INTERNAL_ONLY
)
public final class DeleteOldCommitLogsAction implements Runnable {
private static final int NUM_MAP_SHARDS = 20;
private static final int NUM_REDUCE_SHARDS = 10;
private static final FormattingLogger logger = getLoggerForCallerClass();
@Inject MapreduceRunner mrRunner;
@Inject Response response;
@Inject Clock clock;
@Inject @Config("commitLogDatastoreRetention") Duration maxAge;
@Inject @Parameter(PARAM_DRY_RUN) boolean isDryRun;
@Inject DeleteOldCommitLogsAction() {}
@Override
public void run() {
DateTime deletionThreshold = clock.nowUtc().minus(maxAge);
logger.infofmt(
"Processing asynchronous deletion of unreferenced CommitLogManifests older than %s",
deletionThreshold);
response.sendJavaScriptRedirect(createJobPath(mrRunner
.setJobName("Delete old commit logs")
.setModuleName("backend")
.setDefaultMapShards(NUM_MAP_SHARDS)
.setDefaultReduceShards(NUM_REDUCE_SHARDS)
.runMapreduce(
new DeleteOldCommitLogsMapper(),
new DeleteOldCommitLogsReducer(deletionThreshold, isDryRun),
ImmutableList.of(
new CommitLogManifestInput(deletionThreshold),
EppResourceInputs.createKeyInput(EppResource.class)))));
}
/**
* A mapper that iterates over all {@link EppResource} and {CommitLogManifest} entities.
*
*
It emits the target key and {@code false} for all revisions of each EppResources (meaning
* "don't delete this"), and {@code true} for all CommitLogRevisions (meaning "delete this").
*
*
The reducer will then delete all CommitLogRevisions that only have {@code true}.
*/
private static class DeleteOldCommitLogsMapper
extends Mapper, Key, Boolean> {
private static final long serialVersionUID = -1960845380164573834L;
private static final String KIND_MANIFEST = Key.getKind(CommitLogManifest.class);
@Override
public void map(final Key> key) {
// key is either a Key or a Key extends EppResource>.
//
// If it's a CommitLogManifest we just emit it as is (no need to load it).
if (key.getKind().equals(KIND_MANIFEST)) {
getContext().incrementCounter("old commit log manifests found");
// safe because we checked getKind
@SuppressWarnings("unchecked")
Key manifestKey = (Key) key;
emit(manifestKey, true);
return;
}
// If it isn't a Key then it should be an EppResource, which we need to
// load to emit the revisions.
//
// We want to make sure we retry any load individually to reduce the chance of the entire
// shard failing, hence we wrap it in a transactNew.
Object object = ofy().transactNew(new Work