Delete mapreduce entity cleanup util

This is obsoleted by the upcoming Registry 3.0 migration, after which we will be
using neither the App Engine Mapreduce library nor Cloud Datastore.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=212864845
Authored by mcilwain on 2018-09-13 13:09:38 -07:00; committed by Ben McIlwain
parent e19a431fab
commit 8de36732cb
7 changed files with 0 additions and 1079 deletions


@@ -1,241 +0,0 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.batch;
import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8;
import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST;
import com.google.appengine.api.datastore.DatastoreService;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
import google.registry.batch.MapreduceEntityCleanupUtil.EligibleJobResults;
import google.registry.mapreduce.MapreduceRunner;
import google.registry.request.Action;
import google.registry.request.Parameter;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import java.util.Optional;
import java.util.Set;
import javax.inject.Inject;
import org.joda.time.DateTime;
/**
* Action to delete entities associated with the App Engine Mapreduce library.
*
* <p>To delete a specific job, set the jobId parameter. To delete all jobs with a specific job name
* which are older than the specified age, set the jobName parameter. Otherwise, all jobs older than
* the specified age are deleted. Examples:
*
* <ul>
* <li>jobId=12345: delete only the root pipeline job with ID 12345, and all descendant jobs
* <li>jobName=Generate+Important+Files: delete all root pipeline jobs with the display name
* "Generate Important Files" (subject to the limits imposed by the daysOld and numJobsToDelete
* parameters), and all descendant jobs
* <li>(neither specified): delete all jobs (subject to the limits imposed by the daysOld and
* numJobsToDelete parameters)
* </ul>
*
* <p>More about display names: The pipeline library assigns each root pipeline job a "display
* name". You can see the display name of each job using the pipeline Web interface, available at
* /_ah/pipeline/list, where the display name column is confusingly labeled "Class Path". Usually,
* the display name is set to a fixed value by the mapreduce code. For instance, when a pipeline job
* is created by the {@link MapreduceRunner} class, the display name is set by the
* {@link MapreduceRunner#setJobName} method. When formulating a URL to invoke {@link
* MapreduceEntityCleanupAction}, the display name must of course be URL-encoded -- spaces are
* replaced by the plus sign, and so forth. For more information, see <a
* href="https://en.wikipedia.org/wiki/Percent-encoding">the Wikipedia article on percent
* encoding</a>.
*
* <p>The daysOld parameter specifies the minimum allowable age of a job in days for it to be
* eligible for deletion. Jobs will not be deleted if they are newer than this threshold, unless
* specifically named using the jobId parameter.
*
* <p>The numJobsToDelete parameter specifies the maximum number of jobs to delete. If this is fewer
* than would ordinarily be deleted, the jobs to be deleted are chosen arbitrarily.
*
* <p>The force parameter, if present and true, indicates that jobs should be deleted even if they
* are not in FINALIZED or STOPPED state.
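*
* <p>Example (hypothetical parameter values): to delete up to five finished jobs named "Generate
* Important Files" that are at least 30 days old, one might request:
* {@code /_dr/task/mapreduceEntityCleanup?jobName=Generate+Important+Files&daysOld=30&numJobsToDelete=5}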
*/
@Action(
path = "/_dr/task/mapreduceEntityCleanup",
auth = Auth.AUTH_INTERNAL_ONLY
)
public class MapreduceEntityCleanupAction implements Runnable {
private static final int DEFAULT_DAYS_OLD = 180;
private static final int DEFAULT_MAX_NUM_JOBS_TO_DELETE = 5;
private static final String ERROR_BOTH_JOB_ID_AND_NAME =
"Do not specify both a job ID and a job name";
private static final String ERROR_BOTH_JOB_ID_AND_NUMBER_OF_JOBS =
"Do not specify both a job ID and a number of jobs to delete";
private static final String ERROR_BOTH_JOB_ID_AND_DAYS_OLD =
"Do not specify both a job ID and a days old threshold";
private static final String ERROR_NON_POSITIVE_JOBS_TO_DELETE =
"Do not specify a non-positive integer for the number of jobs to delete";
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final Optional<String> jobId;
private final Optional<String> jobName;
private final Optional<Integer> numJobsToDelete;
private final Optional<Integer> daysOld;
private final Optional<Boolean> force;
private final MapreduceEntityCleanupUtil mapreduceEntityCleanupUtil;
private final Clock clock;
private final DatastoreService datastore;
private final Response response;
@Inject
MapreduceEntityCleanupAction(
@Parameter("jobId") Optional<String> jobId,
@Parameter("jobName") Optional<String> jobName,
@Parameter("numJobsToDelete") Optional<Integer> numJobsToDelete,
@Parameter("daysOld") Optional<Integer> daysOld,
@Parameter("force") Optional<Boolean> force,
MapreduceEntityCleanupUtil mapreduceEntityCleanupUtil,
Clock clock,
DatastoreService datastore,
Response response) {
this.jobId = jobId;
this.jobName = jobName;
this.numJobsToDelete = numJobsToDelete;
this.daysOld = daysOld;
this.force = force;
this.mapreduceEntityCleanupUtil = mapreduceEntityCleanupUtil;
this.clock = clock;
this.datastore = datastore;
this.response = response;
}
@Override
public void run() {
response.setContentType(PLAIN_TEXT_UTF_8);
if (jobId.isPresent()) {
runWithJobId();
} else {
runWithoutJobId();
}
}
private void handleBadRequest(String message) {
logger.atSevere().log(message);
response.setPayload(message);
response.setStatus(SC_BAD_REQUEST);
}
/** Delete the job with the specified job ID, checking for conflicting parameters. */
private void runWithJobId() {
if (jobName.isPresent()) {
handleBadRequest(ERROR_BOTH_JOB_ID_AND_NAME);
return;
}
if (numJobsToDelete.isPresent()) {
handleBadRequest(ERROR_BOTH_JOB_ID_AND_NUMBER_OF_JOBS);
return;
}
if (daysOld.isPresent()) {
handleBadRequest(ERROR_BOTH_JOB_ID_AND_DAYS_OLD);
return;
}
response.setPayload(requestDeletion(ImmutableSet.of(jobId.get()), true /* verbose */));
}
/**
* Delete jobs with a matching display name, or all jobs if no name is specified. Only pick jobs
* which are old enough.
*/
private void runWithoutJobId() {
if (numJobsToDelete.isPresent() && numJobsToDelete.get() <= 0) {
handleBadRequest(ERROR_NON_POSITIVE_JOBS_TO_DELETE);
return;
}
int defaultedDaysOld = daysOld.orElse(DEFAULT_DAYS_OLD);
// Only generate the detailed response payload if there aren't too many jobs involved.
boolean verbose =
numJobsToDelete.isPresent() && (numJobsToDelete.get() <= DEFAULT_MAX_NUM_JOBS_TO_DELETE);
StringBuilder payload = new StringBuilder();
// Since findEligibleJobsByJobName returns only a certain number of jobs, we must loop through
// until we find enough, requesting deletion as we go.
int numJobsProcessed = 0;
DateTime cutoffDate = clock.nowUtc().minusDays(defaultedDaysOld);
Optional<String> cursor = Optional.empty();
do {
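// Ask for only as many jobs as are still needed to reach numJobsToDelete; absent means no limit
// was requested, letting the search return up to its per-call maximum.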
Optional<Integer> numJobsToRequest =
Optional.ofNullable(
numJobsToDelete.isPresent() ? numJobsToDelete.get() - numJobsProcessed : null);
EligibleJobResults batch =
mapreduceEntityCleanupUtil.findEligibleJobsByJobName(
jobName.orElse(null), cutoffDate, numJobsToRequest, force.orElse(false), cursor);
cursor = batch.cursor();
// Individual batches can come back empty if none of the returned jobs meet the requirements
// or if all jobs have been exhausted.
if (!batch.eligibleJobs().isEmpty()) {
String payloadChunk = requestDeletion(batch.eligibleJobs(), verbose);
if (verbose) {
payload.append(payloadChunk);
}
numJobsProcessed += batch.eligibleJobs().size();
}
// Stop iterating when all jobs have been exhausted (cursor is absent) or enough have been
// processed.
} while (cursor.isPresent()
&& (!numJobsToDelete.isPresent() || (numJobsProcessed < numJobsToDelete.get())));
if (numJobsProcessed == 0) {
logger.atInfo().log(
"No eligible jobs found with name '%s' older than %d days old.",
jobName.orElse("(any)"), defaultedDaysOld);
payload.append("No eligible jobs found");
} else {
logger.atInfo().log("A total of %d job(s) processed.", numJobsProcessed);
payload.append(String.format("A total of %d job(s) processed", numJobsProcessed));
}
response.setPayload(payload.toString());
}
private String requestDeletion(Set<String> actualJobIds, boolean verbose) {
Optional<StringBuilder> payloadChunkBuilder =
verbose ? Optional.of(new StringBuilder()) : Optional.empty();
int errorCount = 0;
for (String actualJobId : actualJobIds) {
Optional<String> error =
mapreduceEntityCleanupUtil.deleteJobAsync(datastore, actualJobId, force.orElse(false));
if (error.isPresent()) {
errorCount++;
}
logger.atInfo().log("%s: %s", actualJobId, error.orElse("deletion requested"));
payloadChunkBuilder.ifPresent(
stringBuilder ->
stringBuilder.append(
String.format("%s: %s\n", actualJobId, error.orElse("deletion requested"))));
}
logger.atInfo().log(
"successfully requested async deletion of %d job(s); errors received on %d",
actualJobIds.size() - errorCount, errorCount);
if (payloadChunkBuilder.isPresent()) {
payloadChunkBuilder.get().append(String.format(
"successfully requested async deletion of %d job(s); errors received on %d\n",
actualJobIds.size() - errorCount,
errorCount));
return payloadChunkBuilder.get().toString();
} else {
return "";
}
}
}


@@ -1,210 +0,0 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.batch;
import com.google.appengine.api.datastore.BaseDatastoreService;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobServiceFactory;
import com.google.appengine.tools.pipeline.NoSuchObjectException;
import com.google.appengine.tools.pipeline.impl.PipelineManager;
import com.google.appengine.tools.pipeline.impl.model.JobRecord;
import com.google.appengine.tools.pipeline.util.Pair;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableSet;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import javax.inject.Inject;
import org.joda.time.DateTime;
/** Utilities used in mapreduce datastore entity cleanup. */
class MapreduceEntityCleanupUtil {
/** Number of jobs to fetch at a time using PipelineManager.queryRootPipelines. */
protected static final int MAX_NUMBER_OF_JOBS_PER_SEARCH = 100;
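// An MR-ShardedJob entity may exist under the bare pipeline job ID as well as under the ID
// prefixed with each mapreduce stage, depending on which steps of the mapreduce were used.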
private static final ImmutableSet<String> JOB_PREFIXES =
ImmutableSet.of("", "map-", "sort-", "merge-", "reduce-");
@Inject
MapreduceEntityCleanupUtil() {}
/** Return value from {@link #findEligibleJobsByJobName}. */
@AutoValue
abstract static class EligibleJobResults {
static EligibleJobResults create(ImmutableSet<String> jobs, Optional<String> cursor) {
return new AutoValue_MapreduceEntityCleanupUtil_EligibleJobResults(jobs, cursor);
}
abstract ImmutableSet<String> eligibleJobs();
abstract Optional<String> cursor();
}
/**
* Returns the maximum number of jobs to return per search request.
*
* <p>This method is present to allow overriding by test subclasses.
*/
protected int getMaxNumberOfJobsPerSearch() {
return MAX_NUMBER_OF_JOBS_PER_SEARCH;
}
/**
* Finds the requested number of root pipeline jobs eligible for deletion.
*
* <p>Loops through the root jobs returned by the pipeline API, searching for those with a
* matching name in an appropriate state, and older than the specified cutoff date.
*
* <p>Regardless of the setting of maxJobs, at most {@link
* #MAX_NUMBER_OF_JOBS_PER_SEARCH} jobs will be returned per call. If there might be more jobs available to
* find, a cursor will be returned, which can be used in a subsequent call to {@link
* #findEligibleJobsByJobName} to continue the search.
*
* @param jobName the desired job name; if null, all jobs are considered to match
* @param cutoffDate eligible jobs must have both startTime and endTime before cutoffDate; if
* startTime and/or endTime are null, they are considered to be old enough -- this is because
* many jobs do lack at least one of these, and we don't want such jobs to stick around
* forever and not get deleted
* @param maxJobs the maximum number of jobs to return; if absent, return all eligible jobs (see
* note above about {@link #MAX_NUMBER_OF_JOBS_PER_SEARCH})
* @param ignoreState if true, jobs will be included regardless of their state
* @param cursor if present, a cursor returned from a previous call to the method; the search will
* be picked up where it left off
* @return job IDs of the eligible jobs
*/
EligibleJobResults findEligibleJobsByJobName(
@Nullable String jobName,
DateTime cutoffDate,
Optional<Integer> maxJobs,
boolean ignoreState,
Optional<String> cursor) {
if (maxJobs.isPresent() && (maxJobs.get() <= 0)) {
return EligibleJobResults.create(ImmutableSet.of(), Optional.empty());
}
Set<String> eligibleJobs = new HashSet<>();
Pair<? extends Iterable<JobRecord>, String> pair =
PipelineManager.queryRootPipelines(jobName, cursor.orElse(null), getMaxNumberOfJobsPerSearch());
for (JobRecord jobRecord : pair.getFirst()) {
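// A job is eligible if its start and end times both precede the cutoff (a missing timestamp
// counts as old enough) and, unless ignoreState is set, it has reached FINALIZED or STOPPED.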
if (((jobRecord.getStartTime() == null)
|| jobRecord.getStartTime().before(cutoffDate.toDate()))
&& ((jobRecord.getEndTime() == null)
|| jobRecord.getEndTime().before(cutoffDate.toDate()))
&& (ignoreState
|| (jobRecord.getState() == JobRecord.State.FINALIZED)
|| (jobRecord.getState() == JobRecord.State.STOPPED))) {
eligibleJobs.add(jobRecord.getRootJobKey().getName());
if (maxJobs.isPresent() && (eligibleJobs.size() >= maxJobs.get())) {
return EligibleJobResults.create(
ImmutableSet.copyOf(eligibleJobs), Optional.empty());
}
}
}
return EligibleJobResults.create(
ImmutableSet.copyOf(eligibleJobs), Optional.ofNullable(pair.getSecond()));
}
/**
* Requests asynchronous deletion of entities associated with the specified job ID.
*
* <p>The mapreduce API is used to delete the MR-* entities, and the pipeline API is used to
* delete the main job records. No attempt is made to check whether the deletion succeeds, only
* whether it appeared to be a valid deletion request up front.
*
* @param datastore The datastore service, which can be either synchronous or asynchronous, since
* the only interaction with the database is via prepared queries
* @param jobId the root pipeline job ID to be deleted; if the jobId does not exist, the deletion
* will be apparently successful, because the underlying library routines do not complain
* @param force passed to the pipeline API, indicating whether jobs should be forcibly deleted
* even if they are not in a completed state; however, there is no force flag on the mapreduce
* API call, meaning that running jobs cannot be deleted
* @return an error string, or absent if no error was detected
*/
Optional<String> deleteJobAsync(
BaseDatastoreService datastore, String jobId, boolean force) {
// Try to delete the MR-* entities. This is always done asynchronously. A return value of false
// indicates that the job is in RUNNING state, and nothing has been done.
// TODO(mountford) check the state of all sharded jobs before deleting any
for (String mrShardedJobId : getPossibleIdsForPipelineJob(datastore, jobId)) {
if (!ShardedJobServiceFactory.getShardedJobService().cleanupJob(mrShardedJobId)) {
return Optional.of(String.format("Skipping; job %s is in running state", mrShardedJobId));
}
}
// If we are successful (meaning, MR-* entity deletion has been kicked off asynchronously),
// delete the pipeline-* entities as well.
try {
PipelineManager.deletePipelineRecords(jobId, force, true /* async */);
return Optional.empty();
} catch (NoSuchObjectException ex) {
return Optional.of("No such pipeline job");
} catch (IllegalStateException ex) {
return Optional.of("Job is not in FINALIZED or STOPPED state");
}
}
/**
* Returns the possible MR-ShardedJob IDs associated with the specified pipeline job and any child
* jobs.
*
* @param datastore The datastore service, which can be either synchronous or asynchronous, since
* the only interaction with the database is via prepared queries
* @param jobId The pipeline job ID
* @return the IDs of MR-ShardedJob entities that the Mapreduce library might have created,
* depending on which steps of the mapreduce were used
*/
private ImmutableSet<String> getPossibleIdsForPipelineJob(
BaseDatastoreService datastore, String jobId) {
return getPossibleIdsForPipelineJobRecur(datastore, jobId, new HashSet<>());
}
/**
* Called by getPossibleIdsForPipelineJob(), and by itself recursively.
*
* @param datastore The datastore service, which can be either synchronous or asynchronous, since
* the only interaction with the database is via prepared queries
* @param jobId The pipeline job ID
* @param handledJobIds The set of job IDs which have been handled so far; this is a sanity check
* to prevent an infinite loop if, for some crazy reason, the job dependency graph is cyclic
* @return the IDs of MR-ShardedJob entities that the Mapreduce library might have created,
* depending on which steps of the mapreduce were used
*/
private ImmutableSet<String> getPossibleIdsForPipelineJobRecur(
BaseDatastoreService datastore, String jobId, Set<String> handledJobIds) {
if (handledJobIds.contains(jobId)) {
return ImmutableSet.of();
}
handledJobIds.add(jobId);
JobRecord jobRecord;
try {
jobRecord = PipelineManager.getJob(jobId);
} catch (NoSuchObjectException e) {
return ImmutableSet.of();
}
ImmutableSet.Builder<String> idSetBuilder = new ImmutableSet.Builder<>();
for (String jobPrefix : JOB_PREFIXES) {
idSetBuilder.add("MR-ShardedJob", jobPrefix + jobId);
}
for (Key childKey : jobRecord.getChildKeys()) {
idSetBuilder
.addAll(getPossibleIdsForPipelineJobRecur(datastore, childKey.getName(), handledJobIds));
}
return idSetBuilder.build();
}
}


@@ -210,12 +210,6 @@
<url-pattern>/_dr/task/pollBigqueryJob</url-pattern>
</servlet-mapping>
<!-- Cleans up old mapreduce entities. -->
<servlet-mapping>
<servlet-name>backend-servlet</servlet-name>
<url-pattern>/_dr/task/mapreduceEntityCleanup</url-pattern>
</servlet-mapping>
<!-- Fans out a cron task over an adjustable range of TLDs. -->
<servlet-mapping>
<servlet-name>backend-servlet</servlet-name>


@@ -25,7 +25,6 @@ import google.registry.batch.DeleteContactsAndHostsAction;
import google.registry.batch.DeleteLoadTestDataAction;
import google.registry.batch.DeleteProberDataAction;
import google.registry.batch.ExpandRecurringBillingEventsAction;
import google.registry.batch.MapreduceEntityCleanupAction;
import google.registry.batch.RefreshDnsOnHostRenameAction;
import google.registry.batch.ResaveAllEppResourcesAction;
import google.registry.batch.ResaveEntityAction;
@@ -138,7 +137,6 @@ interface BackendRequestComponent {
IcannReportingStagingAction icannReportingStagingAction();
IcannReportingUploadAction icannReportingUploadAction();
LoadSnapshotAction loadSnapshotAction();
MapreduceEntityCleanupAction mapreduceEntityCleanupAction();
MetricsExportAction metricsExportAction();
NordnUploadAction nordnUploadAction();
NordnVerifyAction nordnVerifyAction();


@@ -1,559 +0,0 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.batch;
import static com.google.appengine.api.datastore.DatastoreServiceFactory.getDatastoreService;
import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8;
import static com.google.common.truth.Truth.assertThat;
import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import static org.joda.time.DateTimeZone.UTC;
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.FetchOptions;
import com.google.appengine.api.datastore.Query;
import com.google.appengine.tools.mapreduce.MapReduceJob;
import com.google.appengine.tools.mapreduce.MapReduceSettings;
import com.google.appengine.tools.mapreduce.MapReduceSpecification;
import com.google.appengine.tools.mapreduce.Mapper;
import com.google.appengine.tools.mapreduce.Reducer;
import com.google.appengine.tools.mapreduce.ReducerInput;
import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTaskState;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardRetryState;
import com.google.appengine.tools.mapreduce.inputs.InMemoryInput;
import com.google.appengine.tools.mapreduce.outputs.InMemoryOutput;
import com.google.appengine.tools.pipeline.JobSetting;
import com.google.appengine.tools.pipeline.PipelineService;
import com.google.appengine.tools.pipeline.PipelineServiceFactory;
import com.google.appengine.tools.pipeline.impl.model.Barrier;
import com.google.appengine.tools.pipeline.impl.model.FanoutTaskRecord;
import com.google.appengine.tools.pipeline.impl.model.JobInstanceRecord;
import com.google.appengine.tools.pipeline.impl.model.JobRecord;
import com.google.appengine.tools.pipeline.impl.model.ShardedValue;
import com.google.appengine.tools.pipeline.impl.model.Slot;
import com.google.common.collect.ImmutableList;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeResponse;
import google.registry.testing.mapreduce.MapreduceTestCase;
import java.util.List;
import java.util.Optional;
import org.joda.time.DateTime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class MapreduceEntityCleanupActionTest
extends MapreduceTestCase<MapreduceEntityCleanupAction> {
private static final DatastoreService datastore = getDatastoreService();
private static final FetchOptions FETCH_OPTIONS = FetchOptions.Builder.withChunkSize(200);
private static final String QUEUE_NAME = "mapreduce";
private final TestMapreduceEntityCleanupUtil mapreduceEntityCleanupUtil =
new TestMapreduceEntityCleanupUtil();
private final FakeClock clock = new FakeClock(DateTime.now(UTC));
private final FakeResponse response = new FakeResponse();
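// Test input for the mapreduce; the IncrementalTaskState count assertion below scales with
// inputStrings.size(), so reshaping this input changes the expected entity counts.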
private static final ImmutableList<List<String>> inputStrings = ImmutableList.of(
ImmutableList.of("a", "b", "c"),
ImmutableList.of("d", "e", "f", "g", "h"),
ImmutableList.of("i", "j", "k"),
ImmutableList.of("l"),
ImmutableList.of("m", "n"));
private static final InMemoryInput<String> input = new InMemoryInput<>(inputStrings);
private static class TestMapper extends Mapper<String, String, String> {
private static final long serialVersionUID = 8472979502634351364L;
@Override
public void map(String s) {
emit(s, s);
}
}
private static class TestReducer extends Reducer<String, String, String> {
private static final long serialVersionUID = 5344368517893904668L;
@Override
public void reduce(final String key, ReducerInput<String> values) {
while (values.hasNext()) {
emit(values.next());
}
}
}
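// Creates and starts a test mapreduce, returning the root pipeline job ID, which is the value
// the cleanup action expects in its jobId parameter.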
private static String createMapreduce(String jobName) {
MapReduceJob<String, String, String, String, List<List<String>>> mapReduceJob =
new MapReduceJob<>(
new MapReduceSpecification.Builder<String, String, String, String, List<List<String>>>()
.setJobName(jobName)
.setInput(input)
.setMapper(new TestMapper())
.setReducer(new TestReducer())
.setOutput(new InMemoryOutput<>())
.setNumReducers(2)
.build(),
new MapReduceSettings.Builder().setWorkerQueueName(QUEUE_NAME).build());
PipelineService pipelineService = PipelineServiceFactory.newPipelineService();
return pipelineService.startNewPipeline(mapReduceJob, new JobSetting.OnQueue(QUEUE_NAME));
}
private void setAnyJobAndDaysOld(int daysOld) {
setJobIdJobNameAndDaysOld(Optional.empty(), Optional.empty(), Optional.of(daysOld));
}
private void setJobId(String jobId) {
setJobIdJobNameAndDaysOld(Optional.of(jobId), Optional.empty(), Optional.empty());
}
private void setJobName(String jobName) {
setJobIdJobNameAndDaysOld(Optional.empty(), Optional.of(jobName), Optional.empty());
}
private void setJobNameAndDaysOld(String jobName, int daysOld) {
setJobIdJobNameAndDaysOld(Optional.empty(), Optional.of(jobName), Optional.of(daysOld));
}
private void setJobIdJobNameAndDaysOld(
Optional<String> jobId, Optional<String> jobName, Optional<Integer> daysOld) {
clock.setTo(DateTime.now(UTC));
action = new MapreduceEntityCleanupAction(
jobId,
jobName,
Optional.empty(), // numJobsToDelete
daysOld,
Optional.empty(), // force
mapreduceEntityCleanupUtil,
clock,
DatastoreServiceFactory.getDatastoreService(),
response);
}
/**
* Get the keys of a particular kind.
*
* <p>We really just care about the count of keys, but the exception messages are much clearer if
* we print out the complete list, rather than just the count.
*/
private static List<Entity> getKeys(String kind) {
return datastore.prepare(new Query(kind).setKeysOnly()).asList(FETCH_OPTIONS);
}
private static void assertNumMapreducesAndShardedJobs(
int numberOfMapreduces, int numberOfShardedJobEntities) {
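// These per-mapreduce entity counts are empirical, observed for the specific test mapreduce
// created by createMapreduce(); a differently shaped job would leave different counts behind.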
assertThat(getKeys(JobRecord.DATA_STORE_KIND)).hasSize(14 * numberOfMapreduces);
assertThat(getKeys(Slot.DATA_STORE_KIND)).hasSize(32 * numberOfMapreduces);
assertThat(getKeys(ShardedValue.DATA_STORE_KIND)).isEmpty();
assertThat(getKeys(Barrier.DATA_STORE_KIND)).hasSize(28 * numberOfMapreduces);
assertThat(getKeys(JobInstanceRecord.DATA_STORE_KIND)).hasSize(14 * numberOfMapreduces);
assertThat(getKeys(FanoutTaskRecord.DATA_STORE_KIND)).hasSize(18 * numberOfMapreduces);
assertThat(getKeys("MR-ShardedJob")).hasSize(numberOfShardedJobEntities);
assertThat(getKeys("MR-ShardedValue")).isEmpty();
assertThat(getKeys(ShardRetryState.Serializer.makeKey("dummy").getKind())).isEmpty();
assertThat(getKeys(IncrementalTaskState.Serializer.makeKey("dummy").getKind()))
.hasSize(numberOfMapreduces * (2 + 2 * inputStrings.size()));
}
@Test
public void testCleanup_succeeds() throws Exception {
// Create and run the mapreduce.
String jobId = createMapreduce("jobname");
executeTasksUntilEmpty(QUEUE_NAME, clock);
// The desired entities should be present.
assertNumMapreducesAndShardedJobs(1, 3);
// Now run the cleanup action.
setJobId(jobId);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo(
jobId
+ ": deletion requested\n"
+ "successfully requested async deletion of 1 job(s); errors received on 0\n");
// The entities should still be there, because everything executes asynchronously, and in fact
// there should be more pipeline entities, because the deletion tasks are there as well.
// However, the MR-* sharded entities are gone for some reason.
assertThat(getKeys(JobRecord.DATA_STORE_KIND)).hasSize(17);
assertThat(getKeys(Slot.DATA_STORE_KIND)).hasSize(38);
assertThat(getKeys(ShardedValue.DATA_STORE_KIND)).isEmpty();
assertThat(getKeys(Barrier.DATA_STORE_KIND)).hasSize(34);
assertThat(getKeys(JobInstanceRecord.DATA_STORE_KIND)).hasSize(17);
assertThat(getKeys(FanoutTaskRecord.DATA_STORE_KIND)).hasSize(21);
assertThat(getKeys("MR-ShardedJob")).isEmpty();
assertThat(getKeys("MR-ShardedValue")).isEmpty();
assertThat(getKeys(ShardRetryState.Serializer.makeKey("dummy").getKind())).isEmpty();
assertThat(getKeys(IncrementalTaskState.Serializer.makeKey("dummy").getKind()))
.hasSize(2 + 2 * inputStrings.size());
// Run the async deletion.
executeTasksUntilEmpty(QUEUE_NAME, clock);
// Everything should be gone.
assertNumMapreducesAndShardedJobs(0, 0);
}
@Test
public void testNonexistentJobName_fails() {
setJobName("nonexistent");
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("No eligible jobs found");
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
}
@Test
public void testJobNameOfRecentJob_fails() throws Exception {
createMapreduce("jobname");
executeTasksUntilEmpty(QUEUE_NAME, clock);
setJobName("jobname");
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
assertThat(response.getPayload()).isEqualTo("No eligible jobs found");
}
@Test
public void testJobNameOfRecentJob_succeedsWithDaysOldParameter() throws Exception {
createMapreduce("jobname");
executeTasksUntilEmpty(QUEUE_NAME, clock);
setJobNameAndDaysOld("jobname", 0);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("A total of 1 job(s) processed");
executeTasksUntilEmpty(QUEUE_NAME, clock);
assertNumMapreducesAndShardedJobs(0, 0);
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
}
@Test
public void testAnyJob_fails() throws Exception {
createMapreduce("jobname");
executeTasksUntilEmpty(QUEUE_NAME, clock);
setJobIdJobNameAndDaysOld(
Optional.empty(), Optional.empty(), Optional.empty());
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("No eligible jobs found");
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
}
@Test
public void testAnyJob_succeedsWithDaysOldParameter() throws Exception {
createMapreduce("jobname");
executeTasksUntilEmpty(QUEUE_NAME, clock);
setJobNameAndDaysOld("jobname", 0);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("A total of 1 job(s) processed");
executeTasksUntilEmpty(QUEUE_NAME, clock);
assertNumMapreducesAndShardedJobs(0, 0);
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
}
@Test
public void testNonexistentJobId_succeeds() throws Exception {
setJobId("nonexistent");
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo(
"nonexistent: deletion requested\n"
+ "successfully requested async deletion of 1 job(s); errors received on 0\n");
executeTasksUntilEmpty(QUEUE_NAME, clock);
assertNumMapreducesAndShardedJobs(0, 0);
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0);
}
@Test
public void testDeleteTwoJobs_succeedsWithDaysOldParameter() throws Exception {
createMapreduce("jobname1");
createMapreduce("jobname2");
executeTasksUntilEmpty(QUEUE_NAME, clock);
setAnyJobAndDaysOld(0);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("A total of 2 job(s) processed");
executeTasksUntilEmpty(QUEUE_NAME, clock);
assertNumMapreducesAndShardedJobs(0, 0);
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
}
@Test
public void testDeleteTwoJobsInTwoBatches_succeeds() throws Exception {
createMapreduce("jobname1");
createMapreduce("jobname2");
executeTasksUntilEmpty(QUEUE_NAME, clock);
setAnyJobAndDaysOld(0);
mapreduceEntityCleanupUtil.setMaxNumberOfJobsPerSearch(1);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("A total of 2 job(s) processed");
executeTasksUntilEmpty(QUEUE_NAME, clock);
assertNumMapreducesAndShardedJobs(0, 0);
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(2);
}
@Test
public void testDeleteOneOfTwoJobs_succeedsWithDaysOldParameter() throws Exception {
createMapreduce("jobname1");
createMapreduce("jobname2");
executeTasksUntilEmpty(QUEUE_NAME, clock);
clock.setTo(DateTime.now(UTC));
action = new MapreduceEntityCleanupAction(
Optional.empty(), // jobId
Optional.empty(), // jobName
Optional.of(1), // numJobsToDelete
Optional.of(0), // daysOld
Optional.empty(), // force
mapreduceEntityCleanupUtil,
clock,
DatastoreServiceFactory.getDatastoreService(),
response);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).endsWith(
": deletion requested\n"
+ "successfully requested async deletion of 1 job(s); errors received on 0\n"
+ "A total of 1 job(s) processed");
executeTasksUntilEmpty(QUEUE_NAME, clock);
assertNumMapreducesAndShardedJobs(1, 3);
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
}
@Test
public void testDeleteOneOfTwoJobsByJobName_succeedsWithDaysOldParameter() throws Exception {
createMapreduce("jobname1");
createMapreduce("jobname2");
executeTasksUntilEmpty(QUEUE_NAME, clock);
setJobNameAndDaysOld("jobname1", 0);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("A total of 1 job(s) processed");
executeTasksUntilEmpty(QUEUE_NAME, clock);
assertNumMapreducesAndShardedJobs(1, 3);
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
}
@Test
public void testDeleteOneOfTwoJobsByJobId_succeeds() throws Exception {
String jobId1 = createMapreduce("jobname1");
createMapreduce("jobname2");
executeTasksUntilEmpty(QUEUE_NAME, clock);
setJobId(jobId1);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).endsWith(
": deletion requested\n"
+ "successfully requested async deletion of 1 job(s); errors received on 0\n");
executeTasksUntilEmpty(QUEUE_NAME, clock);
assertNumMapreducesAndShardedJobs(1, 3);
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0);
}
@Test
public void testDeleteTwoJobsByJobId_succeeds() throws Exception {
String jobId1 = createMapreduce("jobname1");
String jobId2 = createMapreduce("jobname2");
executeTasksUntilEmpty(QUEUE_NAME, clock);
setJobId(jobId1);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo(
jobId1
+ ": deletion requested\n"
+ "successfully requested async deletion of 1 job(s); errors received on 0\n");
FakeResponse response2 = new FakeResponse();
clock.setTo(DateTime.now(UTC));
action = new MapreduceEntityCleanupAction(
Optional.of(jobId2), // jobId
Optional.empty(), // jobName
Optional.empty(), // numJobsToDelete
Optional.empty(), // daysOld
Optional.empty(), // force
mapreduceEntityCleanupUtil,
clock,
DatastoreServiceFactory.getDatastoreService(),
response2);
action.run();
assertThat(response2.getStatus()).isEqualTo(SC_OK);
assertThat(response2.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response2.getPayload()).isEqualTo(
jobId2
+ ": deletion requested\n"
+ "successfully requested async deletion of 1 job(s); errors received on 0\n");
executeTasksUntilEmpty(QUEUE_NAME, clock);
assertNumMapreducesAndShardedJobs(0, 0);
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0);
}
@Test
public void testDeleteOfRunningJob_fails() throws Exception {
String jobId = createMapreduce("jobname");
executeTasks(QUEUE_NAME, clock, Optional.of(10));
setJobId(jobId);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).endsWith(
": Job is not in FINALIZED or STOPPED state\n"
+ "successfully requested async deletion of 0 job(s); errors received on 1\n");
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0);
}
@Test
public void testDeleteOfRunningJob_succeedsWithForce() throws Exception {
String jobId = createMapreduce("jobname");
executeTasks(QUEUE_NAME, clock, Optional.of(10));
clock.setTo(DateTime.now(UTC));
action = new MapreduceEntityCleanupAction(
Optional.of(jobId),
Optional.empty(), // jobName
Optional.empty(), // numJobsToDelete
Optional.empty(), // daysOld
Optional.of(true), // force
mapreduceEntityCleanupUtil,
clock,
DatastoreServiceFactory.getDatastoreService(),
response);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).endsWith(
": deletion requested\n"
+ "successfully requested async deletion of 1 job(s); errors received on 0\n");
executeTasksUntilEmpty(QUEUE_NAME, clock);
assertNumMapreducesAndShardedJobs(0, 0);
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0);
}
@Test
public void testJobIdAndJobName_fails() {
setJobIdJobNameAndDaysOld(
Optional.of("jobid"), Optional.of("jobname"), Optional.empty());
action.run();
assertThat(response.getStatus()).isEqualTo(SC_BAD_REQUEST);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("Do not specify both a job ID and a job name");
assertNumMapreducesAndShardedJobs(0, 0);
}
@Test
public void testJobIdAndDaysOld_fails() {
setJobIdJobNameAndDaysOld(Optional.of("jobid"), Optional.empty(), Optional.of(0));
action.run();
assertThat(response.getStatus()).isEqualTo(SC_BAD_REQUEST);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload())
.isEqualTo("Do not specify both a job ID and a days old threshold");
assertNumMapreducesAndShardedJobs(0, 0);
}
@Test
public void testJobIdAndNumJobs_fails() {
action = new MapreduceEntityCleanupAction(
Optional.of("jobid"),
Optional.empty(), // jobName
Optional.of(1), // numJobsToDelete
Optional.empty(), // daysOld
Optional.empty(), // force
mapreduceEntityCleanupUtil,
clock,
DatastoreServiceFactory.getDatastoreService(),
response);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_BAD_REQUEST);
assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8);
assertThat(response.getPayload())
.isEqualTo("Do not specify both a job ID and a number of jobs to delete");
assertNumMapreducesAndShardedJobs(0, 0);
}
@Test
public void testDeleteZeroJobs_throwsUsageError() {
new MapreduceEntityCleanupAction(
Optional.empty(), // jobId
Optional.empty(), // jobName
Optional.of(0), // numJobsToDelete
Optional.empty(), // daysOld
Optional.empty(), // force
mapreduceEntityCleanupUtil,
clock,
DatastoreServiceFactory.getDatastoreService(),
response)
.run();
assertThat(response.getStatus()).isEqualTo(SC_BAD_REQUEST);
assertThat(response.getPayload())
.isEqualTo("Do not specify a non-positive integer for the number of jobs to delete");
}
}


@@ -1,60 +0,0 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.batch;
import java.util.Optional;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
/**
* Test harness for {@link MapreduceEntityCleanupUtil}.
*
* <p>It's somewhat like a mock, in that it records the number of calls to {@link
* #findEligibleJobsByJobName} and allows overriding the number of jobs returned per search, so
* that subsequent searches can be tested. But it's not a full mock by any means.
*/
public class TestMapreduceEntityCleanupUtil extends MapreduceEntityCleanupUtil {
private int maxNumberOfJobsPerSearch;
private int numSearches = 0;
public TestMapreduceEntityCleanupUtil() {
this.maxNumberOfJobsPerSearch = MAX_NUMBER_OF_JOBS_PER_SEARCH;
}
@Override
protected int getMaxNumberOfJobsPerSearch() {
return maxNumberOfJobsPerSearch;
}
public void setMaxNumberOfJobsPerSearch(int maxNumberOfJobsPerSearch) {
this.maxNumberOfJobsPerSearch = maxNumberOfJobsPerSearch;
}
@Override
public EligibleJobResults findEligibleJobsByJobName(
@Nullable String jobName,
DateTime cutoffDate,
Optional<Integer> maxJobs,
boolean ignoreState,
Optional<String> cursor) {
numSearches++;
return super.findEligibleJobsByJobName(jobName, cutoffDate, maxJobs, ignoreState, cursor);
}
public int getNumSearchesPerformed() {
return numSearches;
}
}


@@ -26,7 +26,6 @@ PATH CLASS METHOD
/_dr/task/importRdeHosts RdeHostImportAction GET n INTERNAL APP IGNORED
/_dr/task/linkRdeHosts RdeHostLinkAction GET n INTERNAL APP IGNORED
/_dr/task/loadSnapshot LoadSnapshotAction POST n INTERNAL APP IGNORED
/_dr/task/mapreduceEntityCleanup MapreduceEntityCleanupAction GET n INTERNAL APP IGNORED
/_dr/task/metrics MetricsExportAction POST n INTERNAL APP IGNORED
/_dr/task/nordnUpload NordnUploadAction POST y INTERNAL APP IGNORED
/_dr/task/nordnVerify NordnVerifyAction POST y INTERNAL APP IGNORED