diff --git a/java/google/registry/batch/BUILD b/java/google/registry/batch/BUILD
index 02175746f..ae92f08f2 100644
--- a/java/google/registry/batch/BUILD
+++ b/java/google/registry/batch/BUILD
@@ -25,6 +25,7 @@ java_library(
         "@com_google_appengine_api_1_0_sdk",
         "@com_google_appengine_tools_appengine_gcs_client",
         "@com_google_appengine_tools_appengine_mapreduce",
+        "@com_google_appengine_tools_appengine_pipeline",
         "@com_google_auto_factory",
         "@com_google_auto_value",
         "@com_google_code_findbugs_jsr305",
diff --git a/java/google/registry/batch/BatchModule.java b/java/google/registry/batch/BatchModule.java
index 2f67c3c26..425def5b3 100644
--- a/java/google/registry/batch/BatchModule.java
+++ b/java/google/registry/batch/BatchModule.java
@@ -14,12 +14,19 @@
 package google.registry.batch;
 
+import static google.registry.request.RequestParameters.extractOptionalBooleanParameter;
+import static google.registry.request.RequestParameters.extractOptionalIntParameter;
+import static google.registry.request.RequestParameters.extractOptionalParameter;
+
 import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import dagger.Module;
 import dagger.Provides;
 import dagger.multibindings.IntoMap;
 import dagger.multibindings.StringKey;
+import google.registry.request.Parameter;
+import javax.servlet.http.HttpServletRequest;
 
 /**
  * Dagger module for injecting common settings for batch actions.
@@ -33,4 +40,34 @@ public class BatchModule {
   static ImmutableList<TableFieldSchema> provideEntityIntegrityAlertsSchema() {
     return EntityIntegrityAlertsSchema.SCHEMA_FIELDS;
   }
+
+  @Provides
+  @Parameter("jobName")
+  static Optional<String> provideJobName(HttpServletRequest req) {
+    return extractOptionalParameter(req, "jobName");
+  }
+
+  @Provides
+  @Parameter("jobId")
+  static Optional<String> provideJobId(HttpServletRequest req) {
+    return extractOptionalParameter(req, "jobId");
+  }
+
+  @Provides
+  @Parameter("numJobsToDelete")
+  static Optional<Integer> provideNumJobsToDelete(HttpServletRequest req) {
+    return extractOptionalIntParameter(req, "numJobsToDelete");
+  }
+
+  @Provides
+  @Parameter("daysOld")
+  static Optional<Integer> provideDaysOld(HttpServletRequest req) {
+    return extractOptionalIntParameter(req, "daysOld");
+  }
+
+  @Provides
+  @Parameter("force")
+  static Optional<Boolean> provideForce(HttpServletRequest req) {
+    return extractOptionalBooleanParameter(req, "force");
+  }
 }
diff --git a/java/google/registry/batch/MapreduceEntityCleanupAction.java b/java/google/registry/batch/MapreduceEntityCleanupAction.java
new file mode 100644
index 000000000..6c8339b59
--- /dev/null
+++ b/java/google/registry/batch/MapreduceEntityCleanupAction.java
@@ -0,0 +1,236 @@
+// Copyright 2017 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.batch;
+
+import com.google.appengine.api.datastore.DatastoreService;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.net.MediaType;
+import google.registry.batch.MapreduceEntityCleanupUtil.EligibleJobResults;
+import google.registry.mapreduce.MapreduceRunner;
+import google.registry.request.Action;
+import google.registry.request.Parameter;
+import google.registry.request.Response;
+import google.registry.util.Clock;
+import google.registry.util.FormattingLogger;
+import java.util.Set;
+import javax.inject.Inject;
+import org.joda.time.DateTime;
+
+/**
+ * Action to delete entities associated with the App Engine Mapreduce library.
+ *
+ * <p>To delete a specific job, set the jobId parameter. To delete all jobs with a specific job
+ * name which are older than the specified age, set the jobName parameter. Otherwise, all jobs
+ * older than the specified age are deleted. Examples:
+ *
+ * <ul>
+ * <li>jobId=12345: delete only the root pipeline job with ID 12345, and all descendant jobs
+ * <li>jobName=Generate+Important+Files: delete all root pipeline jobs with the display name
+ *     "Generate Important Files" (subject to the limits imposed by the daysOld and
+ *     numJobsToDelete parameters), and all descendant jobs
+ * <li>(neither specified): delete all jobs (subject to the limits imposed by the daysOld and
+ *     numJobsToDelete parameters)
+ * </ul>
+ *
+ * <p>More about display names: The pipeline library assigns each root pipeline job a "display
+ * name". You can see the display name of each job using the pipeline Web interface, available at
+ * /_ah/pipeline/list, where the display name column is confusingly labeled "Class Path". Usually,
+ * the display name is set to a fixed value by the mapreduce code. For instance, when a pipeline
+ * job is created by the {@link MapreduceRunner} class, the display name is set by the
+ * {@link MapreduceRunner#setJobName} method. When formulating a URL to invoke
+ * {@link MapreduceEntityCleanupAction}, the display name must of course be URL-encoded (spaces
+ * are replaced by the plus sign, and so forth); for more information, see the
+ * <a href="https://en.wikipedia.org/wiki/Percent-encoding">Wikipedia article on percent-encoding</a>.
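+ *
+ * <p>For example, a request to delete root pipeline jobs named "Generate Important Files" that
+ * are at least 30 days old, five at a time, might look like this (parameter values here are
+ * illustrative, not defaults):
+ *
+ * <pre>
+ * /_dr/task/mapreduceEntityCleanup?jobName=Generate+Important+Files&amp;daysOld=30&amp;numJobsToDelete=5
+ * </pre>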
+ *
+ * <p>The daysOld parameter specifies the minimum allowable age of a job in days for it to be
+ * eligible for deletion. Jobs will not be deleted if they are newer than this threshold, unless
+ * specifically named using the jobId parameter.
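+ *
+ * <p>For example, daysOld=0 removes the age restriction entirely: any job in an eligible state
+ * can be deleted no matter how recently it ran.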
+ *
+ * <p>The numJobsToDelete parameter specifies the maximum number of jobs to delete. If more jobs
+ * are eligible than this maximum allows, the jobs to be deleted are chosen arbitrarily.
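+ *
+ * <p>For example, daysOld=30&amp;numJobsToDelete=10 deletes at most ten jobs, each at least 30
+ * days old; if more than ten jobs qualify, the ten to delete are chosen arbitrarily.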
+ *
+ * <p>The force parameter, if present and true, indicates that jobs should be deleted even if
+ * they are not in FINALIZED or STOPPED state.
+ */
+@Action(path = "/_dr/task/mapreduceEntityCleanup")
+public class MapreduceEntityCleanupAction implements Runnable {
+
+  private static final int DEFAULT_DAYS_OLD = 180;
+  private static final int DEFAULT_MAX_NUM_JOBS_TO_DELETE = 5;
+
+  private static final String ERROR_BOTH_JOB_ID_AND_NAME =
+      "Do not specify both a job ID and a job name";
+  private static final String ERROR_BOTH_JOB_ID_AND_NUMBER_OF_JOBS =
+      "Do not specify both a job ID and a number of jobs to delete";
+  private static final String ERROR_BOTH_JOB_ID_AND_DAYS_OLD =
+      "Do not specify both a job ID and a days old threshold";
+
+  private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
+
+  private final Optional<String> jobId;
+  private final Optional<String> jobName;
+  private final Optional<Integer> numJobsToDelete;
+  private final Optional<Integer> daysOld;
+  private final Optional<Boolean> force;
+  private final MapreduceEntityCleanupUtil mapreduceEntityCleanupUtil;
+  private final Clock clock;
+  private final DatastoreService datastore;
+  private final Response response;
+
+  @Inject
+  MapreduceEntityCleanupAction(
+      @Parameter("jobId") Optional<String> jobId,
+      @Parameter("jobName") Optional<String> jobName,
+      @Parameter("numJobsToDelete") Optional<Integer> numJobsToDelete,
+      @Parameter("daysOld") Optional<Integer> daysOld,
+      @Parameter("force") Optional<Boolean> force,
+      MapreduceEntityCleanupUtil mapreduceEntityCleanupUtil,
+      Clock clock,
+      DatastoreService datastore,
+      Response response) {
+    this.jobId = jobId;
+    this.jobName = jobName;
+    this.numJobsToDelete = numJobsToDelete;
+    this.daysOld = daysOld;
+    this.force = force;
+    this.mapreduceEntityCleanupUtil = mapreduceEntityCleanupUtil;
+    this.clock = clock;
+    this.datastore = datastore;
+    this.response = response;
+  }
+
+  @Override
+  public void run() {
+    response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
+    if (jobId.isPresent()) {
+      runWithJobId();
+    } else {
+      runWithoutJobId();
+    }
+  }
+
+  private void logSevereAndSetPayload(String message) {
+    logger.severe(message);
+    response.setPayload(message);
+  }
+
+  /** Deletes the job with the specified job ID, checking for conflicting parameters. */
+  private void runWithJobId() {
+    if (jobName.isPresent()) {
+      logSevereAndSetPayload(ERROR_BOTH_JOB_ID_AND_NAME);
+      return;
+    }
+    if (numJobsToDelete.isPresent()) {
+      logSevereAndSetPayload(ERROR_BOTH_JOB_ID_AND_NUMBER_OF_JOBS);
+      return;
+    }
+    if (daysOld.isPresent()) {
+      logSevereAndSetPayload(ERROR_BOTH_JOB_ID_AND_DAYS_OLD);
+      return;
+    }
+    response.setPayload(requestDeletion(ImmutableSet.of(jobId.get()), true /* generatePayload */));
+  }
+
+  /**
+   * Deletes jobs with a matching display name, or all jobs if no name is specified. Only picks
+   * jobs which are old enough.
+   */
+  private void runWithoutJobId() {
+    int defaultedDaysOld = daysOld.or(DEFAULT_DAYS_OLD);
+
+    // Only generate the detailed response payload if there aren't too many jobs involved.
+    boolean generatePayload =
+        numJobsToDelete.isPresent() && (numJobsToDelete.get() <= DEFAULT_MAX_NUM_JOBS_TO_DELETE);
+    Optional<StringBuilder> payloadBuilder =
+        generatePayload ? Optional.of(new StringBuilder()) : Optional.<StringBuilder>absent();
+    String defaultPayload = "done";
+
+    // Since findEligibleJobsByJobName returns only a certain number of jobs, we must loop through
+    // until we find enough, requesting deletion as we go. We also stop if we don't find anything,
+    // or if there are no more jobs to be found (because no cursor is returned).
+    int numJobsDeletedSoFar = 0;
+    boolean isFirstTime = true;
+    Optional<String> cursor = Optional.absent();
+    DateTime cutoffDate = clock.nowUtc().minusDays(defaultedDaysOld);
+    while ((isFirstTime || cursor.isPresent())
+        && (!numJobsToDelete.isPresent() || (numJobsDeletedSoFar < numJobsToDelete.get()))) {
+      isFirstTime = false;
+      EligibleJobResults eligibleJobResults =
+          mapreduceEntityCleanupUtil.findEligibleJobsByJobName(
+              jobName.orNull(), cutoffDate, numJobsToDelete, force.or(false), cursor);
+      cursor = eligibleJobResults.cursor();
+      if (eligibleJobResults.eligibleJobs().isEmpty()) {
+        logger.infofmt(
+            "No eligible job with name '%s' older than %s days old.",
+            jobName.or("(null)"), defaultedDaysOld);
+        if (generatePayload) {
+          payloadBuilder.get().append("No eligible job.");
+        }
+        defaultPayload = "No eligible job.";
+      } else {
+        String payloadChunk = requestDeletion(eligibleJobResults.eligibleJobs(), generatePayload);
+        if (generatePayload) {
+          payloadBuilder.get().append(payloadChunk);
+        }
+        numJobsDeletedSoFar += eligibleJobResults.eligibleJobs().size();
+      }
+    }
+
+    logger.infofmt("A total of %s job(s) processed", numJobsDeletedSoFar);
+    if (generatePayload) {
+      payloadBuilder
+          .get()
+          .append(String.format("A total of %d job(s) processed\n", numJobsDeletedSoFar));
+      response.setPayload(payloadBuilder.get().toString());
+    } else {
+      response.setPayload(defaultPayload);
+    }
+  }
+
+  private String requestDeletion(Set<String> actualJobIds, boolean generatePayload) {
+    Optional<StringBuilder> payloadChunkBuilder =
+        generatePayload ? Optional.of(new StringBuilder()) : Optional.<StringBuilder>absent();
+    int errorCount = 0;
+    for (String actualJobId : actualJobIds) {
+      Optional<String> error =
+          mapreduceEntityCleanupUtil.deleteJobAsync(datastore, actualJobId, force.or(false));
+      if (error.isPresent()) {
+        errorCount++;
+      }
+      logger.infofmt("%s: %s", actualJobId, error.or("deletion requested"));
+      if (payloadChunkBuilder.isPresent()) {
+        payloadChunkBuilder
+            .get()
+            .append(String.format("%s: %s\n", actualJobId, error.or("deletion requested")));
+      }
+    }
+    logger.infofmt(
+        "successfully requested async deletion of %s job(s); errors received on %s",
+        actualJobIds.size() - errorCount,
+        errorCount);
+    if (payloadChunkBuilder.isPresent()) {
+      payloadChunkBuilder.get().append(String.format(
+          "successfully requested async deletion of %d job(s); errors received on %d\n",
+          actualJobIds.size() - errorCount,
+          errorCount));
+      return payloadChunkBuilder.get().toString();
+    } else {
+      return "";
+    }
+  }
+}
diff --git a/java/google/registry/batch/MapreduceEntityCleanupUtil.java b/java/google/registry/batch/MapreduceEntityCleanupUtil.java
new file mode 100644
index 000000000..515d2b176
--- /dev/null
+++ b/java/google/registry/batch/MapreduceEntityCleanupUtil.java
@@ -0,0 +1,210 @@
+// Copyright 2017 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.batch;
+
+import com.google.appengine.api.datastore.BaseDatastoreService;
+import com.google.appengine.api.datastore.Key;
+import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobServiceFactory;
+import com.google.appengine.tools.pipeline.NoSuchObjectException;
+import com.google.appengine.tools.pipeline.impl.PipelineManager;
+import com.google.appengine.tools.pipeline.impl.model.JobRecord;
+import com.google.appengine.tools.pipeline.util.Pair;
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import org.joda.time.DateTime;
+
+/** Utilities used in mapreduce datastore entity cleanup. */
+class MapreduceEntityCleanupUtil {
+
+  /** Number of jobs to fetch at a time using PipelineManager.queryRootPipelines. */
+  protected static final int MAX_NUMBER_OF_JOBS_PER_SEARCH = 100;
+
+  private static final ImmutableSet<String> JOB_PREFIXES =
+      ImmutableSet.of("", "map-", "sort-", "merge-", "reduce-");
+
+  @Inject
+  MapreduceEntityCleanupUtil() {}
+
+  /** Return value from {@link #findEligibleJobsByJobName}. */
+  @AutoValue
+  abstract static class EligibleJobResults {
+    static EligibleJobResults create(ImmutableSet<String> jobs, Optional<String> cursor) {
+      return new AutoValue_MapreduceEntityCleanupUtil_EligibleJobResults(jobs, cursor);
+    }
+
+    abstract ImmutableSet<String> eligibleJobs();
+    abstract Optional<String> cursor();
+  }
+
+  /**
+   * Returns the maximum number of jobs to return per search request.
+   *
+   * <p>This method is present to allow overriding by test subclasses.
+   */
+  protected int getMaxNumberOfJobsPerSearch() {
+    return MAX_NUMBER_OF_JOBS_PER_SEARCH;
+  }
+
+  /**
+   * Finds the requested number of root pipeline jobs eligible for deletion.
+   *
+   * <p>Loops through the root jobs returned by the pipeline API, searching for those with a
+   * matching name in an appropriate state, and older than the specified cutoff date.
+   *
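+   * <p>For example, with the default page size of 100 (see
+   * {@link #MAX_NUMBER_OF_JOBS_PER_SEARCH}), finding 250 eligible jobs takes at least three
+   * calls, since each call examines at most 100 root jobs: the first call passes an absent
+   * cursor, and each subsequent call passes the cursor returned by the previous one.
+   *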
+   * <p>Regardless of the setting of maxJobs, a maximum of
+   * {@link #MAX_NUMBER_OF_JOBS_PER_SEARCH} jobs will be returned. If there might be more jobs
+   * available to find, a cursor will be returned, which can be used in a subsequent call to
+   * {@link #findEligibleJobsByJobName} to continue the search.
+   *
+   * @param jobName the desired job name; if null, all jobs are considered to match
+   * @param cutoffDate eligible jobs must have both startTime and endTime before cutoffDate; if
+   *     startTime and/or endTime are null, they are considered to be old enough; this is because
+   *     many jobs do lack at least one of these, and we don't want such jobs to stick around
+   *     forever and not get deleted
+   * @param maxJobs the maximum number of jobs to return; if absent, return all eligible jobs
+   *     (see note above about {@link #MAX_NUMBER_OF_JOBS_PER_SEARCH})
+   * @param ignoreState if true, jobs will be included regardless of their state
+   * @param cursor if present, a cursor returned from a previous call to this method; the search
+   *     will be picked up where it left off
+   * @return job IDs of the eligible jobs
+   */
+  EligibleJobResults findEligibleJobsByJobName(
+      @Nullable String jobName,
+      DateTime cutoffDate,
+      Optional<Integer> maxJobs,
+      boolean ignoreState,
+      Optional<String> cursor) {
+    if (maxJobs.isPresent() && (maxJobs.get() <= 0)) {
+      return EligibleJobResults.create(ImmutableSet.<String>of(), Optional.<String>absent());
+    }
+    Set<String> eligibleJobs = new HashSet<>();
+    Pair<? extends Iterable<JobRecord>, String> pair =
+        PipelineManager.queryRootPipelines(
+            jobName, cursor.orNull(), getMaxNumberOfJobsPerSearch());
+    for (JobRecord jobRecord : pair.getFirst()) {
+      if (((jobRecord.getStartTime() == null)
+              || jobRecord.getStartTime().before(cutoffDate.toDate()))
+          && ((jobRecord.getEndTime() == null)
+              || jobRecord.getEndTime().before(cutoffDate.toDate()))
+          && (ignoreState
+              || (jobRecord.getState() == JobRecord.State.FINALIZED)
+              || (jobRecord.getState() == JobRecord.State.STOPPED))) {
+        eligibleJobs.add(jobRecord.getRootJobKey().getName());
+        if (maxJobs.isPresent() && (eligibleJobs.size() >= maxJobs.get())) {
+          return EligibleJobResults.create(
+              ImmutableSet.copyOf(eligibleJobs), Optional.<String>absent());
+        }
+      }
+    }
+    return EligibleJobResults.create(
+        ImmutableSet.copyOf(eligibleJobs), Optional.fromNullable(pair.getSecond()));
+  }
+
+  /**
+   * Requests asynchronous deletion of entities associated with the specified job ID.
+   *
+   * <p>The mapreduce API is used to delete the MR-* entities, and the pipeline API is used to
+   * delete the main job records. No attempt is made to check whether the deletion succeeds, only
+   * whether it appeared to be a valid deletion request up front.
+   *
+   * @param datastore the datastore service, which can be either synchronous or asynchronous,
+   *     since the only interaction with the datastore is via prepared queries
+   * @param jobId the root pipeline job ID to be deleted; if the jobId does not exist, the
+   *     deletion will be apparently successful, because the underlying library routines do not
+   *     complain
+   * @param force passed to the pipeline API, indicating whether jobs should be forcibly deleted
+   *     even if they are not in a completed state; however, there is no force flag on the
+   *     mapreduce API call, meaning that running jobs cannot be deleted
+   * @return an error string, or absent if no error was detected
+   */
+  Optional<String> deleteJobAsync(
+      BaseDatastoreService datastore, String jobId, boolean force) {
+    // Try to delete the MR-* entities. This is always done asynchronously. A return value of
+    // false indicates that the job is in RUNNING state, and nothing has been done.
+    // TODO(mountford): check the state of all sharded jobs before deleting any
+    for (String mrShardedJobId : getPossibleIdsForPipelineJob(datastore, jobId)) {
+      if (!ShardedJobServiceFactory.getShardedJobService().cleanupJob(mrShardedJobId)) {
+        return Optional.of(String.format("Skipping; job %s is in running state", mrShardedJobId));
+      }
+    }
+
+    // If we are successful (meaning, MR-* entity deletion has been kicked off asynchronously),
+    // delete the pipeline-* entities as well.
+    try {
+      PipelineManager.deletePipelineRecords(jobId, force, true /* async */);
+      return Optional.absent();
+    } catch (NoSuchObjectException ex) {
+      return Optional.of("No such pipeline job");
+    } catch (IllegalStateException ex) {
+      return Optional.of("Job is not in FINALIZED or STOPPED state");
+    }
+  }
+
+  /**
+   * Returns the possible MR-ShardedJob IDs associated with the specified pipeline job and any
+   * child jobs.
+   *
+   * @param datastore the datastore service, which can be either synchronous or asynchronous,
+   *     since the only interaction with the datastore is via prepared queries
+   * @param jobId the pipeline job ID
+   * @return the IDs of MR-ShardedJob entities that the Mapreduce library might have created,
+   *     depending on which steps of the mapreduce were used
+   */
+  private ImmutableSet<String> getPossibleIdsForPipelineJob(
+      BaseDatastoreService datastore, String jobId) {
+    return getPossibleIdsForPipelineJobRecur(datastore, jobId, new HashSet<String>());
+  }
+
+  /**
+   * Called by {@link #getPossibleIdsForPipelineJob}, and by itself recursively.
+   *
+   * @param datastore the datastore service, which can be either synchronous or asynchronous,
+   *     since the only interaction with the datastore is via prepared queries
+   * @param jobId the pipeline job ID
+   * @param handledJobIds the set of job IDs which have been handled so far; this is a sanity
+   *     check to prevent an infinite loop if, for some crazy reason, the job dependency graph is
+   *     cyclic
+   * @return the IDs of MR-ShardedJob entities that the Mapreduce library might have created,
+   *     depending on which steps of the mapreduce were used
+   */
+  private ImmutableSet<String> getPossibleIdsForPipelineJobRecur(
+      BaseDatastoreService datastore, String jobId, Set<String> handledJobIds) {
+    if (handledJobIds.contains(jobId)) {
+      return ImmutableSet.of();
+    }
+    handledJobIds.add(jobId);
+
+    JobRecord jobRecord;
+    try {
+      jobRecord = PipelineManager.getJob(jobId);
+    } catch (NoSuchObjectException e) {
+      return ImmutableSet.of();
+    }
+
+    ImmutableSet.Builder<String> idSetBuilder = new ImmutableSet.Builder<>();
+    for (String jobPrefix : JOB_PREFIXES) {
+      idSetBuilder.add("MR-ShardedJob", jobPrefix + jobId);
+    }
+
+    for (Key childKey : jobRecord.getChildKeys()) {
+      idSetBuilder
+          .addAll(getPossibleIdsForPipelineJobRecur(datastore, childKey.getName(), handledJobIds));
+    }
+    return idSetBuilder.build();
+  }
+}
diff --git a/java/google/registry/env/common/backend/WEB-INF/web.xml b/java/google/registry/env/common/backend/WEB-INF/web.xml
index 5313c52f0..25709be7e 100644
--- a/java/google/registry/env/common/backend/WEB-INF/web.xml
+++ b/java/google/registry/env/common/backend/WEB-INF/web.xml
@@ -143,6 +143,12 @@
     <url-pattern>/_dr/task/pollBigqueryJob</url-pattern>
   </servlet-mapping>
 
+  <servlet-mapping>
+    <servlet-name>backend-servlet</servlet-name>
+    <url-pattern>/_dr/task/mapreduceEntityCleanup</url-pattern>
+  </servlet-mapping>
+
   <servlet-mapping>
     <servlet-name>backend-servlet</servlet-name>
diff --git a/java/google/registry/module/backend/BackendRequestComponent.java b/java/google/registry/module/backend/BackendRequestComponent.java
index 7677596eb..626151a88 100644
--- a/java/google/registry/module/backend/BackendRequestComponent.java
+++ b/java/google/registry/module/backend/BackendRequestComponent.java
@@ -25,6 +25,7 @@ import google.registry.batch.BatchModule;
 import google.registry.batch.DeleteContactsAndHostsAction;
 import google.registry.batch.DeleteProberDataAction;
 import google.registry.batch.ExpandRecurringBillingEventsAction;
+import google.registry.batch.MapreduceEntityCleanupAction;
 import google.registry.batch.RefreshDnsOnHostRenameAction;
 import google.registry.batch.VerifyEntityIntegrityAction;
 import google.registry.cron.CommitLogFanoutAction;
@@ -110,6 +111,7 @@ interface BackendRequestComponent {
   ExportReservedTermsAction exportReservedTermsAction();
   ExportSnapshotAction exportSnapshotAction();
   LoadSnapshotAction loadSnapshotAction();
+  MapreduceEntityCleanupAction mapreduceEntityCleanupAction();
   MetricsExportAction metricsExportAction();
   NordnUploadAction nordnUploadAction();
   NordnVerifyAction nordnVerifyAction();
diff --git a/java/google/registry/request/RequestParameters.java b/java/google/registry/request/RequestParameters.java
index 97e1b986e..023e8380c 100644
--- a/java/google/registry/request/RequestParameters.java
+++ b/java/google/registry/request/RequestParameters.java
@@ -114,6 +114,19 @@ public final class RequestParameters {
     }
   }
 
+  /**
+   * Returns the first GET or POST parameter associated with {@code name} as a boolean.
+   *
+   * @throws BadRequestException if the request parameter is present but not a valid boolean
+   */
+  public static Optional<Boolean> extractOptionalBooleanParameter(
+      HttpServletRequest req, String name) {
+    String stringParam = req.getParameter(name);
+    return isNullOrEmpty(stringParam)
+        ? Optional.<Boolean>absent()
+        : Optional.of(Boolean.valueOf(stringParam));
+  }
+
   /**
    * Returns {@code true} if parameter is present and not empty and not {@code "false"}.
    *
diff --git a/javatests/google/registry/batch/BUILD b/javatests/google/registry/batch/BUILD
index 527da5418..23eb93544 100644
--- a/javatests/google/registry/batch/BUILD
+++ b/javatests/google/registry/batch/BUILD
@@ -24,6 +24,8 @@ java_library(
         "@com_google_appengine_api_1_0_sdk//:testonly",
         "@com_google_appengine_api_stubs",
         "@com_google_appengine_tools_appengine_gcs_client",
+        "@com_google_appengine_tools_appengine_mapreduce",
+        "@com_google_appengine_tools_appengine_pipeline",
         "@com_google_code_findbugs_jsr305",
         "@com_google_dagger",
         "@com_google_guava",
@@ -40,7 +42,7 @@ java_library(
 
 GenTestRules(
     name = "GeneratedTestRules",
-    default_test_size = "medium",
+    default_test_size = "large",
     test_files = glob(["*Test.java"]),
     deps = [":batch"],
 )
diff --git a/javatests/google/registry/batch/MapreduceEntityCleanupActionTest.java b/javatests/google/registry/batch/MapreduceEntityCleanupActionTest.java
new file mode 100644
index 000000000..6afd1320f
--- /dev/null
+++ b/javatests/google/registry/batch/MapreduceEntityCleanupActionTest.java
@@ -0,0 +1,584 @@
+// Copyright 2017 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.batch;
+
+import static com.google.appengine.api.datastore.DatastoreServiceFactory.getDatastoreService;
+import static com.google.common.truth.Truth.assertThat;
+import static javax.servlet.http.HttpServletResponse.SC_OK;
+import static org.joda.time.DateTimeZone.UTC;
+
+import com.google.appengine.api.datastore.DatastoreService;
+import com.google.appengine.api.datastore.DatastoreServiceFactory;
+import com.google.appengine.api.datastore.Entity;
+import com.google.appengine.api.datastore.FetchOptions;
+import com.google.appengine.api.datastore.Query;
+import com.google.appengine.tools.mapreduce.MapReduceJob;
+import com.google.appengine.tools.mapreduce.MapReduceSettings;
+import com.google.appengine.tools.mapreduce.MapReduceSpecification;
+import com.google.appengine.tools.mapreduce.Mapper;
+import com.google.appengine.tools.mapreduce.Reducer;
+import com.google.appengine.tools.mapreduce.ReducerInput;
+import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTaskState;
+import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardRetryState;
+import com.google.appengine.tools.mapreduce.inputs.InMemoryInput;
+import com.google.appengine.tools.mapreduce.outputs.InMemoryOutput;
+import com.google.appengine.tools.pipeline.JobSetting;
+import com.google.appengine.tools.pipeline.PipelineService;
+import com.google.appengine.tools.pipeline.PipelineServiceFactory;
+import com.google.appengine.tools.pipeline.impl.model.Barrier;
+import com.google.appengine.tools.pipeline.impl.model.FanoutTaskRecord;
+import com.google.appengine.tools.pipeline.impl.model.JobInstanceRecord;
+import com.google.appengine.tools.pipeline.impl.model.JobRecord;
+import com.google.appengine.tools.pipeline.impl.model.ShardedValue;
+import com.google.appengine.tools.pipeline.impl.model.Slot;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.net.MediaType;
+import google.registry.testing.ExceptionRule;
+import google.registry.testing.FakeClock;
+import google.registry.testing.FakeResponse;
+import google.registry.testing.mapreduce.MapreduceTestCase;
+import java.util.List;
+import org.joda.time.DateTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class MapreduceEntityCleanupActionTest
+    extends MapreduceTestCase<MapreduceEntityCleanupAction> {
+
+  @Rule
+  public final ExceptionRule thrown = new ExceptionRule();
+
+  private static final DatastoreService datastore = getDatastoreService();
+  private static final FetchOptions FETCH_OPTIONS = FetchOptions.Builder.withChunkSize(200);
+  private static final String QUEUE_NAME = "mapreduce";
+
+  private final TestMapreduceEntityCleanupUtil mapreduceEntityCleanupUtil =
+      new TestMapreduceEntityCleanupUtil();
+  private final FakeClock clock = new FakeClock(DateTime.now(UTC));
+  private final FakeResponse response = new FakeResponse();
+
+  private static final ImmutableList<List<String>> inputStrings =
+      ImmutableList.<List<String>>of(
+          ImmutableList.of("a", "b", "c"),
+          ImmutableList.of("d", "e", "f", "g", "h"),
+          ImmutableList.of("i", "j", "k"),
+          ImmutableList.of("l"),
+          ImmutableList.of("m", "n"));
+
+  private static final InMemoryInput<String> input = new InMemoryInput<>(inputStrings);
+
+  private static class TestMapper extends Mapper<String, String, String> {
+    private static final long serialVersionUID = 8472979502634351364L;
+
+    @Override
+    public void map(String s) {
+      emit(s, s);
+    }
+  }
+
+  private static class TestReducer extends Reducer<String, String, String> {
+    private static final long serialVersionUID = 5344368517893904668L;
+
+    @Override
+    public void reduce(final String key, ReducerInput<String> values) {
+      while (values.hasNext()) {
+        emit(values.next());
+      }
+    }
+  }
+
+  private static String createMapreduce(String jobName) throws Exception {
+    MapReduceJob<String, String, String, String, List<List<String>>> mapReduceJob =
+        new MapReduceJob<>(
+            new MapReduceSpecification.Builder<String, String, String, String, List<List<String>>>()
+                .setJobName(jobName)
+                .setInput(input)
+                .setMapper(new TestMapper())
+                .setReducer(new TestReducer())
+                .setOutput(new InMemoryOutput<String>())
+                .setNumReducers(2)
+                .build(),
+            new MapReduceSettings.Builder().setWorkerQueueName(QUEUE_NAME).build());
+    PipelineService pipelineService = PipelineServiceFactory.newPipelineService();
+    return pipelineService.startNewPipeline(mapReduceJob, new JobSetting.OnQueue(QUEUE_NAME));
+  }
+
+  /*
+  private void setJobIdAndJobName(Optional<String> jobId, Optional<String> jobName) {
+    setJobIdJobNameAndDaysOld(jobId, jobName, Optional.<Integer>absent());
+  }
+  */
+
+  private void setAnyJob() {
+    setJobIdJobNameAndDaysOld(
+        Optional.<String>absent(), Optional.<String>absent(), Optional.<Integer>absent());
+  }
+
+  private void setAnyJobAndDaysOld(int daysOld) {
+    setJobIdJobNameAndDaysOld(
+        Optional.<String>absent(), Optional.<String>absent(), Optional.of(daysOld));
+  }
+
+  private void setJobId(String jobId) {
+    setJobIdJobNameAndDaysOld(
+        Optional.of(jobId), Optional.<String>absent(), Optional.<Integer>absent());
+  }
+
+  private void setJobName(String jobName) {
+    setJobIdJobNameAndDaysOld(
+        Optional.<String>absent(), Optional.of(jobName), Optional.<Integer>absent());
+  }
+
+  private void setJobNameAndDaysOld(String jobName, int daysOld) {
+    setJobIdJobNameAndDaysOld(
+        Optional.<String>absent(), Optional.of(jobName), Optional.of(daysOld));
+  }
+
+  private void setJobIdJobNameAndDaysOld(
+      Optional<String> jobId, Optional<String> jobName, Optional<Integer> daysOld) {
+    clock.setTo(DateTime.now(UTC));
+    action = new MapreduceEntityCleanupAction(
+        jobId,
+        jobName,
+        Optional.<Integer>absent(), // numJobsToDelete
+        daysOld,
+        Optional.<Boolean>absent(), // force
+        mapreduceEntityCleanupUtil,
+        clock,
+        DatastoreServiceFactory.getDatastoreService(),
+        response);
+  }
+
+  /**
+   * Gets the keys of a particular kind.
+   *
+   * <p>We really just care about the count of keys, but the exception messages are much clearer
+   * if we print out the complete list, rather than just the count.
+   */
+  private static List<Entity> getKeys(String kind) {
+    return datastore.prepare(new Query(kind).setKeysOnly()).asList(FETCH_OPTIONS);
+  }
+
+  private static void assertNumMapreducesAndShardedJobs(
+      int numberOfMapreduces, int numberOfShardedJobEntities) {
+    assertThat(getKeys(JobRecord.DATA_STORE_KIND)).hasSize(14 * numberOfMapreduces);
+    assertThat(getKeys(Slot.DATA_STORE_KIND)).hasSize(32 * numberOfMapreduces);
+    assertThat(getKeys(ShardedValue.DATA_STORE_KIND)).isEmpty();
+    assertThat(getKeys(Barrier.DATA_STORE_KIND)).hasSize(28 * numberOfMapreduces);
+    assertThat(getKeys(JobInstanceRecord.DATA_STORE_KIND)).hasSize(14 * numberOfMapreduces);
+    assertThat(getKeys(FanoutTaskRecord.DATA_STORE_KIND)).hasSize(18 * numberOfMapreduces);
+    assertThat(getKeys("MR-ShardedJob")).hasSize(numberOfShardedJobEntities);
+    assertThat(getKeys("MR-ShardedValue")).isEmpty();
+    assertThat(getKeys(ShardRetryState.Serializer.makeKey("dummy").getKind())).isEmpty();
+    assertThat(getKeys(IncrementalTaskState.Serializer.makeKey("dummy").getKind()))
+        .hasSize(numberOfMapreduces * (2 + 2 * inputStrings.size()));
+  }
+
+  @Test
+  public void testCleanup_succeeds() throws Exception {
+
+    // Create and run the mapreduce.
+    String jobId = createMapreduce("jobname");
+    executeTasksUntilEmpty(QUEUE_NAME, clock);
+
+    // The desired entities should be present.
+    assertNumMapreducesAndShardedJobs(1, 3);
+
+    // Now run the cleanup action.
+    setJobId(jobId);
+
+    action.run();
+
+    assertThat(response.getStatus()).isEqualTo(SC_OK);
+    assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
+    assertThat(response.getPayload()).isEqualTo(
+        jobId
+            + ": deletion requested\n"
+            + "successfully requested async deletion of 1 job(s); errors received on 0\n");
+
+    // The entities should still be there, because everything executes asynchronously, and in fact
+    // there should be more pipeline entities, because the deletion tasks are there as well.
+    // However, the MR-* sharded entities are gone for some reason.
+    assertThat(getKeys(JobRecord.DATA_STORE_KIND)).hasSize(17);
+    assertThat(getKeys(Slot.DATA_STORE_KIND)).hasSize(38);
+    assertThat(getKeys(ShardedValue.DATA_STORE_KIND)).isEmpty();
+    assertThat(getKeys(Barrier.DATA_STORE_KIND)).hasSize(34);
+    assertThat(getKeys(JobInstanceRecord.DATA_STORE_KIND)).hasSize(17);
+    assertThat(getKeys(FanoutTaskRecord.DATA_STORE_KIND)).hasSize(21);
+    assertThat(getKeys("MR-ShardedJob")).isEmpty();
+    assertThat(getKeys("MR-ShardedValue")).isEmpty();
+    assertThat(getKeys(ShardRetryState.Serializer.makeKey("dummy").getKind())).isEmpty();
+    assertThat(getKeys(IncrementalTaskState.Serializer.makeKey("dummy").getKind()))
+        .hasSize(2 + 2 * inputStrings.size());
+
+    // Run the async deletion.
+    executeTasksUntilEmpty(QUEUE_NAME, clock);
+
+    // Everything should be gone.
+ assertNumMapreducesAndShardedJobs(0, 0); + } + + @Test + public void testNonexistentJobName_fails() throws Exception { + setJobName("nonexistent"); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo("No eligible job."); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); + } + + @Test + public void testJobNameOfRecentJob_fails() throws Exception { + createMapreduce("jobname"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + setJobName("jobname"); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); + assertThat(response.getPayload()).isEqualTo("No eligible job."); + } + + @Test + public void testJobNameOfRecentJob_succeedsWithDaysOldParameter() throws Exception { + createMapreduce("jobname"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + setJobNameAndDaysOld("jobname", 0); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo("done"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + assertNumMapreducesAndShardedJobs(0, 0); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); + } + + @Test + public void testDeleteZeroJobs_succeeds() throws Exception { + createMapreduce("jobname"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + action = new MapreduceEntityCleanupAction( + Optional.absent(), // jobId + Optional.absent(), // jobName + Optional.of(0), // numJobsToDelete + Optional.absent(), // daysOld + Optional.absent(), // force + mapreduceEntityCleanupUtil, + clock, + DatastoreServiceFactory.getDatastoreService(), + response); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo("A total of 0 job(s) processed\n"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + assertNumMapreducesAndShardedJobs(1, 3); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0); + } + + @Test + public void testAnyJob_fails() throws Exception { + createMapreduce("jobname"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + setAnyJob(); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo("No eligible job."); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); + } + + @Test + public void testAnyJob_succeedsWithDaysOldParameter() throws Exception { + createMapreduce("jobname"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + setJobNameAndDaysOld("jobname", 0); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo("done"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + assertNumMapreducesAndShardedJobs(0, 0); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); + } + + @Test + public void testNonexistentJobId_succeeds() throws Exception { + setJobId("nonexistent"); + + action.run(); + + 
assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo( + "nonexistent: deletion requested\n" + + "successfully requested async deletion of 1 job(s); errors received on 0\n"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + assertNumMapreducesAndShardedJobs(0, 0); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0); + } + + @Test + public void testDeleteTwoJobs_succeedsWithDaysOldParameter() throws Exception { + createMapreduce("jobname1"); + createMapreduce("jobname2"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + setAnyJobAndDaysOld(0); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo("done"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + assertNumMapreducesAndShardedJobs(0, 0); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); + } + + @Test + public void testDeleteTwoJobsInTwoBatches_succeeds() throws Exception { + createMapreduce("jobname1"); + createMapreduce("jobname2"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + setAnyJobAndDaysOld(0); + mapreduceEntityCleanupUtil.setMaxNumberOfJobsPerSearch(1); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo("done"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + assertNumMapreducesAndShardedJobs(0, 0); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(2); + } + + @Test + public void testDeleteOneOfTwoJobs_succeedsWithDaysOldParameter() throws Exception { + createMapreduce("jobname1"); + createMapreduce("jobname2"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + clock.setTo(DateTime.now(UTC)); + action = new MapreduceEntityCleanupAction( + Optional.absent(), // jobId + Optional.absent(), // jobName + Optional.of(1), // numJobsToDelete + Optional.of(0), // daysOld + Optional.absent(), // force + mapreduceEntityCleanupUtil, + clock, + DatastoreServiceFactory.getDatastoreService(), + response); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).endsWith( + ": deletion requested\n" + + "successfully requested async deletion of 1 job(s); errors received on 0\n" + + "A total of 1 job(s) processed\n"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + assertNumMapreducesAndShardedJobs(1, 3); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); + } + + @Test + public void testDeleteOneOfTwoJobsByJobName_succeedsWithDaysOldParameter() throws Exception { + createMapreduce("jobname1"); + createMapreduce("jobname2"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + setJobNameAndDaysOld("jobname1", 0); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo("done"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + assertNumMapreducesAndShardedJobs(1, 3); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); + } + + @Test + public void testDeleteOneOfTwoJobsByJobId_succeeds() throws Exception { + String jobId1 = 
createMapreduce("jobname1"); + createMapreduce("jobname2"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + setJobId(jobId1); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).endsWith( + ": deletion requested\n" + + "successfully requested async deletion of 1 job(s); errors received on 0\n"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + assertNumMapreducesAndShardedJobs(1, 3); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0); + } + + @Test + public void testDeleteTwoJobsByJobId_succeeds() throws Exception { + String jobId1 = createMapreduce("jobname1"); + String jobId2 = createMapreduce("jobname2"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + setJobId(jobId1); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo( + jobId1 + + ": deletion requested\n" + + "successfully requested async deletion of 1 job(s); errors received on 0\n"); + + FakeResponse response2 = new FakeResponse(); + clock.setTo(DateTime.now(UTC)); + action = new MapreduceEntityCleanupAction( + Optional.of(jobId2), // jobId + Optional.absent(), // jobName + Optional.absent(), // numJobsToDelete + Optional.absent(), // daysOld + Optional.absent(), // force + mapreduceEntityCleanupUtil, + clock, + DatastoreServiceFactory.getDatastoreService(), + response2); + + action.run(); + + assertThat(response2.getStatus()).isEqualTo(SC_OK); + assertThat(response2.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response2.getPayload()).isEqualTo( + jobId2 + + ": deletion requested\n" + + "successfully requested async deletion of 1 job(s); errors received on 0\n"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + assertNumMapreducesAndShardedJobs(0, 0); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0); + } + + @Test + public void testDeleteOfRunningJob_fails() throws Exception { + String jobId = createMapreduce("jobname"); + executeTasks(QUEUE_NAME, clock, Optional.of(10)); + setJobId(jobId); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).endsWith( + ": Job is not in FINALIZED or STOPPED state\n" + + "successfully requested async deletion of 0 job(s); errors received on 1\n"); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0); + } + + @Test + public void testDeleteOfRunningJob_succeedsWithForce() throws Exception { + String jobId = createMapreduce("jobname"); + executeTasks(QUEUE_NAME, clock, Optional.of(10)); + clock.setTo(DateTime.now(UTC)); + action = new MapreduceEntityCleanupAction( + Optional.of(jobId), + Optional.absent(), // jobName + Optional.absent(), // numJobsToDelete + Optional.absent(), // daysOld + Optional.of(true), // force + mapreduceEntityCleanupUtil, + clock, + DatastoreServiceFactory.getDatastoreService(), + response); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).endsWith( + ": deletion requested\n" + + "successfully requested async deletion of 1 job(s); errors received on 0\n"); + executeTasksUntilEmpty(QUEUE_NAME, clock); + 
assertNumMapreducesAndShardedJobs(0, 0); + assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0); + } + + @Test + public void testJobIdAndJobName_fails() throws Exception { + setJobIdJobNameAndDaysOld( + Optional.of("jobid"), Optional.of("jobname"), Optional.absent()); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).isEqualTo("Do not specify both a job ID and a job name"); + assertNumMapreducesAndShardedJobs(0, 0); + } + + @Test + public void testJobIdAndDaysOld_fails() throws Exception { + setJobIdJobNameAndDaysOld(Optional.of("jobid"), Optional.absent(), Optional.of(0)); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()) + .isEqualTo("Do not specify both a job ID and a days old threshold"); + assertNumMapreducesAndShardedJobs(0, 0); + } + + @Test + public void testJobIdAndNumJobs_fails() throws Exception { + clock.setTo(DateTime.now(UTC)); + action = new MapreduceEntityCleanupAction( + Optional.of("jobid"), + Optional.absent(), // jobName + Optional.of(1), // numJobsToDelete + Optional.absent(), // daysOld + Optional.absent(), // force + mapreduceEntityCleanupUtil, + clock, + DatastoreServiceFactory.getDatastoreService(), + response); + + action.run(); + + assertThat(response.getStatus()).isEqualTo(SC_OK); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()) + .isEqualTo("Do not specify both a job ID and a number of jobs to delete"); + assertNumMapreducesAndShardedJobs(0, 0); + } +} diff --git a/javatests/google/registry/batch/TestMapreduceEntityCleanupUtil.java b/javatests/google/registry/batch/TestMapreduceEntityCleanupUtil.java new file mode 100644 index 000000000..fb8fd83da --- /dev/null +++ b/javatests/google/registry/batch/TestMapreduceEntityCleanupUtil.java @@ -0,0 +1,60 @@ +// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// +//Licensed under the Apache License, Version 2.0 (the "License"); +//you may not use this file except in compliance with the License. +//You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, software +//distributed under the License is distributed on an "AS IS" BASIS, +//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//See the License for the specific language governing permissions and +//limitations under the License. + +package google.registry.batch; + +import com.google.common.base.Optional; +import javax.annotation.Nullable; +import org.joda.time.DateTime; + +/** + * Test harness for {@link MapreduceEntityCleanupUtil}. + * + *
+ * <p>It's somewhat like a mock, in that it records the number of calls to
+ * {@link #findEligibleJobsByJobName}, and it allows overriding of the number of jobs returned
+ * per search, to allow testing of subsequent searches. But it's not a full mock by any means.
+ */
+public class TestMapreduceEntityCleanupUtil extends MapreduceEntityCleanupUtil {
+
+  private int maxNumberOfJobsPerSearch;
+  private int numSearches = 0;
+
+  public TestMapreduceEntityCleanupUtil() {
+    this.maxNumberOfJobsPerSearch = MAX_NUMBER_OF_JOBS_PER_SEARCH;
+  }
+
+  @Override
+  protected int getMaxNumberOfJobsPerSearch() {
+    return maxNumberOfJobsPerSearch;
+  }
+
+  public void setMaxNumberOfJobsPerSearch(int maxNumberOfJobsPerSearch) {
+    this.maxNumberOfJobsPerSearch = maxNumberOfJobsPerSearch;
+  }
+
+  @Override
+  public EligibleJobResults findEligibleJobsByJobName(
+      @Nullable String jobName,
+      DateTime cutoffDate,
+      Optional<Integer> maxJobs,
+      boolean ignoreState,
+      Optional<String> cursor) {
+    numSearches++;
+    return super.findEligibleJobsByJobName(jobName, cutoffDate, maxJobs, ignoreState, cursor);
+  }
+
+  public int getNumSearchesPerformed() {
+    return numSearches;
+  }
+}
diff --git a/javatests/google/registry/testing/mapreduce/MapreduceTestCase.java b/javatests/google/registry/testing/mapreduce/MapreduceTestCase.java
index fd72d3809..2f8868bce 100644
--- a/javatests/google/registry/testing/mapreduce/MapreduceTestCase.java
+++ b/javatests/google/registry/testing/mapreduce/MapreduceTestCase.java
@@ -115,6 +115,8 @@ public abstract class MapreduceTestCase<T> extends ShardableTestCase {
     String pathInfo = taskStateInfo.getUrl();
     if (pathInfo.startsWith("/_dr/mapreduce/")) {
       pathInfo = pathInfo.replace("/_dr/mapreduce", "");
+    } else if (pathInfo.startsWith("/mapreduce/")) {
+      pathInfo = pathInfo.replace("/mapreduce", "");
     } else if (pathInfo.startsWith("/")) {
       pathInfo = pathInfo.replace("/_ah/", "");
       pathInfo = pathInfo.substring(pathInfo.indexOf('/'));
@@ -176,7 +178,23 @@ public abstract class MapreduceTestCase<T> extends ShardableTestCase {
    */
   protected void executeTasksUntilEmpty(String queueName, @Nullable FakeClock clock)
       throws Exception {
-    while (true) {
+    executeTasks(queueName, clock, Optional.<Integer>absent());
+  }
+
+  /**
+   * Executes mapreduce tasks, incrementing the clock between each task.
+   *
+   * <p>Incrementing the clock between tasks is important if tasks have transactions inside the
+   * mapper or reducer, which don't have access to the fake clock.
+   *
+   * <p>The maxTasks parameter determines how many tasks (at most) will be run. If maxTasks is
+   * absent(), all tasks are run until the queue is empty. If maxTasks is zero, no tasks are run.
+   */
+  protected void executeTasks(
+      String queueName, @Nullable FakeClock clock, Optional<Integer> maxTasks) throws Exception {
+    for (int numTasksDeleted = 0;
+        !maxTasks.isPresent() || (numTasksDeleted < maxTasks.get());
+        numTasksDeleted++) {
       ofy().clearSessionCache();
       // We have to re-acquire task list every time, because local implementation returns a copy.
       List<TaskStateInfo> taskInfo =