diff --git a/java/google/registry/batch/MapreduceEntityCleanupAction.java b/java/google/registry/batch/MapreduceEntityCleanupAction.java deleted file mode 100644 index 0ffe19feb..000000000 --- a/java/google/registry/batch/MapreduceEntityCleanupAction.java +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright 2017 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.batch; - -import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8; -import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST; - -import com.google.appengine.api.datastore.DatastoreService; -import com.google.common.collect.ImmutableSet; -import com.google.common.flogger.FluentLogger; -import google.registry.batch.MapreduceEntityCleanupUtil.EligibleJobResults; -import google.registry.mapreduce.MapreduceRunner; -import google.registry.request.Action; -import google.registry.request.Parameter; -import google.registry.request.Response; -import google.registry.request.auth.Auth; -import google.registry.util.Clock; -import java.util.Optional; -import java.util.Set; -import javax.inject.Inject; -import org.joda.time.DateTime; - -/** - * Action to delete entities associated with the App Engine Mapreduce library. - * - *

To delete a specific job, set the jobId parameter. To delete all jobs with a specific job name - * which are older than the specified age, set the jobName parameter. Otherwise, all jobs older than - * the specified age are deleted. Examples: - * - *

- * - *

More about display names: The pipeline library assigns each root pipeline job a "display - * name". You can see the display name of each job using the pipeline Web interface, available at - * /_ah/pipeline/list, where the display name column is confusingly labeled "Class Path". Usually, - * the display name is set to a fixed value by the mapreduce code. For instance, when a pipeline job - * is created by the {@link MapreduceRunner} class, the display name is set by the - * {@link MapreduceRunner#setJobName} method. When formulating a URL to invoke {@link - * MapreduceEntityCleanupAction}, the display name must of course be URL-encoded -- spaces are - * replaced by the plus sign, and so forth. For more information, see the Wikipedia article on percent - * encoding. - * - *

The daysOld parameter specifies the minimum allowable age of a job in days for it to be - * eligible for deletion. Jobs will not be deleted if they are newer than this threshold, unless - * specifically named using the jobId parameter. - * - *

The numJobsToDelete parameter specifies the maximum number of jobs to delete. If this is fewer - * than would ordinarily be deleted, the jobs to be deleted are chosen arbitrarily. - * - *

The force parameter, if present and true, indicates that jobs should be deleted even if they - * are not in FINALIZED or STOPPED state. - */ - -@Action( - path = "/_dr/task/mapreduceEntityCleanup", - auth = Auth.AUTH_INTERNAL_ONLY -) -public class MapreduceEntityCleanupAction implements Runnable { - - private static final int DEFAULT_DAYS_OLD = 180; - private static final int DEFAULT_MAX_NUM_JOBS_TO_DELETE = 5; - - private static final String ERROR_BOTH_JOB_ID_AND_NAME = - "Do not specify both a job ID and a job name"; - private static final String ERROR_BOTH_JOB_ID_AND_NUMBER_OF_JOBS = - "Do not specify both a job ID and a number of jobs to delete"; - private static final String ERROR_BOTH_JOB_ID_AND_DAYS_OLD = - "Do not specify both a job ID and a days old threshold"; - private static final String ERROR_NON_POSITIVE_JOBS_TO_DELETE = - "Do not specify a non-positive integer for the number of jobs to delete"; - - private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - - private final Optional jobId; - private final Optional jobName; - private final Optional numJobsToDelete; - private final Optional daysOld; - private final Optional force; - private final MapreduceEntityCleanupUtil mapreduceEntityCleanupUtil; - private final Clock clock; - private final DatastoreService datastore; - private final Response response; - - @Inject - MapreduceEntityCleanupAction( - @Parameter("jobId") Optional jobId, - @Parameter("jobName") Optional jobName, - @Parameter("numJobsToDelete") Optional numJobsToDelete, - @Parameter("daysOld") Optional daysOld, - @Parameter("force") Optional force, - MapreduceEntityCleanupUtil mapreduceEntityCleanupUtil, - Clock clock, - DatastoreService datastore, - Response response) { - this.jobId = jobId; - this.jobName = jobName; - this.numJobsToDelete = numJobsToDelete; - this.daysOld = daysOld; - this.force = force; - this.mapreduceEntityCleanupUtil = mapreduceEntityCleanupUtil; - this.clock = clock; - this.datastore = datastore; - this.response = response; - } - - @Override - public void run() { - response.setContentType(PLAIN_TEXT_UTF_8); - if (jobId.isPresent()) { - runWithJobId(); - } else { - runWithoutJobId(); - } - } - - private void handleBadRequest(String message) { - logger.atSevere().log(message); - response.setPayload(message); - response.setStatus(SC_BAD_REQUEST); - } - - /** Delete the job with the specified job ID, checking for conflicting parameters. */ - private void runWithJobId() { - if (jobName.isPresent()) { - handleBadRequest(ERROR_BOTH_JOB_ID_AND_NAME); - return; - } - if (numJobsToDelete.isPresent()) { - handleBadRequest(ERROR_BOTH_JOB_ID_AND_NUMBER_OF_JOBS); - return; - } - if (daysOld.isPresent()) { - handleBadRequest(ERROR_BOTH_JOB_ID_AND_DAYS_OLD); - return; - } - response.setPayload(requestDeletion(ImmutableSet.of(jobId.get()), true /* verbose */)); - } - - /** - * Delete jobs with a matching display name, or all jobs if no name is specified. Only pick jobs - * which are old enough. - */ - private void runWithoutJobId() { - if (numJobsToDelete.isPresent() && numJobsToDelete.get() <= 0) { - handleBadRequest(ERROR_NON_POSITIVE_JOBS_TO_DELETE); - return; - } - int defaultedDaysOld = daysOld.orElse(DEFAULT_DAYS_OLD); - // Only generate the detailed response payload if there aren't too many jobs involved. - boolean verbose = - numJobsToDelete.isPresent() && (numJobsToDelete.get() <= DEFAULT_MAX_NUM_JOBS_TO_DELETE); - StringBuilder payload = new StringBuilder(); - - // Since findEligibleJobsByJobName returns only a certain number of jobs, we must loop through - // until we find enough, requesting deletion as we go. - int numJobsProcessed = 0; - DateTime cutoffDate = clock.nowUtc().minusDays(defaultedDaysOld); - Optional cursor = Optional.empty(); - do { - Optional numJobsToRequest = - Optional.ofNullable( - numJobsToDelete.isPresent() ? numJobsToDelete.get() - numJobsProcessed : null); - EligibleJobResults batch = - mapreduceEntityCleanupUtil.findEligibleJobsByJobName( - jobName.orElse(null), cutoffDate, numJobsToRequest, force.orElse(false), cursor); - cursor = batch.cursor(); - // Individual batches can come back empty if none of the returned jobs meet the requirements - // or if all jobs have been exhausted. - if (!batch.eligibleJobs().isEmpty()) { - String payloadChunk = requestDeletion(batch.eligibleJobs(), verbose); - if (verbose) { - payload.append(payloadChunk); - } - numJobsProcessed += batch.eligibleJobs().size(); - } - // Stop iterating when all jobs have been exhausted (cursor is absent) or enough have been - // processed. - } while (cursor.isPresent() - && (!numJobsToDelete.isPresent() || (numJobsProcessed < numJobsToDelete.get()))); - - if (numJobsProcessed == 0) { - logger.atInfo().log( - "No eligible jobs found with name '%s' older than %d days old.", - jobName.orElse("(any)"), defaultedDaysOld); - payload.append("No eligible jobs found"); - } else { - logger.atInfo().log("A total of %d job(s) processed.", numJobsProcessed); - payload.append(String.format("A total of %d job(s) processed", numJobsProcessed)); - } - response.setPayload(payload.toString()); - } - - private String requestDeletion(Set actualJobIds, boolean verbose) { - Optional payloadChunkBuilder = - verbose ? Optional.of(new StringBuilder()) : Optional.empty(); - int errorCount = 0; - for (String actualJobId : actualJobIds) { - Optional error = - mapreduceEntityCleanupUtil.deleteJobAsync(datastore, actualJobId, force.orElse(false)); - if (error.isPresent()) { - errorCount++; - } - logger.atInfo().log("%s: %s", actualJobId, error.orElse("deletion requested")); - payloadChunkBuilder.ifPresent( - stringBuilder -> - stringBuilder.append( - String.format("%s: %s\n", actualJobId, error.orElse("deletion requested")))); - } - logger.atInfo().log( - "successfully requested async deletion of %d job(s); errors received on %d", - actualJobIds.size() - errorCount, errorCount); - if (payloadChunkBuilder.isPresent()) { - payloadChunkBuilder.get().append(String.format( - "successfully requested async deletion of %d job(s); errors received on %d\n", - actualJobIds.size() - errorCount, - errorCount)); - return payloadChunkBuilder.get().toString(); - } else { - return ""; - } - } -} diff --git a/java/google/registry/batch/MapreduceEntityCleanupUtil.java b/java/google/registry/batch/MapreduceEntityCleanupUtil.java deleted file mode 100644 index 6c003e241..000000000 --- a/java/google/registry/batch/MapreduceEntityCleanupUtil.java +++ /dev/null @@ -1,210 +0,0 @@ -// Copyright 2017 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.batch; - -import com.google.appengine.api.datastore.BaseDatastoreService; -import com.google.appengine.api.datastore.Key; -import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobServiceFactory; -import com.google.appengine.tools.pipeline.NoSuchObjectException; -import com.google.appengine.tools.pipeline.impl.PipelineManager; -import com.google.appengine.tools.pipeline.impl.model.JobRecord; -import com.google.appengine.tools.pipeline.util.Pair; -import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableSet; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; -import javax.annotation.Nullable; -import javax.inject.Inject; -import org.joda.time.DateTime; - -/** Utilities used in mapreduce datastore entity cleanup. */ -class MapreduceEntityCleanupUtil { - - /** Number of jobs to fetch at a time using PipelineManager.queryRootPipelines. */ - protected static final int MAX_NUMBER_OF_JOBS_PER_SEARCH = 100; - - private static final ImmutableSet JOB_PREFIXES = - ImmutableSet.of("", "map-", "sort-", "merge-", "reduce-"); - - @Inject - MapreduceEntityCleanupUtil() {} - - /** Return value from {@link #findEligibleJobsByJobName} */ - @AutoValue - abstract static class EligibleJobResults { - static EligibleJobResults create(ImmutableSet jobs, Optional cursor) { - return new AutoValue_MapreduceEntityCleanupUtil_EligibleJobResults(jobs, cursor); - } - - abstract ImmutableSet eligibleJobs(); - abstract Optional cursor(); - } - - /** - * Returns the maximum number of jobs to return per search request. - * - *

This method is present to allow overriding by test subclasses. - */ - protected int getMaxNumberOfJobsPerSearch() { - return MAX_NUMBER_OF_JOBS_PER_SEARCH; - } - - /** - * Finds the requested number of root pipeline jobs eligible for deletion. - * - *

Loops through the root jobs returned by the pipeline API, searching for those with a - * matching name in an appropriate state, and older than the specified cutoff date. - * - *

Regardless of the setting of maxJobs, a maximum of {@link - * #MAX_NUMBER_OF_JOBS_PER_SEARCH} will be returned. If there might be more jobs available to - * find, a cursor will be returned, which can be used in a subsequent call to {@link - * #findEligibleJobsByJobName} to continue the search. - * - * @param jobName the desired job name; if null, all jobs are considered to match - * @param cutoffDate eligible jobs must have both startTime and endTime before cutoffDate; if - * startTime and/or endTime are null, they are considered to be old enough -- this is because - * many jobs do lack at least one of these, and we don't want such jobs to stick around - * forever and not get deleted - * @param maxJobs the maximum number of jobs to return; if absent, return all eligible jobs (see - * note above about {@link #MAX_NUMBER_OF_JOBS_PER_SEARCH}) - * @param ignoreState if true, jobs will be included regardless of the state - * @param cursor if present, a cursor returned from a previous call to the method; the search will - * be picked up where it left off - * @return job IDs of the eligible jobs - */ - EligibleJobResults findEligibleJobsByJobName( - @Nullable String jobName, - DateTime cutoffDate, - Optional maxJobs, - boolean ignoreState, - Optional cursor) { - if (maxJobs.isPresent() && (maxJobs.get() <= 0)) { - return EligibleJobResults.create(ImmutableSet.of(), Optional.empty()); - } - Set eligibleJobs = new HashSet<>(); - Pair, String> pair = - PipelineManager.queryRootPipelines(jobName, cursor.orElse(null), getMaxNumberOfJobsPerSearch()); - for (JobRecord jobRecord : pair.getFirst()) { - if (((jobRecord.getStartTime() == null) - || jobRecord.getStartTime().before(cutoffDate.toDate())) - && ((jobRecord.getEndTime() == null) - || jobRecord.getEndTime().before(cutoffDate.toDate())) - && (ignoreState - || (jobRecord.getState() == JobRecord.State.FINALIZED) - || (jobRecord.getState() == JobRecord.State.STOPPED))) { - eligibleJobs.add(jobRecord.getRootJobKey().getName()); - if (maxJobs.isPresent() && (eligibleJobs.size() >= maxJobs.get())) { - return EligibleJobResults.create( - ImmutableSet.copyOf(eligibleJobs), Optional.empty()); - } - } - } - return EligibleJobResults.create( - ImmutableSet.copyOf(eligibleJobs), Optional.ofNullable(pair.getSecond())); - } - - /** - * Requests asynchronous deletion of entities associated with the specified job ID. - * - *

The mapreduce API is used to delete the MR-* entities, and the pipeline API is used to - * delete the main job records. No attempt is made to check whether the deletion succeeds, only - * whether it appeared to be a valid deletion request up front. - * - * @param datastore The datastore service, which can be either synchronous or asynchronous, since - * the only interaction with the database is via prepared queries - * @param jobId the root pipeline job ID to be deleted; if the jobId does not exist, the deletion - * will be apparently successful, because the underlying library routines do not complain - * @param force passed to the pipeline API, indicating whether jobs should be forcibly deleted - * even if they are not in a completed state; however, there is no force flag on the mapreduce - * API call, meaning that running jobs cannot be deleted - * @return an error string, or absent if no error was detected - */ - Optional deleteJobAsync( - BaseDatastoreService datastore, String jobId, boolean force) { - - // Try to delete the MR-* entities. This is always done asynchronously. A return value of false - // indicates that the job is in RUNNING state, and nothing has been done. - // TODO(mountford) check the state of all sharded jobs before deleting any - for (String mrShardedJobId : getPossibleIdsForPipelineJob(datastore, jobId)) { - if (!ShardedJobServiceFactory.getShardedJobService().cleanupJob(mrShardedJobId)) { - return Optional.of(String.format("Skipping; job %s is in running state", mrShardedJobId)); - } - } - - // If we are successful (meaning, MR-* entity deletion has been kicked off asynchronously), - // delete the pipeline-* entities as well. - try { - PipelineManager.deletePipelineRecords(jobId, force, true /* async */); - return Optional.empty(); - } catch (NoSuchObjectException ex) { - return Optional.of("No such pipeline job"); - } catch (IllegalStateException ex) { - return Optional.of("Job is not in FINALIZED or STOPPED state"); - } - } - - /** - * Returns the possible MR-ShardedJob IDs associated with the specified pipeline job and any child - * jobs. - * - * @param datastore The datastore service, which can be either synchronous or asynchronous, since - * the only interaction with the database is via prepared queries - * @param jobId The pipeline job ID - * @return the IDs of MR-ShardedJob entities that the Mapreduce library might have created, - * depending on which steps of the mapreduce were used - */ - private ImmutableSet getPossibleIdsForPipelineJob( - BaseDatastoreService datastore, String jobId) { - return getPossibleIdsForPipelineJobRecur(datastore, jobId, new HashSet<>()); - } - - /** - * Called by getPossibleIdsForPipelineJob(), and by itself recursively. - * - * @param datastore The datastore service, which can be either synchronous or asynchronous, since - * the only interaction with the database is via prepared queries - * @param jobId The pipeline job ID - * @param handledJobIds The set of job IDs which have been handled so far; this is a sanity check - * to prevent an infinite loop if, for some crazy reason, the job dependency graph is cyclic - * @return the IDs of MR-ShardedJob entities that the Mapreduce library might have created, - * depending on which steps of the mapreduce were used - */ - private ImmutableSet getPossibleIdsForPipelineJobRecur( - BaseDatastoreService datastore, String jobId, Set handledJobIds) { - if (handledJobIds.contains(jobId)) { - return ImmutableSet.of(); - } - handledJobIds.add(jobId); - - JobRecord jobRecord; - try { - jobRecord = PipelineManager.getJob(jobId); - } catch (NoSuchObjectException e) { - return ImmutableSet.of(); - } - - ImmutableSet.Builder idSetBuilder = new ImmutableSet.Builder<>(); - for (String jobPrefix : JOB_PREFIXES) { - idSetBuilder.add("MR-ShardedJob", jobPrefix + jobId); - } - - for (Key childKey : jobRecord.getChildKeys()) { - idSetBuilder - .addAll(getPossibleIdsForPipelineJobRecur(datastore, childKey.getName(), handledJobIds)); - } - return idSetBuilder.build(); - } -} diff --git a/java/google/registry/env/common/backend/WEB-INF/web.xml b/java/google/registry/env/common/backend/WEB-INF/web.xml index 714c469eb..c00deae46 100644 --- a/java/google/registry/env/common/backend/WEB-INF/web.xml +++ b/java/google/registry/env/common/backend/WEB-INF/web.xml @@ -210,12 +210,6 @@ /_dr/task/pollBigqueryJob - - - backend-servlet - /_dr/task/mapreduceEntityCleanup - - backend-servlet diff --git a/java/google/registry/module/backend/BackendRequestComponent.java b/java/google/registry/module/backend/BackendRequestComponent.java index a36d57b0c..14468c152 100644 --- a/java/google/registry/module/backend/BackendRequestComponent.java +++ b/java/google/registry/module/backend/BackendRequestComponent.java @@ -25,7 +25,6 @@ import google.registry.batch.DeleteContactsAndHostsAction; import google.registry.batch.DeleteLoadTestDataAction; import google.registry.batch.DeleteProberDataAction; import google.registry.batch.ExpandRecurringBillingEventsAction; -import google.registry.batch.MapreduceEntityCleanupAction; import google.registry.batch.RefreshDnsOnHostRenameAction; import google.registry.batch.ResaveAllEppResourcesAction; import google.registry.batch.ResaveEntityAction; @@ -138,7 +137,6 @@ interface BackendRequestComponent { IcannReportingStagingAction icannReportingStagingAction(); IcannReportingUploadAction icannReportingUploadAction(); LoadSnapshotAction loadSnapshotAction(); - MapreduceEntityCleanupAction mapreduceEntityCleanupAction(); MetricsExportAction metricsExportAction(); NordnUploadAction nordnUploadAction(); NordnVerifyAction nordnVerifyAction(); diff --git a/javatests/google/registry/batch/MapreduceEntityCleanupActionTest.java b/javatests/google/registry/batch/MapreduceEntityCleanupActionTest.java deleted file mode 100644 index 96cf714c0..000000000 --- a/javatests/google/registry/batch/MapreduceEntityCleanupActionTest.java +++ /dev/null @@ -1,559 +0,0 @@ -// Copyright 2017 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.batch; - -import static com.google.appengine.api.datastore.DatastoreServiceFactory.getDatastoreService; -import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8; -import static com.google.common.truth.Truth.assertThat; -import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST; -import static javax.servlet.http.HttpServletResponse.SC_OK; -import static org.joda.time.DateTimeZone.UTC; - -import com.google.appengine.api.datastore.DatastoreService; -import com.google.appengine.api.datastore.DatastoreServiceFactory; -import com.google.appengine.api.datastore.Entity; -import com.google.appengine.api.datastore.FetchOptions; -import com.google.appengine.api.datastore.Query; -import com.google.appengine.tools.mapreduce.MapReduceJob; -import com.google.appengine.tools.mapreduce.MapReduceSettings; -import com.google.appengine.tools.mapreduce.MapReduceSpecification; -import com.google.appengine.tools.mapreduce.Mapper; -import com.google.appengine.tools.mapreduce.Reducer; -import com.google.appengine.tools.mapreduce.ReducerInput; -import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTaskState; -import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardRetryState; -import com.google.appengine.tools.mapreduce.inputs.InMemoryInput; -import com.google.appengine.tools.mapreduce.outputs.InMemoryOutput; -import com.google.appengine.tools.pipeline.JobSetting; -import com.google.appengine.tools.pipeline.PipelineService; -import com.google.appengine.tools.pipeline.PipelineServiceFactory; -import com.google.appengine.tools.pipeline.impl.model.Barrier; -import com.google.appengine.tools.pipeline.impl.model.FanoutTaskRecord; -import com.google.appengine.tools.pipeline.impl.model.JobInstanceRecord; -import com.google.appengine.tools.pipeline.impl.model.JobRecord; -import com.google.appengine.tools.pipeline.impl.model.ShardedValue; -import com.google.appengine.tools.pipeline.impl.model.Slot; -import com.google.common.collect.ImmutableList; -import google.registry.testing.FakeClock; -import google.registry.testing.FakeResponse; -import google.registry.testing.mapreduce.MapreduceTestCase; -import java.util.List; -import java.util.Optional; -import org.joda.time.DateTime; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class MapreduceEntityCleanupActionTest - extends MapreduceTestCase { - - private static final DatastoreService datastore = getDatastoreService(); - private static final FetchOptions FETCH_OPTIONS = FetchOptions.Builder.withChunkSize(200); - private static final String QUEUE_NAME = "mapreduce"; - - private final TestMapreduceEntityCleanupUtil mapreduceEntityCleanupUtil = - new TestMapreduceEntityCleanupUtil(); - private final FakeClock clock = new FakeClock(DateTime.now(UTC)); - private final FakeResponse response = new FakeResponse(); - - private static final ImmutableList> inputStrings = ImmutableList.of( - ImmutableList.of("a", "b", "c"), - ImmutableList.of("d", "e", "f", "g", "h"), - ImmutableList.of("i", "j", "k"), - ImmutableList.of("l"), - ImmutableList.of("m", "n")); - - private static final InMemoryInput input = new InMemoryInput<>(inputStrings); - - private static class TestMapper extends Mapper { - private static final long serialVersionUID = 8472979502634351364L; - - @Override - public void map(String s) { - emit(s, s); - } - } - - private static class TestReducer extends Reducer { - private static final long serialVersionUID = 5344368517893904668L; - - @Override - public void reduce(final String key, ReducerInput values) { - while (values.hasNext()) { - emit(values.next()); - } - } - } - - private static String createMapreduce(String jobName) { - MapReduceJob>> mapReduceJob = - new MapReduceJob<>( - new MapReduceSpecification.Builder>>() - .setJobName(jobName) - .setInput(input) - .setMapper(new TestMapper()) - .setReducer(new TestReducer()) - .setOutput(new InMemoryOutput<>()) - .setNumReducers(2) - .build(), - new MapReduceSettings.Builder().setWorkerQueueName(QUEUE_NAME).build()); - PipelineService pipelineService = PipelineServiceFactory.newPipelineService(); - return pipelineService.startNewPipeline(mapReduceJob, new JobSetting.OnQueue(QUEUE_NAME)); - } - - private void setAnyJobAndDaysOld(int daysOld) { - setJobIdJobNameAndDaysOld(Optional.empty(), Optional.empty(), Optional.of(daysOld)); - } - - private void setJobId(String jobId) { - setJobIdJobNameAndDaysOld(Optional.of(jobId), Optional.empty(), Optional.empty()); - } - - private void setJobName(String jobName) { - setJobIdJobNameAndDaysOld(Optional.empty(), Optional.of(jobName), Optional.empty()); - } - - private void setJobNameAndDaysOld(String jobName, int daysOld) { - setJobIdJobNameAndDaysOld(Optional.empty(), Optional.of(jobName), Optional.of(daysOld)); - } - - private void setJobIdJobNameAndDaysOld( - Optional jobId, Optional jobName, Optional daysOld) { - clock.setTo(DateTime.now(UTC)); - action = new MapreduceEntityCleanupAction( - jobId, - jobName, - Optional.empty(), // numJobsToDelete - daysOld, - Optional.empty(), // force - mapreduceEntityCleanupUtil, - clock, - DatastoreServiceFactory.getDatastoreService(), - response); - } - - /** - * Get the keys of a particular kind. - * - *

We really just care about the count of keys, but the exception messages are much clearer if - * we print out the complete list, rather than just the count. - */ - private static List getKeys(String kind) { - return datastore.prepare(new Query(kind).setKeysOnly()).asList(FETCH_OPTIONS); - } - - private static void assertNumMapreducesAndShardedJobs( - int numberOfMapreduces, int numberOfShardedJobEntities) { - assertThat(getKeys(JobRecord.DATA_STORE_KIND)).hasSize(14 * numberOfMapreduces); - assertThat(getKeys(Slot.DATA_STORE_KIND)).hasSize(32 * numberOfMapreduces); - assertThat(getKeys(ShardedValue.DATA_STORE_KIND)).isEmpty(); - assertThat(getKeys(Barrier.DATA_STORE_KIND)).hasSize(28 * numberOfMapreduces); - assertThat(getKeys(JobInstanceRecord.DATA_STORE_KIND)).hasSize(14 * numberOfMapreduces); - assertThat(getKeys(FanoutTaskRecord.DATA_STORE_KIND)).hasSize(18 * numberOfMapreduces); - assertThat(getKeys("MR-ShardedJob")).hasSize(numberOfShardedJobEntities); - assertThat(getKeys("MR-ShardedValue")).isEmpty(); - assertThat(getKeys(ShardRetryState.Serializer.makeKey("dummy").getKind())).isEmpty(); - assertThat(getKeys(IncrementalTaskState.Serializer.makeKey("dummy").getKind())) - .hasSize(numberOfMapreduces * (2 + 2 * inputStrings.size())); - } - - @Test - public void testCleanup_succeeds() throws Exception { - - // Create and run the mapreduce. - String jobId = createMapreduce("jobname"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - - // The desired entities should be present. - assertNumMapreducesAndShardedJobs(1, 3); - - // Now run the cleanup action. - setJobId(jobId); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).isEqualTo( - jobId - + ": deletion requested\n" - + "successfully requested async deletion of 1 job(s); errors received on 0\n"); - - // The entities should still be there, because everything executes asynchronously, and in fact - // there should be more pipeline entities, because the deletion tasks are there as well. - // However, the MR-* sharded entities are gone for some reason. - assertThat(getKeys(JobRecord.DATA_STORE_KIND)).hasSize(17); - assertThat(getKeys(Slot.DATA_STORE_KIND)).hasSize(38); - assertThat(getKeys(ShardedValue.DATA_STORE_KIND)).isEmpty(); - assertThat(getKeys(Barrier.DATA_STORE_KIND)).hasSize(34); - assertThat(getKeys(JobInstanceRecord.DATA_STORE_KIND)).hasSize(17); - assertThat(getKeys(FanoutTaskRecord.DATA_STORE_KIND)).hasSize(21); - assertThat(getKeys("MR-ShardedJob")).isEmpty(); - assertThat(getKeys("MR-ShardedValue")).isEmpty(); - assertThat(getKeys(ShardRetryState.Serializer.makeKey("dummy").getKind())).isEmpty(); - assertThat(getKeys(IncrementalTaskState.Serializer.makeKey("dummy").getKind())) - .hasSize(2 + 2 * inputStrings.size()); - - // Run the async deletion. - executeTasksUntilEmpty(QUEUE_NAME, clock); - - // Everything should be gone. - assertNumMapreducesAndShardedJobs(0, 0); - } - - @Test - public void testNonexistentJobName_fails() { - setJobName("nonexistent"); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).isEqualTo("No eligible jobs found"); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); - } - - @Test - public void testJobNameOfRecentJob_fails() throws Exception { - createMapreduce("jobname"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - setJobName("jobname"); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); - assertThat(response.getPayload()).isEqualTo("No eligible jobs found"); - } - - @Test - public void testJobNameOfRecentJob_succeedsWithDaysOldParameter() throws Exception { - createMapreduce("jobname"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - setJobNameAndDaysOld("jobname", 0); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).isEqualTo("A total of 1 job(s) processed"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - assertNumMapreducesAndShardedJobs(0, 0); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); - } - - @Test - public void testAnyJob_fails() throws Exception { - createMapreduce("jobname"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - setJobIdJobNameAndDaysOld( - Optional.empty(), Optional.empty(), Optional.empty()); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).isEqualTo("No eligible jobs found"); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); - } - - @Test - public void testAnyJob_succeedsWithDaysOldParameter() throws Exception { - createMapreduce("jobname"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - setJobNameAndDaysOld("jobname", 0); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).isEqualTo("A total of 1 job(s) processed"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - assertNumMapreducesAndShardedJobs(0, 0); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); - } - - @Test - public void testNonexistentJobId_succeeds() throws Exception { - setJobId("nonexistent"); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).isEqualTo( - "nonexistent: deletion requested\n" - + "successfully requested async deletion of 1 job(s); errors received on 0\n"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - assertNumMapreducesAndShardedJobs(0, 0); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0); - } - - @Test - public void testDeleteTwoJobs_succeedsWithDaysOldParameter() throws Exception { - createMapreduce("jobname1"); - createMapreduce("jobname2"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - setAnyJobAndDaysOld(0); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).isEqualTo("A total of 2 job(s) processed"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - assertNumMapreducesAndShardedJobs(0, 0); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); - } - - @Test - public void testDeleteTwoJobsInTwoBatches_succeeds() throws Exception { - createMapreduce("jobname1"); - createMapreduce("jobname2"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - setAnyJobAndDaysOld(0); - mapreduceEntityCleanupUtil.setMaxNumberOfJobsPerSearch(1); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).isEqualTo("A total of 2 job(s) processed"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - assertNumMapreducesAndShardedJobs(0, 0); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(2); - } - - @Test - public void testDeleteOneOfTwoJobs_succeedsWithDaysOldParameter() throws Exception { - createMapreduce("jobname1"); - createMapreduce("jobname2"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - clock.setTo(DateTime.now(UTC)); - action = new MapreduceEntityCleanupAction( - Optional.empty(), // jobId - Optional.empty(), // jobName - Optional.of(1), // numJobsToDelete - Optional.of(0), // daysOld - Optional.empty(), // force - mapreduceEntityCleanupUtil, - clock, - DatastoreServiceFactory.getDatastoreService(), - response); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).endsWith( - ": deletion requested\n" - + "successfully requested async deletion of 1 job(s); errors received on 0\n" - + "A total of 1 job(s) processed"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - assertNumMapreducesAndShardedJobs(1, 3); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); - } - - @Test - public void testDeleteOneOfTwoJobsByJobName_succeedsWithDaysOldParameter() throws Exception { - createMapreduce("jobname1"); - createMapreduce("jobname2"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - setJobNameAndDaysOld("jobname1", 0); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).isEqualTo("A total of 1 job(s) processed"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - assertNumMapreducesAndShardedJobs(1, 3); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1); - } - - @Test - public void testDeleteOneOfTwoJobsByJobId_succeeds() throws Exception { - String jobId1 = createMapreduce("jobname1"); - createMapreduce("jobname2"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - setJobId(jobId1); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).endsWith( - ": deletion requested\n" - + "successfully requested async deletion of 1 job(s); errors received on 0\n"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - assertNumMapreducesAndShardedJobs(1, 3); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0); - } - - @Test - public void testDeleteTwoJobsByJobId_succeeds() throws Exception { - String jobId1 = createMapreduce("jobname1"); - String jobId2 = createMapreduce("jobname2"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - setJobId(jobId1); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).isEqualTo( - jobId1 - + ": deletion requested\n" - + "successfully requested async deletion of 1 job(s); errors received on 0\n"); - - FakeResponse response2 = new FakeResponse(); - clock.setTo(DateTime.now(UTC)); - action = new MapreduceEntityCleanupAction( - Optional.of(jobId2), // jobId - Optional.empty(), // jobName - Optional.empty(), // numJobsToDelete - Optional.empty(), // daysOld - Optional.empty(), // force - mapreduceEntityCleanupUtil, - clock, - DatastoreServiceFactory.getDatastoreService(), - response2); - - action.run(); - - assertThat(response2.getStatus()).isEqualTo(SC_OK); - assertThat(response2.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response2.getPayload()).isEqualTo( - jobId2 - + ": deletion requested\n" - + "successfully requested async deletion of 1 job(s); errors received on 0\n"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - assertNumMapreducesAndShardedJobs(0, 0); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0); - } - - @Test - public void testDeleteOfRunningJob_fails() throws Exception { - String jobId = createMapreduce("jobname"); - executeTasks(QUEUE_NAME, clock, Optional.of(10)); - setJobId(jobId); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).endsWith( - ": Job is not in FINALIZED or STOPPED state\n" - + "successfully requested async deletion of 0 job(s); errors received on 1\n"); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0); - } - - @Test - public void testDeleteOfRunningJob_succeedsWithForce() throws Exception { - String jobId = createMapreduce("jobname"); - executeTasks(QUEUE_NAME, clock, Optional.of(10)); - clock.setTo(DateTime.now(UTC)); - action = new MapreduceEntityCleanupAction( - Optional.of(jobId), - Optional.empty(), // jobName - Optional.empty(), // numJobsToDelete - Optional.empty(), // daysOld - Optional.of(true), // force - mapreduceEntityCleanupUtil, - clock, - DatastoreServiceFactory.getDatastoreService(), - response); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_OK); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).endsWith( - ": deletion requested\n" - + "successfully requested async deletion of 1 job(s); errors received on 0\n"); - executeTasksUntilEmpty(QUEUE_NAME, clock); - assertNumMapreducesAndShardedJobs(0, 0); - assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0); - } - - @Test - public void testJobIdAndJobName_fails() { - setJobIdJobNameAndDaysOld( - Optional.of("jobid"), Optional.of("jobname"), Optional.empty()); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_BAD_REQUEST); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).isEqualTo("Do not specify both a job ID and a job name"); - assertNumMapreducesAndShardedJobs(0, 0); - } - - @Test - public void testJobIdAndDaysOld_fails() { - setJobIdJobNameAndDaysOld(Optional.of("jobid"), Optional.empty(), Optional.of(0)); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_BAD_REQUEST); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()) - .isEqualTo("Do not specify both a job ID and a days old threshold"); - assertNumMapreducesAndShardedJobs(0, 0); - } - - @Test - public void testJobIdAndNumJobs_fails() { - action = new MapreduceEntityCleanupAction( - Optional.of("jobid"), - Optional.empty(), // jobName - Optional.of(1), // numJobsToDelete - Optional.empty(), // daysOld - Optional.empty(), // force - mapreduceEntityCleanupUtil, - clock, - DatastoreServiceFactory.getDatastoreService(), - response); - - action.run(); - - assertThat(response.getStatus()).isEqualTo(SC_BAD_REQUEST); - assertThat(response.getContentType()).isEqualTo(PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()) - .isEqualTo("Do not specify both a job ID and a number of jobs to delete"); - assertNumMapreducesAndShardedJobs(0, 0); - } - - @Test - public void testDeleteZeroJobs_throwsUsageError() { - new MapreduceEntityCleanupAction( - Optional.empty(), // jobId - Optional.empty(), // jobName - Optional.of(0), // numJobsToDelete - Optional.empty(), // daysOld - Optional.empty(), // force - mapreduceEntityCleanupUtil, - clock, - DatastoreServiceFactory.getDatastoreService(), - response) - .run(); - assertThat(response.getStatus()).isEqualTo(SC_BAD_REQUEST); - assertThat(response.getPayload()) - .isEqualTo("Do not specify a non-positive integer for the number of jobs to delete"); - } - -} diff --git a/javatests/google/registry/batch/TestMapreduceEntityCleanupUtil.java b/javatests/google/registry/batch/TestMapreduceEntityCleanupUtil.java deleted file mode 100644 index 6f0b3a168..000000000 --- a/javatests/google/registry/batch/TestMapreduceEntityCleanupUtil.java +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2017 The Nomulus Authors. All Rights Reserved. -// -//Licensed under the Apache License, Version 2.0 (the "License"); -//you may not use this file except in compliance with the License. -//You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, software -//distributed under the License is distributed on an "AS IS" BASIS, -//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -//See the License for the specific language governing permissions and -//limitations under the License. - -package google.registry.batch; - -import java.util.Optional; -import javax.annotation.Nullable; -import org.joda.time.DateTime; - -/** - * Test harness for {@link MapreduceEntityCleanupUtil}. - * - *

It's a somewhat like a mock, in that it records the number of calls to {@link - * #findEligibleJobsByJobName}, and it allows the overriding of the number of jobs returned per - * search, to allow testing of subsequent searches. But it's not a full mock by any means. - */ -public class TestMapreduceEntityCleanupUtil extends MapreduceEntityCleanupUtil { - - private int maxNumberOfJobsPerSearch; - private int numSearches = 0; - - public TestMapreduceEntityCleanupUtil() { - this.maxNumberOfJobsPerSearch = MAX_NUMBER_OF_JOBS_PER_SEARCH; - } - - @Override - protected int getMaxNumberOfJobsPerSearch() { - return maxNumberOfJobsPerSearch; - } - - public void setMaxNumberOfJobsPerSearch(int maxNumberOfJobsPerSearch) { - this.maxNumberOfJobsPerSearch = maxNumberOfJobsPerSearch; - } - - @Override - public EligibleJobResults findEligibleJobsByJobName( - @Nullable String jobName, - DateTime cutoffDate, - Optional maxJobs, - boolean ignoreState, - Optional cursor) { - numSearches++; - return super.findEligibleJobsByJobName(jobName, cutoffDate, maxJobs, ignoreState, cursor); - } - - public int getNumSearchesPerformed() { - return numSearches; - } -} diff --git a/javatests/google/registry/module/backend/testdata/backend_routing.txt b/javatests/google/registry/module/backend/testdata/backend_routing.txt index c9731e33f..a0538b097 100644 --- a/javatests/google/registry/module/backend/testdata/backend_routing.txt +++ b/javatests/google/registry/module/backend/testdata/backend_routing.txt @@ -26,7 +26,6 @@ PATH CLASS METHOD /_dr/task/importRdeHosts RdeHostImportAction GET n INTERNAL APP IGNORED /_dr/task/linkRdeHosts RdeHostLinkAction GET n INTERNAL APP IGNORED /_dr/task/loadSnapshot LoadSnapshotAction POST n INTERNAL APP IGNORED -/_dr/task/mapreduceEntityCleanup MapreduceEntityCleanupAction GET n INTERNAL APP IGNORED /_dr/task/metrics MetricsExportAction POST n INTERNAL APP IGNORED /_dr/task/nordnUpload NordnUploadAction POST y INTERNAL APP IGNORED /_dr/task/nordnVerify NordnVerifyAction POST y INTERNAL APP IGNORED