Add mapreduce cleanup action
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=149432516
This commit is contained in:
parent bd7db61606
commit 5d4287a375

11 changed files with 1171 additions and 2 deletions
@@ -25,6 +25,7 @@ java_library(
         "@com_google_appengine_api_1_0_sdk",
         "@com_google_appengine_tools_appengine_gcs_client",
         "@com_google_appengine_tools_appengine_mapreduce",
+        "@com_google_appengine_tools_appengine_pipeline",
         "@com_google_auto_factory",
         "@com_google_auto_value",
         "@com_google_code_findbugs_jsr305",
@@ -14,12 +14,19 @@
 
 package google.registry.batch;
 
+import static google.registry.request.RequestParameters.extractOptionalBooleanParameter;
+import static google.registry.request.RequestParameters.extractOptionalIntParameter;
+import static google.registry.request.RequestParameters.extractOptionalParameter;
+
 import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import dagger.Module;
 import dagger.Provides;
 import dagger.multibindings.IntoMap;
 import dagger.multibindings.StringKey;
+import google.registry.request.Parameter;
+import javax.servlet.http.HttpServletRequest;
 
 /**
  * Dagger module for injecting common settings for batch actions.
@@ -33,4 +40,34 @@ public class BatchModule {
   static ImmutableList<TableFieldSchema> provideEntityIntegrityAlertsSchema() {
     return EntityIntegrityAlertsSchema.SCHEMA_FIELDS;
   }
+
+  @Provides
+  @Parameter("jobName")
+  static Optional<String> provideJobName(HttpServletRequest req) {
+    return extractOptionalParameter(req, "jobName");
+  }
+
+  @Provides
+  @Parameter("jobId")
+  static Optional<String> provideJobId(HttpServletRequest req) {
+    return extractOptionalParameter(req, "jobId");
+  }
+
+  @Provides
+  @Parameter("numJobsToDelete")
+  static Optional<Integer> provideNumJobsToDelete(HttpServletRequest req) {
+    return extractOptionalIntParameter(req, "numJobsToDelete");
+  }
+
+  @Provides
+  @Parameter("daysOld")
+  static Optional<Integer> provideDaysOld(HttpServletRequest req) {
+    return extractOptionalIntParameter(req, "daysOld");
+  }
+
+  @Provides
+  @Parameter("force")
+  static Optional<Boolean> provideForce(HttpServletRequest req) {
+    return extractOptionalBooleanParameter(req, "force");
+  }
 }
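For illustration (a sketch, not part of this commit; the request path is the one mapped later in
this commit, and "myJob" and 30 are hypothetical values): given a request such as
GET /_dr/task/mapreduceEntityCleanup?jobName=myJob&daysOld=30, the providers above would inject
roughly these values into an action's constructor:

    Optional<String> jobName = Optional.of("myJob");         // provideJobName
    Optional<String> jobId = Optional.absent();              // parameter not present
    Optional<Integer> daysOld = Optional.of(30);             // provideDaysOld
    Optional<Integer> numJobsToDelete = Optional.absent();   // parameter not present
    Optional<Boolean> force = Optional.absent();             // parameter not present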
java/google/registry/batch/MapreduceEntityCleanupAction.java (new file, 236 lines)
@@ -0,0 +1,236 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.batch;

import com.google.appengine.api.datastore.DatastoreService;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.MediaType;
import google.registry.batch.MapreduceEntityCleanupUtil.EligibleJobResults;
import google.registry.mapreduce.MapreduceRunner;
import google.registry.request.Action;
import google.registry.request.Parameter;
import google.registry.request.Response;
import google.registry.util.Clock;
import google.registry.util.FormattingLogger;
import java.util.Set;
import javax.inject.Inject;
import org.joda.time.DateTime;

/**
 * Action to delete entities associated with the App Engine Mapreduce library.
 *
 * <p>To delete a specific job, set the jobId parameter. To delete all jobs with a specific job name
 * which are older than the specified age, set the jobName parameter. Otherwise, all jobs older than
 * the specified age are deleted. Examples:
 *
 * <ul>
 * <li>jobId=12345: delete only the root pipeline job with ID 12345, and all descendant jobs
 * <li>jobName=Generate+Important+Files: delete all root pipeline jobs with the display name
 *     "Generate Important Files" (subject to the limits imposed by the daysOld and numJobsToDelete
 *     parameters), and all descendant jobs
 * <li>(neither specified): delete all jobs (subject to the limits imposed by the daysOld and
 *     numJobsToDelete parameters)
 * </ul>
 *
 * <p>More about display names: The pipeline library assigns each root pipeline job a "display
 * name". You can see the display name of each job using the pipeline Web interface, available at
 * /_ah/pipeline/list, where the display name column is confusingly labeled "Class Path". Usually,
 * the display name is set to a fixed value by the mapreduce code. For instance, when a pipeline job
 * is created by the {@link MapreduceRunner} class, the display name is set by the
 * {@link MapreduceRunner#setJobName} method. When formulating a URL to invoke {@link
 * MapreduceEntityCleanupAction}, the display name must of course be URL-encoded -- spaces are
 * replaced by the plus sign, and so forth. For more information, see <a
 * href="https://en.wikipedia.org/wiki/Percent-encoding">the Wikipedia article on percent
 * encoding</a>.
 *
 * <p>The daysOld parameter specifies the minimum allowable age of a job in days for it to be
 * eligible for deletion. Jobs will not be deleted if they are newer than this threshold, unless
 * specifically named using the jobId parameter.
 *
 * <p>The numJobsToDelete parameter specifies the maximum number of jobs to delete. If this is fewer
 * than would ordinarily be deleted, the jobs to be deleted are chosen arbitrarily.
 *
 * <p>The force parameter, if present and true, indicates that jobs should be deleted even if they
 * are not in FINALIZED or STOPPED state.
 */
@Action(path = "/_dr/task/mapreduceEntityCleanup")
public class MapreduceEntityCleanupAction implements Runnable {

  private static final int DEFAULT_DAYS_OLD = 180;
  private static final int DEFAULT_MAX_NUM_JOBS_TO_DELETE = 5;

  private static final String ERROR_BOTH_JOB_ID_AND_NAME =
      "Do not specify both a job ID and a job name";
  private static final String ERROR_BOTH_JOB_ID_AND_NUMBER_OF_JOBS =
      "Do not specify both a job ID and a number of jobs to delete";
  private static final String ERROR_BOTH_JOB_ID_AND_DAYS_OLD =
      "Do not specify both a job ID and a days old threshold";

  private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();

  private final Optional<String> jobId;
  private final Optional<String> jobName;
  private final Optional<Integer> numJobsToDelete;
  private final Optional<Integer> daysOld;
  private final Optional<Boolean> force;
  private final MapreduceEntityCleanupUtil mapreduceEntityCleanupUtil;
  private final Clock clock;
  private final DatastoreService datastore;
  private final Response response;

  @Inject
  MapreduceEntityCleanupAction(
      @Parameter("jobId") Optional<String> jobId,
      @Parameter("jobName") Optional<String> jobName,
      @Parameter("numJobsToDelete") Optional<Integer> numJobsToDelete,
      @Parameter("daysOld") Optional<Integer> daysOld,
      @Parameter("force") Optional<Boolean> force,
      MapreduceEntityCleanupUtil mapreduceEntityCleanupUtil,
      Clock clock,
      DatastoreService datastore,
      Response response) {
    this.jobId = jobId;
    this.jobName = jobName;
    this.numJobsToDelete = numJobsToDelete;
    this.daysOld = daysOld;
    this.force = force;
    this.mapreduceEntityCleanupUtil = mapreduceEntityCleanupUtil;
    this.clock = clock;
    this.datastore = datastore;
    this.response = response;
  }

  @Override
  public void run() {
    response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
    if (jobId.isPresent()) {
      runWithJobId();
    } else {
      runWithoutJobId();
    }
  }

  private void logSevereAndSetPayload(String message) {
    logger.severe(message);
    response.setPayload(message);
  }

  /** Delete the job with the specified job ID, checking for conflicting parameters. */
  private void runWithJobId() {
    if (jobName.isPresent()) {
      logSevereAndSetPayload(ERROR_BOTH_JOB_ID_AND_NAME);
      return;
    }
    if (numJobsToDelete.isPresent()) {
      logSevereAndSetPayload(ERROR_BOTH_JOB_ID_AND_NUMBER_OF_JOBS);
      return;
    }
    if (daysOld.isPresent()) {
      logSevereAndSetPayload(ERROR_BOTH_JOB_ID_AND_DAYS_OLD);
      return;
    }
    response.setPayload(requestDeletion(ImmutableSet.of(jobId.get()), true /* generatePayload */));
  }

  /**
   * Delete jobs with a matching display name, or all jobs if no name is specified. Only pick jobs
   * which are old enough.
   */
  private void runWithoutJobId() {
    int defaultedDaysOld = daysOld.or(DEFAULT_DAYS_OLD);

    // Only generate the detailed response payload if there aren't too many jobs involved.
    boolean generatePayload =
        numJobsToDelete.isPresent() && (numJobsToDelete.get() <= DEFAULT_MAX_NUM_JOBS_TO_DELETE);
    Optional<StringBuilder> payloadBuilder =
        generatePayload ? Optional.of(new StringBuilder()) : Optional.<StringBuilder>absent();
    String defaultPayload = "done";

    // Since findEligibleJobsByJobName returns only a certain number of jobs, we must loop through
    // until we find enough, requesting deletion as we go. We also stop if we don't find anything,
    // or if there are no more jobs to be found (because no cursor is returned).
    int numJobsDeletedSoFar = 0;
    boolean isFirstTime = true;
    Optional<String> cursor = Optional.<String>absent();
    DateTime cutoffDate = clock.nowUtc().minusDays(defaultedDaysOld);
    while ((isFirstTime || cursor.isPresent())
        && (!numJobsToDelete.isPresent() || (numJobsDeletedSoFar < numJobsToDelete.get()))) {
      isFirstTime = false;
      EligibleJobResults eligibleJobResults =
          mapreduceEntityCleanupUtil.findEligibleJobsByJobName(
              jobName.orNull(), cutoffDate, numJobsToDelete, force.or(false), cursor);
      cursor = eligibleJobResults.cursor();
      if (eligibleJobResults.eligibleJobs().isEmpty()) {
        logger.infofmt(
            "No eligible job with name '%s' older than %s days old.",
            jobName.or("(null)"), defaultedDaysOld);
        if (generatePayload) {
          payloadBuilder.get().append("No eligible job.");
        }
        defaultPayload = "No eligible job.";
      } else {
        String payloadChunk = requestDeletion(eligibleJobResults.eligibleJobs(), generatePayload);
        if (generatePayload) {
          payloadBuilder.get().append(payloadChunk);
        }
        numJobsDeletedSoFar += eligibleJobResults.eligibleJobs().size();
      }
    }

    logger.infofmt("A total of %s job(s) processed", numJobsDeletedSoFar);
    if (generatePayload) {
      payloadBuilder
          .get()
          .append(String.format("A total of %d job(s) processed\n", numJobsDeletedSoFar));
      response.setPayload(payloadBuilder.get().toString());
    } else {
      response.setPayload(defaultPayload);
    }
  }

  private String requestDeletion(Set<String> actualJobIds, boolean generatePayload) {
    Optional<StringBuilder> payloadChunkBuilder =
        generatePayload ? Optional.of(new StringBuilder()) : Optional.<StringBuilder>absent();
    int errorCount = 0;
    for (String actualJobId : actualJobIds) {
      Optional<String> error =
          mapreduceEntityCleanupUtil.deleteJobAsync(datastore, actualJobId, force.or(false));
      if (error.isPresent()) {
        errorCount++;
      }
      logger.infofmt("%s: %s", actualJobId, error.or("deletion requested"));
      if (payloadChunkBuilder.isPresent()) {
        payloadChunkBuilder
            .get()
            .append(String.format("%s: %s\n", actualJobId, error.or("deletion requested")));
      }
    }
    logger.infofmt(
        "successfully requested async deletion of %s job(s); errors received on %s",
        actualJobIds.size() - errorCount,
        errorCount);
    if (payloadChunkBuilder.isPresent()) {
      payloadChunkBuilder.get().append(String.format(
          "successfully requested async deletion of %d job(s); errors received on %d\n",
          actualJobIds.size() - errorCount,
          errorCount));
      return payloadChunkBuilder.get().toString();
    } else {
      return "";
    }
  }
}
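Based on the Javadoc above, example invocations of this endpoint would look like the following (a
usage sketch only; jobId=12345 and jobName=Generate+Important+Files are the hypothetical examples
from the Javadoc, and daysOld=30 / numJobsToDelete=5 are likewise made-up values):

    /_dr/task/mapreduceEntityCleanup?jobId=12345
    /_dr/task/mapreduceEntityCleanup?jobName=Generate+Important+Files&daysOld=30&numJobsToDelete=5
    /_dr/task/mapreduceEntityCleanup?daysOld=180&force=true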
java/google/registry/batch/MapreduceEntityCleanupUtil.java (new file, 210 lines)
@@ -0,0 +1,210 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.batch;

import com.google.appengine.api.datastore.BaseDatastoreService;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardedJobServiceFactory;
import com.google.appengine.tools.pipeline.NoSuchObjectException;
import com.google.appengine.tools.pipeline.impl.PipelineManager;
import com.google.appengine.tools.pipeline.impl.model.JobRecord;
import com.google.appengine.tools.pipeline.util.Pair;
import com.google.auto.value.AutoValue;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;
import javax.inject.Inject;
import org.joda.time.DateTime;

/** Utilities used in mapreduce datastore entity cleanup. */
class MapreduceEntityCleanupUtil {

  /** Number of jobs to fetch at a time using PipelineManager.queryRootPipelines. */
  protected static final int MAX_NUMBER_OF_JOBS_PER_SEARCH = 100;

  private static final ImmutableSet<String> JOB_PREFIXES =
      ImmutableSet.of("", "map-", "sort-", "merge-", "reduce-");

  @Inject
  MapreduceEntityCleanupUtil() {}

  /** Return value from {@link #findEligibleJobsByJobName} */
  @AutoValue
  abstract static class EligibleJobResults {
    static EligibleJobResults create(ImmutableSet<String> jobs, Optional<String> cursor) {
      return new AutoValue_MapreduceEntityCleanupUtil_EligibleJobResults(jobs, cursor);
    }

    abstract ImmutableSet<String> eligibleJobs();
    abstract Optional<String> cursor();
  }

  /**
   * Returns the maximum number of jobs to return per search request.
   *
   * <p>This method is present to allow overriding by test subclasses.
   */
  protected int getMaxNumberOfJobsPerSearch() {
    return MAX_NUMBER_OF_JOBS_PER_SEARCH;
  }

  /**
   * Finds the requested number of root pipeline jobs eligible for deletion.
   *
   * <p>Loops through the root jobs returned by the pipeline API, searching for those with a
   * matching name in an appropriate state, and older than the specified cutoff date.
   *
   * <p>Regardless of the setting of maxJobs, a maximum of {@link
   * #MAX_NUMBER_OF_JOBS_PER_SEARCH} will be returned. If there might be more jobs available to
   * find, a cursor will be returned, which can be used in a subsequent call to {@link
   * #findEligibleJobsByJobName} to continue the search.
   *
   * @param jobName the desired job name; if null, all jobs are considered to match
   * @param cutoffDate eligible jobs must have both startTime and endTime before cutoffDate; if
   *     startTime and/or endTime are null, they are considered to be old enough -- this is because
   *     many jobs do lack at least one of these, and we don't want such jobs to stick around
   *     forever and not get deleted
   * @param maxJobs the maximum number of jobs to return; if absent, return all eligible jobs (see
   *     note above about {@link #MAX_NUMBER_OF_JOBS_PER_SEARCH})
   * @param ignoreState if true, jobs will be included regardless of the state
   * @param cursor if present, a cursor returned from a previous call to the method; the search will
   *     be picked up where it left off
   * @return job IDs of the eligible jobs
   */
  EligibleJobResults findEligibleJobsByJobName(
      @Nullable String jobName,
      DateTime cutoffDate,
      Optional<Integer> maxJobs,
      boolean ignoreState,
      Optional<String> cursor) {
    if (maxJobs.isPresent() && (maxJobs.get() <= 0)) {
      return EligibleJobResults.create(ImmutableSet.<String>of(), Optional.<String>absent());
    }
    Set<String> eligibleJobs = new HashSet<>();
    Pair<? extends Iterable<JobRecord>, String> pair =
        PipelineManager.queryRootPipelines(jobName, cursor.orNull(), getMaxNumberOfJobsPerSearch());
    for (JobRecord jobRecord : pair.getFirst()) {
      if (((jobRecord.getStartTime() == null)
              || jobRecord.getStartTime().before(cutoffDate.toDate()))
          && ((jobRecord.getEndTime() == null)
              || jobRecord.getEndTime().before(cutoffDate.toDate()))
          && (ignoreState
              || (jobRecord.getState() == JobRecord.State.FINALIZED)
              || (jobRecord.getState() == JobRecord.State.STOPPED))) {
        eligibleJobs.add(jobRecord.getRootJobKey().getName());
        if (maxJobs.isPresent() && (eligibleJobs.size() >= maxJobs.get())) {
          return EligibleJobResults.create(
              ImmutableSet.copyOf(eligibleJobs), Optional.<String>absent());
        }
      }
    }
    return EligibleJobResults.create(
        ImmutableSet.copyOf(eligibleJobs), Optional.fromNullable(pair.getSecond()));
  }

  /**
   * Requests asynchronous deletion of entities associated with the specified job ID.
   *
   * <p>The mapreduce API is used to delete the MR-* entities, and the pipeline API is used to
   * delete the main job records. No attempt is made to check whether the deletion succeeds, only
   * whether it appeared to be a valid deletion request up front.
   *
   * @param datastore The datastore service, which can be either synchronous or asynchronous, since
   *     the only interaction with the database is via prepared queries
   * @param jobId the root pipeline job ID to be deleted; if the jobId does not exist, the deletion
   *     will be apparently successful, because the underlying library routines do not complain
   * @param force passed to the pipeline API, indicating whether jobs should be forcibly deleted
   *     even if they are not in a completed state; however, there is no force flag on the mapreduce
   *     API call, meaning that running jobs cannot be deleted
   * @return an error string, or absent if no error was detected
   */
  Optional<String> deleteJobAsync(
      BaseDatastoreService datastore, String jobId, boolean force) {

    // Try to delete the MR-* entities. This is always done asynchronously. A return value of false
    // indicates that the job is in RUNNING state, and nothing has been done.
    // TODO(mountford) check the state of all sharded jobs before deleting any
    for (String mrShardedJobId : getPossibleIdsForPipelineJob(datastore, jobId)) {
      if (!ShardedJobServiceFactory.getShardedJobService().cleanupJob(mrShardedJobId)) {
        return Optional.of(String.format("Skipping; job %s is in running state", mrShardedJobId));
      }
    }

    // If we are successful (meaning, MR-* entity deletion has been kicked off asynchronously),
    // delete the pipeline-* entities as well.
    try {
      PipelineManager.deletePipelineRecords(jobId, force, true /* async */);
      return Optional.absent();
    } catch (NoSuchObjectException ex) {
      return Optional.of("No such pipeline job");
    } catch (IllegalStateException ex) {
      return Optional.of("Job is not in FINALIZED or STOPPED state");
    }
  }

  /**
   * Returns the possible MR-ShardedJob IDs associated with the specified pipeline job and any child
   * jobs.
   *
   * @param datastore The datastore service, which can be either synchronous or asynchronous, since
   *     the only interaction with the database is via prepared queries
   * @param jobId The pipeline job ID
   * @return the IDs of MR-ShardedJob entities that the Mapreduce library might have created,
   *     depending on which steps of the mapreduce were used
   */
  private ImmutableSet<String> getPossibleIdsForPipelineJob(
      BaseDatastoreService datastore, String jobId) {
    return getPossibleIdsForPipelineJobRecur(datastore, jobId, new HashSet<String>());
  }

  /**
   * Called by getPossibleIdsForPipelineJob(), and by itself recursively.
   *
   * @param datastore The datastore service, which can be either synchronous or asynchronous, since
   *     the only interaction with the database is via prepared queries
   * @param jobId The pipeline job ID
   * @param handledJobIds The set of job IDs which have been handled so far; this is a sanity check
   *     to prevent an infinite loop if, for some crazy reason, the job dependency graph is cyclic
   * @return the IDs of MR-ShardedJob entities that the Mapreduce library might have created,
   *     depending on which steps of the mapreduce were used
   */
  private ImmutableSet<String> getPossibleIdsForPipelineJobRecur(
      BaseDatastoreService datastore, String jobId, Set<String> handledJobIds) {
    if (handledJobIds.contains(jobId)) {
      return ImmutableSet.<String>of();
    }
    handledJobIds.add(jobId);

    JobRecord jobRecord;
    try {
      jobRecord = PipelineManager.getJob(jobId);
    } catch (NoSuchObjectException e) {
      return ImmutableSet.<String>of();
    }

    ImmutableSet.Builder<String> idSetBuilder = new ImmutableSet.Builder<>();
    for (String jobPrefix : JOB_PREFIXES) {
      idSetBuilder.add("MR-ShardedJob", jobPrefix + jobId);
    }

    for (Key childKey : jobRecord.getChildKeys()) {
      idSetBuilder
          .addAll(getPossibleIdsForPipelineJobRecur(datastore, childKey.getName(), handledJobIds));
    }
    return idSetBuilder.build();
  }
}
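To make getPossibleIdsForPipelineJob concrete, a sketch using a hypothetical root job ID "abc123"
with no child jobs:

    // Illustration only ("abc123" is a made-up ID): the loop over JOB_PREFIXES
    // ("", "map-", "sort-", "merge-", "reduce-") produces the candidate names
    //   "abc123", "map-abc123", "sort-abc123", "merge-abc123", "reduce-abc123"
    // Note that ImmutableSet.Builder.add is variadic, so as written the call
    // add("MR-ShardedJob", jobPrefix + jobId) also places the literal string
    // "MR-ShardedJob" itself into the returned set.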
@@ -143,6 +143,12 @@
     <url-pattern>/_dr/task/pollBigqueryJob</url-pattern>
   </servlet-mapping>
 
+  <!-- Cleans up old mapreduce entities. -->
+  <servlet-mapping>
+    <servlet-name>backend-servlet</servlet-name>
+    <url-pattern>/_dr/task/mapreduceEntityCleanup</url-pattern>
+  </servlet-mapping>
+
   <!-- Fans out a cron task over an adjustable range of TLDs. -->
   <servlet-mapping>
     <servlet-name>backend-servlet</servlet-name>
@@ -25,6 +25,7 @@ import google.registry.batch.BatchModule;
 import google.registry.batch.DeleteContactsAndHostsAction;
 import google.registry.batch.DeleteProberDataAction;
 import google.registry.batch.ExpandRecurringBillingEventsAction;
+import google.registry.batch.MapreduceEntityCleanupAction;
 import google.registry.batch.RefreshDnsOnHostRenameAction;
 import google.registry.batch.VerifyEntityIntegrityAction;
 import google.registry.cron.CommitLogFanoutAction;
@@ -110,6 +111,7 @@ interface BackendRequestComponent {
 ExportReservedTermsAction exportReservedTermsAction();
 ExportSnapshotAction exportSnapshotAction();
 LoadSnapshotAction loadSnapshotAction();
+MapreduceEntityCleanupAction mapreduceEntityCleanupAction();
 MetricsExportAction metricsExportAction();
 NordnUploadAction nordnUploadAction();
 NordnVerifyAction nordnVerifyAction();
@@ -114,6 +114,19 @@ public final class RequestParameters {
     }
   }
 
+  /**
+   * Returns first GET or POST parameter associated with {@code name} as a boolean.
+   *
+   * @throws BadRequestException if request parameter is present but not a valid boolean
+   */
+  public static Optional<Boolean> extractOptionalBooleanParameter(
+      HttpServletRequest req, String name) {
+    String stringParam = req.getParameter(name);
+    return isNullOrEmpty(stringParam)
+        ? Optional.<Boolean>absent()
+        : Optional.of(Boolean.valueOf(stringParam));
+  }
+
   /**
    * Returns {@code true} if parameter is present and not empty and not {@code "false"}.
    *
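The resulting semantics, illustrated (a sketch; note that Boolean.valueOf never actually throws,
so any value other than a case-insensitive "true" simply maps to false):

    // extractOptionalBooleanParameter(req, "force"), by query string:
    //   ?force=true   ->  Optional.of(true)
    //   ?force=TRUE   ->  Optional.of(true)    (Boolean.valueOf is case-insensitive)
    //   ?force=yes    ->  Optional.of(false)   (non-"true" strings are treated as false)
    //   ?force=       ->  Optional.absent()    (empty parameter treated as absent)
    //   (omitted)     ->  Optional.absent()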
@@ -24,6 +24,8 @@ java_library(
         "@com_google_appengine_api_1_0_sdk//:testonly",
         "@com_google_appengine_api_stubs",
         "@com_google_appengine_tools_appengine_gcs_client",
+        "@com_google_appengine_tools_appengine_mapreduce",
+        "@com_google_appengine_tools_appengine_pipeline",
         "@com_google_code_findbugs_jsr305",
         "@com_google_dagger",
         "@com_google_guava",
@@ -40,7 +42,7 @@ java_library(
 
 GenTestRules(
     name = "GeneratedTestRules",
-    default_test_size = "medium",
+    default_test_size = "large",
     test_files = glob(["*Test.java"]),
     deps = [":batch"],
 )
@ -0,0 +1,584 @@
|
||||||
|
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package google.registry.batch;
|
||||||
|
|
||||||
|
import static com.google.appengine.api.datastore.DatastoreServiceFactory.getDatastoreService;
|
||||||
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
|
import static javax.servlet.http.HttpServletResponse.SC_OK;
|
||||||
|
import static org.joda.time.DateTimeZone.UTC;
|
||||||
|
|
||||||
|
import com.google.appengine.api.datastore.DatastoreService;
|
||||||
|
import com.google.appengine.api.datastore.DatastoreServiceFactory;
|
||||||
|
import com.google.appengine.api.datastore.Entity;
|
||||||
|
import com.google.appengine.api.datastore.FetchOptions;
|
||||||
|
import com.google.appengine.api.datastore.Query;
|
||||||
|
import com.google.appengine.tools.mapreduce.MapReduceJob;
|
||||||
|
import com.google.appengine.tools.mapreduce.MapReduceSettings;
|
||||||
|
import com.google.appengine.tools.mapreduce.MapReduceSpecification;
|
||||||
|
import com.google.appengine.tools.mapreduce.Mapper;
|
||||||
|
import com.google.appengine.tools.mapreduce.Reducer;
|
||||||
|
import com.google.appengine.tools.mapreduce.ReducerInput;
|
||||||
|
import com.google.appengine.tools.mapreduce.impl.shardedjob.IncrementalTaskState;
|
||||||
|
import com.google.appengine.tools.mapreduce.impl.shardedjob.ShardRetryState;
|
||||||
|
import com.google.appengine.tools.mapreduce.inputs.InMemoryInput;
|
||||||
|
import com.google.appengine.tools.mapreduce.outputs.InMemoryOutput;
|
||||||
|
import com.google.appengine.tools.pipeline.JobSetting;
|
||||||
|
import com.google.appengine.tools.pipeline.PipelineService;
|
||||||
|
import com.google.appengine.tools.pipeline.PipelineServiceFactory;
|
||||||
|
import com.google.appengine.tools.pipeline.impl.model.Barrier;
|
||||||
|
import com.google.appengine.tools.pipeline.impl.model.FanoutTaskRecord;
|
||||||
|
import com.google.appengine.tools.pipeline.impl.model.JobInstanceRecord;
|
||||||
|
import com.google.appengine.tools.pipeline.impl.model.JobRecord;
|
||||||
|
import com.google.appengine.tools.pipeline.impl.model.ShardedValue;
|
||||||
|
import com.google.appengine.tools.pipeline.impl.model.Slot;
|
||||||
|
import com.google.common.base.Optional;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.net.MediaType;
|
||||||
|
import google.registry.testing.ExceptionRule;
|
||||||
|
import google.registry.testing.FakeClock;
|
||||||
|
import google.registry.testing.FakeResponse;
|
||||||
|
import google.registry.testing.mapreduce.MapreduceTestCase;
|
||||||
|
import java.util.List;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.JUnit4;
|
||||||
|
|
||||||
|
@RunWith(JUnit4.class)
|
||||||
|
public class MapreduceEntityCleanupActionTest
|
||||||
|
extends MapreduceTestCase<MapreduceEntityCleanupAction> {
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public final ExceptionRule thrown = new ExceptionRule();
|
||||||
|
|
||||||
|
private static final DatastoreService datastore = getDatastoreService();
|
||||||
|
private static final FetchOptions FETCH_OPTIONS = FetchOptions.Builder.withChunkSize(200);
|
||||||
|
private static final String QUEUE_NAME = "mapreduce";
|
||||||
|
|
||||||
|
private final TestMapreduceEntityCleanupUtil mapreduceEntityCleanupUtil =
|
||||||
|
new TestMapreduceEntityCleanupUtil();
|
||||||
|
private final FakeClock clock = new FakeClock(DateTime.now(UTC));
|
||||||
|
private final FakeResponse response = new FakeResponse();
|
||||||
|
|
||||||
|
private static final ImmutableList<List<String>> inputStrings = ImmutableList.<List<String>>of(
|
||||||
|
ImmutableList.of("a", "b", "c"),
|
||||||
|
ImmutableList.of("d", "e", "f", "g", "h"),
|
||||||
|
ImmutableList.of("i", "j", "k"),
|
||||||
|
ImmutableList.of("l"),
|
||||||
|
ImmutableList.of("m", "n"));
|
||||||
|
|
||||||
|
private static final InMemoryInput<String> input = new InMemoryInput<>(inputStrings);
|
||||||
|
|
||||||
|
private static class TestMapper extends Mapper<String, String, String> {
|
||||||
|
private static final long serialVersionUID = 8472979502634351364L;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void map(String s) {
|
||||||
|
emit(s, s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestReducer extends Reducer<String, String, String> {
|
||||||
|
private static final long serialVersionUID = 5344368517893904668L;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reduce(final String key, ReducerInput<String> values) {
|
||||||
|
while (values.hasNext()) {
|
||||||
|
emit(values.next());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String createMapreduce(String jobName) throws Exception {
|
||||||
|
MapReduceJob<String, String, String, String, List<List<String>>> mapReduceJob =
|
||||||
|
new MapReduceJob<>(
|
||||||
|
new MapReduceSpecification.Builder<String, String, String, String, List<List<String>>>()
|
||||||
|
.setJobName(jobName)
|
||||||
|
.setInput(input)
|
||||||
|
.setMapper(new TestMapper())
|
||||||
|
.setReducer(new TestReducer())
|
||||||
|
.setOutput(new InMemoryOutput<String>())
|
||||||
|
.setNumReducers(2)
|
||||||
|
.build(),
|
||||||
|
new MapReduceSettings.Builder().setWorkerQueueName(QUEUE_NAME).build());
|
||||||
|
PipelineService pipelineService = PipelineServiceFactory.newPipelineService();
|
||||||
|
return pipelineService.startNewPipeline(mapReduceJob, new JobSetting.OnQueue(QUEUE_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
private void setJobIdAndJobName(Optional<String> jobId, Optional<String> jobName) {
|
||||||
|
setJobIdJobNameAndDaysOld(jobId, jobName, Optional.<Integer>absent());
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
private void setAnyJob() {
|
||||||
|
setJobIdJobNameAndDaysOld(
|
||||||
|
Optional.<String>absent(), Optional.<String>absent(), Optional.<Integer>absent());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setAnyJobAndDaysOld(int daysOld) {
|
||||||
|
setJobIdJobNameAndDaysOld(
|
||||||
|
Optional.<String>absent(), Optional.<String>absent(), Optional.<Integer>of(daysOld));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setJobId(String jobId) {
|
||||||
|
setJobIdJobNameAndDaysOld(
|
||||||
|
Optional.of(jobId), Optional.<String>absent(), Optional.<Integer>absent());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setJobName(String jobName) {
|
||||||
|
setJobIdJobNameAndDaysOld(
|
||||||
|
Optional.<String>absent(), Optional.of(jobName), Optional.<Integer>absent());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setJobNameAndDaysOld(String jobName, int daysOld) {
|
||||||
|
setJobIdJobNameAndDaysOld(
|
||||||
|
Optional.<String>absent(), Optional.of(jobName), Optional.<Integer>of(daysOld));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setJobIdJobNameAndDaysOld(
|
||||||
|
Optional<String> jobId, Optional<String> jobName, Optional<Integer> daysOld) {
|
||||||
|
clock.setTo(DateTime.now(UTC));
|
||||||
|
action = new MapreduceEntityCleanupAction(
|
||||||
|
jobId,
|
||||||
|
jobName,
|
||||||
|
Optional.<Integer>absent(), // numJobsToDelete
|
||||||
|
daysOld,
|
||||||
|
Optional.<Boolean>absent(), // force
|
||||||
|
mapreduceEntityCleanupUtil,
|
||||||
|
clock,
|
||||||
|
DatastoreServiceFactory.getDatastoreService(),
|
||||||
|
response);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the keys of a particular kind.
|
||||||
|
*
|
||||||
|
* <p>We really just care about the count of keys, but the exception messages are much clearer if
|
||||||
|
* we print out the complete list, rather than just the count.
|
||||||
|
*/
|
||||||
|
private static List<Entity> getKeys(String kind) {
|
||||||
|
return datastore.prepare(new Query(kind).setKeysOnly()).asList(FETCH_OPTIONS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void assertNumMapreducesAndShardedJobs(
|
||||||
|
int numberOfMapreduces, int numberOfShardedJobEntities) {
|
||||||
|
assertThat(getKeys(JobRecord.DATA_STORE_KIND)).hasSize(14 * numberOfMapreduces);
|
||||||
|
assertThat(getKeys(Slot.DATA_STORE_KIND)).hasSize(32 * numberOfMapreduces);
|
||||||
|
assertThat(getKeys(ShardedValue.DATA_STORE_KIND)).isEmpty();
|
||||||
|
assertThat(getKeys(Barrier.DATA_STORE_KIND)).hasSize(28 * numberOfMapreduces);
|
||||||
|
assertThat(getKeys(JobInstanceRecord.DATA_STORE_KIND)).hasSize(14 * numberOfMapreduces);
|
||||||
|
assertThat(getKeys(FanoutTaskRecord.DATA_STORE_KIND)).hasSize(18 * numberOfMapreduces);
|
||||||
|
assertThat(getKeys("MR-ShardedJob")).hasSize(numberOfShardedJobEntities);
|
||||||
|
assertThat(getKeys("MR-ShardedValue")).isEmpty();
|
||||||
|
assertThat(getKeys(ShardRetryState.Serializer.makeKey("dummy").getKind())).isEmpty();
|
||||||
|
assertThat(getKeys(IncrementalTaskState.Serializer.makeKey("dummy").getKind()))
|
||||||
|
.hasSize(numberOfMapreduces * (2 + 2 * inputStrings.size()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCleanup_succeeds() throws Exception {
|
||||||
|
|
||||||
|
// Create and run the mapreduce.
|
||||||
|
String jobId = createMapreduce("jobname");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
|
||||||
|
// The desired entities should be present.
|
||||||
|
assertNumMapreducesAndShardedJobs(1, 3);
|
||||||
|
|
||||||
|
// Now run the cleanup action.
|
||||||
|
setJobId(jobId);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).isEqualTo(
|
||||||
|
jobId
|
||||||
|
+ ": deletion requested\n"
|
||||||
|
+ "successfully requested async deletion of 1 job(s); errors received on 0\n");
|
||||||
|
|
||||||
|
// The entities should still be there, because everything executes asynchronously, and in fact
|
||||||
|
// there should be more pipeline entities, because the deletion tasks are there as well.
|
||||||
|
// However, the MR-* sharded entities are gone for some reason.
|
||||||
|
assertThat(getKeys(JobRecord.DATA_STORE_KIND)).hasSize(17);
|
||||||
|
assertThat(getKeys(Slot.DATA_STORE_KIND)).hasSize(38);
|
||||||
|
assertThat(getKeys(ShardedValue.DATA_STORE_KIND)).isEmpty();
|
||||||
|
assertThat(getKeys(Barrier.DATA_STORE_KIND)).hasSize(34);
|
||||||
|
assertThat(getKeys(JobInstanceRecord.DATA_STORE_KIND)).hasSize(17);
|
||||||
|
assertThat(getKeys(FanoutTaskRecord.DATA_STORE_KIND)).hasSize(21);
|
||||||
|
assertThat(getKeys("MR-ShardedJob")).isEmpty();
|
||||||
|
assertThat(getKeys("MR-ShardedValue")).isEmpty();
|
||||||
|
assertThat(getKeys(ShardRetryState.Serializer.makeKey("dummy").getKind())).isEmpty();
|
||||||
|
assertThat(getKeys(IncrementalTaskState.Serializer.makeKey("dummy").getKind()))
|
||||||
|
.hasSize(2 + 2 * inputStrings.size());
|
||||||
|
|
||||||
|
// Run the async deletion.
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
|
||||||
|
// Everything should be gone.
|
||||||
|
assertNumMapreducesAndShardedJobs(0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNonexistentJobName_fails() throws Exception {
|
||||||
|
setJobName("nonexistent");
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).isEqualTo("No eligible job.");
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJobNameOfRecentJob_fails() throws Exception {
|
||||||
|
createMapreduce("jobname");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
setJobName("jobname");
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
|
||||||
|
assertThat(response.getPayload()).isEqualTo("No eligible job.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJobNameOfRecentJob_succeedsWithDaysOldParameter() throws Exception {
|
||||||
|
createMapreduce("jobname");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
setJobNameAndDaysOld("jobname", 0);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).isEqualTo("done");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
assertNumMapreducesAndShardedJobs(0, 0);
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteZeroJobs_succeeds() throws Exception {
|
||||||
|
createMapreduce("jobname");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
action = new MapreduceEntityCleanupAction(
|
||||||
|
Optional.<String>absent(), // jobId
|
||||||
|
Optional.<String>absent(), // jobName
|
||||||
|
Optional.<Integer>of(0), // numJobsToDelete
|
||||||
|
Optional.<Integer>absent(), // daysOld
|
||||||
|
Optional.<Boolean>absent(), // force
|
||||||
|
mapreduceEntityCleanupUtil,
|
||||||
|
clock,
|
||||||
|
DatastoreServiceFactory.getDatastoreService(),
|
||||||
|
response);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).isEqualTo("A total of 0 job(s) processed\n");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
assertNumMapreducesAndShardedJobs(1, 3);
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAnyJob_fails() throws Exception {
|
||||||
|
createMapreduce("jobname");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
setAnyJob();
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).isEqualTo("No eligible job.");
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAnyJob_succeedsWithDaysOldParameter() throws Exception {
|
||||||
|
createMapreduce("jobname");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
setJobNameAndDaysOld("jobname", 0);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).isEqualTo("done");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
assertNumMapreducesAndShardedJobs(0, 0);
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNonexistentJobId_succeeds() throws Exception {
|
||||||
|
setJobId("nonexistent");
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).isEqualTo(
|
||||||
|
"nonexistent: deletion requested\n"
|
||||||
|
+ "successfully requested async deletion of 1 job(s); errors received on 0\n");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
assertNumMapreducesAndShardedJobs(0, 0);
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteTwoJobs_succeedsWithDaysOldParameter() throws Exception {
|
||||||
|
createMapreduce("jobname1");
|
||||||
|
createMapreduce("jobname2");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
setAnyJobAndDaysOld(0);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).isEqualTo("done");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
assertNumMapreducesAndShardedJobs(0, 0);
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteTwoJobsInTwoBatches_succeeds() throws Exception {
|
||||||
|
createMapreduce("jobname1");
|
||||||
|
createMapreduce("jobname2");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
setAnyJobAndDaysOld(0);
|
||||||
|
mapreduceEntityCleanupUtil.setMaxNumberOfJobsPerSearch(1);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).isEqualTo("done");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
assertNumMapreducesAndShardedJobs(0, 0);
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteOneOfTwoJobs_succeedsWithDaysOldParameter() throws Exception {
|
||||||
|
createMapreduce("jobname1");
|
||||||
|
createMapreduce("jobname2");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
clock.setTo(DateTime.now(UTC));
|
||||||
|
action = new MapreduceEntityCleanupAction(
|
||||||
|
Optional.<String>absent(), // jobId
|
||||||
|
Optional.<String>absent(), // jobName
|
||||||
|
Optional.<Integer>of(1), // numJobsToDelete
|
||||||
|
Optional.<Integer>of(0), // daysOld
|
||||||
|
Optional.<Boolean>absent(), // force
|
||||||
|
mapreduceEntityCleanupUtil,
|
||||||
|
clock,
|
||||||
|
DatastoreServiceFactory.getDatastoreService(),
|
||||||
|
response);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).endsWith(
|
||||||
|
": deletion requested\n"
|
||||||
|
+ "successfully requested async deletion of 1 job(s); errors received on 0\n"
|
||||||
|
+ "A total of 1 job(s) processed\n");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
assertNumMapreducesAndShardedJobs(1, 3);
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteOneOfTwoJobsByJobName_succeedsWithDaysOldParameter() throws Exception {
|
||||||
|
createMapreduce("jobname1");
|
||||||
|
createMapreduce("jobname2");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
setJobNameAndDaysOld("jobname1", 0);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).isEqualTo("done");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
assertNumMapreducesAndShardedJobs(1, 3);
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteOneOfTwoJobsByJobId_succeeds() throws Exception {
|
||||||
|
String jobId1 = createMapreduce("jobname1");
|
||||||
|
createMapreduce("jobname2");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
setJobId(jobId1);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).endsWith(
|
||||||
|
": deletion requested\n"
|
||||||
|
+ "successfully requested async deletion of 1 job(s); errors received on 0\n");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
assertNumMapreducesAndShardedJobs(1, 3);
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteTwoJobsByJobId_succeeds() throws Exception {
|
||||||
|
String jobId1 = createMapreduce("jobname1");
|
||||||
|
String jobId2 = createMapreduce("jobname2");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
setJobId(jobId1);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).isEqualTo(
|
||||||
|
jobId1
|
||||||
|
+ ": deletion requested\n"
|
||||||
|
+ "successfully requested async deletion of 1 job(s); errors received on 0\n");
|
||||||
|
|
||||||
|
FakeResponse response2 = new FakeResponse();
|
||||||
|
clock.setTo(DateTime.now(UTC));
|
||||||
|
action = new MapreduceEntityCleanupAction(
|
||||||
|
Optional.of(jobId2), // jobId
|
||||||
|
Optional.<String>absent(), // jobName
|
||||||
|
Optional.<Integer>absent(), // numJobsToDelete
|
||||||
|
Optional.<Integer>absent(), // daysOld
|
||||||
|
Optional.<Boolean>absent(), // force
|
||||||
|
mapreduceEntityCleanupUtil,
|
||||||
|
clock,
|
||||||
|
DatastoreServiceFactory.getDatastoreService(),
|
||||||
|
response2);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response2.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response2.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response2.getPayload()).isEqualTo(
|
||||||
|
jobId2
|
||||||
|
+ ": deletion requested\n"
|
||||||
|
+ "successfully requested async deletion of 1 job(s); errors received on 0\n");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
assertNumMapreducesAndShardedJobs(0, 0);
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteOfRunningJob_fails() throws Exception {
|
||||||
|
String jobId = createMapreduce("jobname");
|
||||||
|
executeTasks(QUEUE_NAME, clock, Optional.<Integer>of(10));
|
||||||
|
setJobId(jobId);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).endsWith(
|
||||||
|
": Job is not in FINALIZED or STOPPED state\n"
|
||||||
|
+ "successfully requested async deletion of 0 job(s); errors received on 1\n");
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteOfRunningJob_succeedsWithForce() throws Exception {
|
||||||
|
String jobId = createMapreduce("jobname");
|
||||||
|
executeTasks(QUEUE_NAME, clock, Optional.<Integer>of(10));
|
||||||
|
clock.setTo(DateTime.now(UTC));
|
||||||
|
action = new MapreduceEntityCleanupAction(
|
||||||
|
Optional.of(jobId),
|
||||||
|
Optional.<String>absent(), // jobName
|
||||||
|
Optional.<Integer>absent(), // numJobsToDelete
|
||||||
|
Optional.<Integer>absent(), // daysOld
|
||||||
|
Optional.of(true), // force
|
||||||
|
mapreduceEntityCleanupUtil,
|
||||||
|
clock,
|
||||||
|
DatastoreServiceFactory.getDatastoreService(),
|
||||||
|
response);
|
||||||
|
|
||||||
|
action.run();
|
||||||
|
|
||||||
|
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||||
|
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||||
|
assertThat(response.getPayload()).endsWith(
|
||||||
|
": deletion requested\n"
|
||||||
|
+ "successfully requested async deletion of 1 job(s); errors received on 0\n");
|
||||||
|
executeTasksUntilEmpty(QUEUE_NAME, clock);
|
||||||
|
assertNumMapreducesAndShardedJobs(0, 0);
|
||||||
|
assertThat(mapreduceEntityCleanupUtil.getNumSearchesPerformed()).isEqualTo(0);
|
||||||
|
}

  @Test
  public void testJobIdAndJobName_fails() throws Exception {
    setJobIdJobNameAndDaysOld(
        Optional.of("jobid"), Optional.of("jobname"), Optional.<Integer>absent());

    action.run();

    assertThat(response.getStatus()).isEqualTo(SC_OK);
    assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
    assertThat(response.getPayload()).isEqualTo("Do not specify both a job ID and a job name");
    assertNumMapreducesAndShardedJobs(0, 0);
  }

  @Test
  public void testJobIdAndDaysOld_fails() throws Exception {
    setJobIdJobNameAndDaysOld(Optional.of("jobid"), Optional.<String>absent(), Optional.of(0));

    action.run();

    assertThat(response.getStatus()).isEqualTo(SC_OK);
    assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
    assertThat(response.getPayload())
        .isEqualTo("Do not specify both a job ID and a days old threshold");
    assertNumMapreducesAndShardedJobs(0, 0);
  }

  @Test
  public void testJobIdAndNumJobs_fails() throws Exception {
    clock.setTo(DateTime.now(UTC));
    action = new MapreduceEntityCleanupAction(
        Optional.of("jobid"),
        Optional.<String>absent(), // jobName
        Optional.of(1), // numJobsToDelete
        Optional.<Integer>absent(), // daysOld
        Optional.<Boolean>absent(), // force
        mapreduceEntityCleanupUtil,
        clock,
        DatastoreServiceFactory.getDatastoreService(),
        response);

    action.run();

    assertThat(response.getStatus()).isEqualTo(SC_OK);
    assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
    assertThat(response.getPayload())
        .isEqualTo("Do not specify both a job ID and a number of jobs to delete");
    assertNumMapreducesAndShardedJobs(0, 0);
  }
}
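
The three validation tests above imply that a job ID is mutually exclusive with the name, age, and count filters. A hedged sketch of that check (the method name and return convention are assumptions; the error strings are the ones asserted above):

    // Hypothetical parameter check matching the error messages asserted above.
    private static Optional<String> validateParameters(
        Optional<String> jobId,
        Optional<String> jobName,
        Optional<Integer> numJobsToDelete,
        Optional<Integer> daysOld) {
      if (jobId.isPresent() && jobName.isPresent()) {
        return Optional.of("Do not specify both a job ID and a job name");
      }
      if (jobId.isPresent() && daysOld.isPresent()) {
        return Optional.of("Do not specify both a job ID and a days old threshold");
      }
      if (jobId.isPresent() && numJobsToDelete.isPresent()) {
        return Optional.of("Do not specify both a job ID and a number of jobs to delete");
      }
      return Optional.absent(); // No conflict; proceed with deletion.
    }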

@@ -0,0 +1,60 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.batch;

import com.google.common.base.Optional;
import javax.annotation.Nullable;
import org.joda.time.DateTime;

/**
 * Test harness for {@link MapreduceEntityCleanupUtil}.
 *
 * <p>It's somewhat like a mock, in that it records the number of calls to {@link
 * #findEligibleJobsByJobName}, and it allows the overriding of the number of jobs returned per
 * search, to allow testing of subsequent searches. But it's not a full mock by any means.
 */
public class TestMapreduceEntityCleanupUtil extends MapreduceEntityCleanupUtil {

  private int maxNumberOfJobsPerSearch;
  private int numSearches = 0;

  public TestMapreduceEntityCleanupUtil() {
    this.maxNumberOfJobsPerSearch = MAX_NUMBER_OF_JOBS_PER_SEARCH;
  }

  @Override
  protected int getMaxNumberOfJobsPerSearch() {
    return maxNumberOfJobsPerSearch;
  }

  public void setMaxNumberOfJobsPerSearch(int maxNumberOfJobsPerSearch) {
    this.maxNumberOfJobsPerSearch = maxNumberOfJobsPerSearch;
  }

  @Override
  public EligibleJobResults findEligibleJobsByJobName(
      @Nullable String jobName,
      DateTime cutoffDate,
      Optional<Integer> maxJobs,
      boolean ignoreState,
      Optional<String> cursor) {
    numSearches++;
    return super.findEligibleJobsByJobName(jobName, cutoffDate, maxJobs, ignoreState, cursor);
  }

  public int getNumSearchesPerformed() {
    return numSearches;
  }
}
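
As a usage note, the tests above hold an instance of this harness in a mapreduceEntityCleanupUtil field and assert on its search count; a minimal sketch of that pattern (the concrete expectations here are illustrative, not taken from a specific test):

    // Illustrative use of the harness; the counts depend on the test scenario.
    TestMapreduceEntityCleanupUtil util = new TestMapreduceEntityCleanupUtil();
    util.setMaxNumberOfJobsPerSearch(1); // Force the cleanup to page through jobs.
    // ... construct and run MapreduceEntityCleanupAction with "util" injected ...
    assertThat(util.getNumSearchesPerformed()).isEqualTo(2); // Hypothetical expectation.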

@@ -115,6 +115,8 @@ public abstract class MapreduceTestCase<T> extends ShardableTestCase {
    String pathInfo = taskStateInfo.getUrl();
    if (pathInfo.startsWith("/_dr/mapreduce/")) {
      pathInfo = pathInfo.replace("/_dr/mapreduce", "");
    } else if (pathInfo.startsWith("/mapreduce/")) {
      pathInfo = pathInfo.replace("/mapreduce", "");
    } else if (pathInfo.startsWith("/")) {
      pathInfo = pathInfo.replace("/_ah/", "");
      pathInfo = pathInfo.substring(pathInfo.indexOf('/'));

@@ -176,7 +178,23 @@ public abstract class MapreduceTestCase<T> extends ShardableTestCase {
   */
  protected void executeTasksUntilEmpty(String queueName, @Nullable FakeClock clock)
      throws Exception {
    executeTasks(queueName, clock, Optional.<Integer>absent());
  }

  /**
   * Executes mapreduce tasks, incrementing the clock between each task.
   *
   * <p>Incrementing the clock between tasks is important if tasks have transactions inside the
   * mapper or reducer, which don't have access to the fake clock.
   *
   * <p>The maxTasks parameter determines how many tasks (at most) will be run. If maxTasks is
   * absent(), all tasks are run until the queue is empty. If maxTasks is zero, no tasks are run.
   */
  protected void executeTasks(
      String queueName, @Nullable FakeClock clock, Optional<Integer> maxTasks) throws Exception {
    for (int numTasksDeleted = 0;
        !maxTasks.isPresent() || (numTasksDeleted < maxTasks.get());
        numTasksDeleted++) {
      ofy().clearSessionCache();
      // We have to re-acquire task list every time, because local implementation returns a copy.
      List<QueueStateInfo.TaskStateInfo> taskInfo =
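
For reference, the cleanup tests earlier in this change exercise this helper both bounded and unbounded; a minimal usage sketch drawn from those tests:

    // Run at most 10 tasks (as testDeleteOfRunningJob_fails does), then drain the queue.
    executeTasks(QUEUE_NAME, clock, Optional.<Integer>of(10));
    executeTasksUntilEmpty(QUEUE_NAME, clock);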