Add Runnable overrides to ease use of Java 8 language features

Runnable and Callable are both @FunctionalInterfaces. The difference is
that Callable requires a return value whereas Runnable does not, so in
situations where we don't care about a return value, rather than having to
add an unnecessary 'return null;' at the end of the lambda, we can simply
use a non-returning Runnable instead.

Unfortunately, owing to legacy reasons, Runnable is not declared to throw
checked exceptions whereas Callable is, so in situations where checked
exceptions are thrown we still need to have a 'return null;' call at the
end.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=172935400
This commit is contained in:
mcilwain 2017-10-20 14:37:48 -07:00 committed by jianglai
parent d577a281b8
commit 1790914058
13 changed files with 141 additions and 134 deletions

View file

@ -143,12 +143,7 @@ public final class DeleteOldCommitLogsAction implements Runnable {
//
// We want to make sure we retry any load individually to reduce the chance of the entire
// shard failing, hence we wrap it in a transactNew.
Object object = ofy().transactNew(new Work<Object>() {
@Override
public Object run() {
return ofy().load().key(key).now();
}
});
Object object = ofy().transactNew(() -> ofy().load().key(key).now());
checkNotNull(object, "Received a key to a missing object. key: %s", key);
checkState(
object instanceof EppResource,

View file

@ -153,25 +153,27 @@ public class RestoreCommitLogsAction implements Runnable {
try {
deleteResult.now();
} catch (Exception e) {
retry(() -> deleteAsync(manifest.getDeletions()).now());
retrier.callWithRetry(
() -> deleteAsync(manifest.getDeletions()).now(), RuntimeException.class);
}
return manifest;
}
private void saveRaw(final List<Entity> entitiesToSave) {
private void saveRaw(List<Entity> entitiesToSave) {
if (dryRun) {
logger.infofmt("Would have saved entities: %s", entitiesToSave);
return;
}
retry(() -> datastoreService.put(entitiesToSave));
retrier.callWithRetry(() -> datastoreService.put(entitiesToSave), RuntimeException.class);
}
private void saveOfy(final Iterable<? extends ImmutableObject> objectsToSave) {
private void saveOfy(Iterable<? extends ImmutableObject> objectsToSave) {
if (dryRun) {
logger.infofmt("Would have saved entities: %s", objectsToSave);
return;
}
retry(() -> ofy().saveWithoutBackup().entities(objectsToSave).now());
retrier.callWithRetry(
() -> ofy().saveWithoutBackup().entities(objectsToSave).now(), RuntimeException.class);
}
private Result<?> deleteAsync(Set<Key<?>> keysToDelete) {
@ -183,13 +185,4 @@ public class RestoreCommitLogsAction implements Runnable {
: ofy().deleteWithoutBackup().keys(keysToDelete);
}
/** Retrier for saves and deletes, since we can't proceed with any failures. */
private void retry(final Runnable runnable) {
retrier.callWithRetry(
() -> {
runnable.run();
return null;
},
RuntimeException.class);
}
}

View file

@ -59,7 +59,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Multiset;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.Work;
import google.registry.batch.DeleteContactsAndHostsAction.DeletionResult.Type;
import google.registry.dns.DnsQueue;
import google.registry.flows.async.AsyncFlowMetrics;
@ -179,18 +178,13 @@ public class DeleteContactsAndHostsAction implements Runnable {
}
final List<TaskHandle> tasks =
deletionRequests.stream().map(DeletionRequest::task).collect(toImmutableList());
retrier.callWithRetry(
() -> {
queue.deleteTask(tasks);
return null;
},
TransientFailureException.class);
for (DeletionRequest deletionRequest : deletionRequests) {
retrier.callWithRetry(() -> queue.deleteTask(tasks), TransientFailureException.class);
deletionRequests.forEach(
deletionRequest ->
asyncFlowMetrics.recordAsyncFlowResult(
deletionRequest.getMetricOperationType(),
OperationResult.STALE,
deletionRequest.requestedTime());
}
deletionRequest.requestedTime()));
}
private void runMapreduce(ImmutableList<DeletionRequest> deletionRequests) {
@ -280,15 +274,15 @@ public class DeleteContactsAndHostsAction implements Runnable {
public void reduce(final DeletionRequest deletionRequest, ReducerInput<Boolean> values) {
final boolean hasNoActiveReferences = !Iterators.contains(values, true);
logger.infofmt("Processing async deletion request for %s", deletionRequest.key());
DeletionResult result = ofy().transactNew(new Work<DeletionResult>() {
@Override
@SuppressWarnings("unchecked")
public DeletionResult run() {
DeletionResult result =
ofy()
.transactNew(
() -> {
DeletionResult deletionResult =
attemptToDeleteResource(deletionRequest, hasNoActiveReferences);
getQueue(QUEUE_ASYNC_DELETE).deleteTask(deletionRequest.task());
return deletionResult;
}});
});
asyncFlowMetrics.recordAsyncFlowResult(
deletionRequest.getMetricOperationType(),
result.getMetricOperationResult(),
@ -369,10 +363,7 @@ public class DeleteContactsAndHostsAction implements Runnable {
} else {
resourceToSaveBuilder = resource.asBuilder();
}
resourceToSave = resourceToSaveBuilder
.setDeletionTime(now)
.setStatusValues(null)
.build();
resourceToSave = resourceToSaveBuilder.setDeletionTime(now).setStatusValues(null).build();
performDeleteTasks(resource, resourceToSave, now, historyEntry);
updateForeignKeyIndexDeletionTime(resourceToSave);
} else {

View file

@ -31,8 +31,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.VoidWork;
import com.googlecode.objectify.Work;
import google.registry.config.RegistryConfig.Config;
import google.registry.dns.DnsQueue;
import google.registry.mapreduce.MapreduceRunner;
@ -204,9 +202,7 @@ public class DeleteProberDataAction implements Runnable {
final Key<EppResourceIndex> eppIndex = Key.create(EppResourceIndex.create(domainKey));
final Key<? extends ForeignKeyIndex<?>> fki = ForeignKeyIndex.createKey(domain);
int entitiesDeleted = ofy().transact(new Work<Integer>() {
@Override
public Integer run() {
int entitiesDeleted = ofy().transact(() -> {
// This ancestor query selects all descendant HistoryEntries, BillingEvents, PollMessages,
// and TLD-specific entities, as well as the domain itself.
List<Key<Object>> domainAndDependentKeys = ofy().load().ancestor(domainKey).keys().list();
@ -221,16 +217,13 @@ public class DeleteProberDataAction implements Runnable {
ofy().deleteWithoutBackup().keys(allKeys);
}
return allKeys.size();
}
});
getContext().incrementCounter("domains hard-deleted");
getContext().incrementCounter("total entities hard-deleted", entitiesDeleted);
}
private void softDeleteDomain(final DomainResource domain) {
ofy().transactNew(new VoidWork() {
@Override
public void vrun() {
ofy().transactNew(() -> {
DomainResource deletedDomain = domain
.asBuilder()
.setDeletionTime(ofy().getTransactionTime())
@ -251,7 +244,7 @@ public class DeleteProberDataAction implements Runnable {
updateForeignKeyIndexDeletionTime(deletedDomain);
dnsQueue.addDomainRefreshTask(deletedDomain.getFullyQualifiedDomainName());
}
});
);
}
}
}

View file

@ -178,10 +178,7 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
}
if (referencingHostKey != null) {
retrier.callWithRetry(
() -> {
dnsQueue.addDomainRefreshTask(domain.getFullyQualifiedDomainName());
return null;
},
() -> dnsQueue.addDomainRefreshTask(domain.getFullyQualifiedDomainName()),
TransientFailureException.class);
logger.infofmt(
"Enqueued DNS refresh for domain %s referenced by host %s.",
@ -242,15 +239,9 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
}
final List<TaskHandle> tasks =
refreshRequests.stream().map(DnsRefreshRequest::task).collect(toImmutableList());
retrier.callWithRetry(
() -> {
queue.deleteTask(tasks);
return null;
},
TransientFailureException.class);
for (DnsRefreshRequest refreshRequest : refreshRequests) {
asyncFlowMetrics.recordAsyncFlowResult(DNS_REFRESH, result, refreshRequest.requestedTime());
}
retrier.callWithRetry(() -> queue.deleteTask(tasks), TransientFailureException.class);
refreshRequests.forEach(
r -> asyncFlowMetrics.recordAsyncFlowResult(DNS_REFRESH, result, r.requestedTime()));
}
/** A class that encapsulates the values of a request to refresh DNS for a renamed host. */

View file

@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import com.googlecode.objectify.VoidWork;
import google.registry.config.RegistryConfig.Config;
import google.registry.groups.GroupsConnection;
import google.registry.groups.GroupsConnection.Role;
@ -134,12 +133,7 @@ public final class SyncGroupMembersAction implements Runnable {
new ImmutableMap.Builder<>();
for (final Registrar registrar : dirtyRegistrars) {
try {
retrier.callWithRetry(
() -> {
syncRegistrarContacts(registrar);
return null;
},
RuntimeException.class);
retrier.callWithRetry(() -> syncRegistrarContacts(registrar), RuntimeException.class);
resultsBuilder.put(registrar, Optional.<Throwable>empty());
} catch (Throwable e) {
logger.severe(e, e.getMessage());
@ -171,11 +165,7 @@ public final class SyncGroupMembersAction implements Runnable {
registrarsToSave.add(result.getKey().asBuilder().setContactsRequireSyncing(false).build());
}
}
ofy().transactNew(new VoidWork() {
@Override
public void vrun() {
ofy().save().entities(registrarsToSave.build());
}});
ofy().transactNew(() -> ofy().save().entities(registrarsToSave.build()));
return errors;
}

View file

@ -106,11 +106,6 @@ public final class AsyncFlowEnqueuer {
* enqueuing a task.
*/
private void addTaskToQueueWithRetry(final Queue queue, final TaskOptions task) {
retrier.callWithRetry(
() -> {
queue.add(task);
return null;
},
TransientFailureException.class);
retrier.callWithRetry(() -> queue.add(task), TransientFailureException.class);
}
}

View file

@ -19,8 +19,6 @@ import static google.registry.model.ofy.ObjectifyService.ofy;
import com.google.appengine.api.users.User;
import com.google.common.base.Splitter;
import com.googlecode.objectify.VoidWork;
import com.googlecode.objectify.Work;
import com.googlecode.objectify.annotation.Entity;
import com.googlecode.objectify.annotation.Id;
import google.registry.model.ImmutableObject;
@ -54,24 +52,13 @@ public class GaeUserIdConverter extends ImmutableObject {
try {
// Perform these operations in a transactionless context to avoid enlisting in some outer
// transaction (if any).
ofy().doTransactionless(new VoidWork() {
@Override
public void vrun() {
ofy().saveWithoutBackup().entity(gaeUserIdConverter).now();
}});
ofy().doTransactionless(() -> ofy().saveWithoutBackup().entity(gaeUserIdConverter).now());
// The read must be done in its own transaction to avoid reading from the session cache.
return ofy().transactNew(new Work<String>() {
@Override
public String run() {
return ofy().load().entity(gaeUserIdConverter).safe().user.getUserId();
}});
return ofy()
.transactNew(() -> ofy().load().entity(gaeUserIdConverter).safe().user.getUserId());
} finally {
ofy().doTransactionless(new VoidWork() {
@Override
public void vrun() {
ofy().deleteWithoutBackup().entity(gaeUserIdConverter).now();
}});
ofy().doTransactionless(() -> ofy().deleteWithoutBackup().entity(gaeUserIdConverter).now());
}
}
}

View file

@ -200,6 +200,20 @@ public class Ofy {
return inTransaction() ? work.run() : transactNew(work);
}
/**
* Execute a transaction.
*
* <p>This overload is used for transactions that don't return a value, formerly implemented using
* VoidWork.
*/
public void transact(Runnable work) {
transact(
() -> {
work.run();
return null;
});
}
/** Pause the current transaction (if any) and complete this one before returning to it. */
public <R> R transactNew(Work<R> work) {
// Wrap the Work in a CommitLoggedWork so that we can give transactions a frozen view of time
@ -207,6 +221,20 @@ public class Ofy {
return transactCommitLoggedWork(new CommitLoggedWork<>(work, getClock()));
}
/**
* Pause the current transaction (if any) and complete this one before returning to it.
*
* <p>This overload is used for transactions that don't return a value, formerly implemented using
* VoidWork.
*/
public void transactNew(Runnable work) {
transactNew(
() -> {
work.run();
return null;
});
}
/**
* Transact with commit logs and retry with exponential backoff.
*

View file

@ -44,7 +44,6 @@ import java.io.ByteArrayInputStream;
import java.net.MalformedURLException;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.concurrent.Callable;
import javax.inject.Inject;
/**
@ -81,21 +80,20 @@ public class RdeReporter {
req.addHeader(new HTTPHeader(AUTHORIZATION, "Basic " + token));
req.setPayload(reportBytes);
logger.infofmt("Sending report:\n%s", new String(reportBytes, UTF_8));
HTTPResponse rsp = retrier.callWithRetry(
new Callable<HTTPResponse>() {
@Override
public HTTPResponse call() throws Exception {
HTTPResponse rsp = urlFetchService.fetch(req);
switch (rsp.getResponseCode()) {
HTTPResponse rsp =
retrier.callWithRetry(
() -> {
HTTPResponse rsp1 = urlFetchService.fetch(req);
switch (rsp1.getResponseCode()) {
case SC_OK:
case SC_BAD_REQUEST:
break;
default:
throw new UrlFetchException("PUT failed", req, rsp);
throw new UrlFetchException("PUT failed", req, rsp1);
}
return rsp;
}
}, SocketTimeoutException.class);
return rsp1;
},
SocketTimeoutException.class);
// Ensure the XML response is valid.
XjcIirdeaResult result = parseResult(rsp);

View file

@ -18,6 +18,7 @@ import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static com.google.common.base.Verify.verify;
import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8;
import static com.jcraft.jsch.ChannelSftp.OVERWRITE;
import static google.registry.model.common.Cursor.CursorType.RDE_UPLOAD_SFTP;
import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.model.rde.RdeMode.FULL;
@ -29,7 +30,6 @@ import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.tools.cloudstorage.GcsFilename;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
import com.googlecode.objectify.VoidWork;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import dagger.Lazy;
@ -139,7 +139,7 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
throw new ServiceUnavailableException("Waiting for RdeStagingAction to complete");
}
DateTime sftpCursorTime = getCursorTimeOrStartOfTime(
ofy().load().key(Cursor.createKey(CursorType.RDE_UPLOAD_SFTP, Registry.get(tld))).now());
ofy().load().key(Cursor.createKey(RDE_UPLOAD_SFTP, Registry.get(tld))).now());
if (sftpCursorTime.plus(sftpCooldown).isAfter(clock.nowUtc())) {
// Fail the task good and hard so it retries until the cooldown passes.
logger.infofmt("tld=%s cursor=%s sftpCursor=%s", tld, watermark, sftpCursorTime);
@ -161,15 +161,15 @@ public final class RdeUploadAction implements Runnable, EscrowTask {
return null;
},
JSchException.class);
ofy().transact(new VoidWork() {
@Override
public void vrun() {
Cursor cursor =
ofy()
.transact(
() ->
ofy()
.save()
.entity(
Cursor.create(
CursorType.RDE_UPLOAD_SFTP, ofy().getTransactionTime(), Registry.get(tld));
ofy().save().entity(cursor).now();
}
});
RDE_UPLOAD_SFTP, ofy().getTransactionTime(), Registry.get(tld)))
.now());
response.setContentType(PLAIN_TEXT_UTF_8);
response.setPayload(String.format("OK %s %s\n", tld, watermark));
}

View file

@ -119,7 +119,7 @@ public class Retrier implements Serializable {
};
/**
* Retries a unit of work in the face of transient errors.
* Retries a unit of work in the face of transient errors and returns the result.
*
* <p>Retrying is done a fixed number of times, with exponential backoff, if the exception that is
* thrown is on a whitelist of retryable errors. If the error is not on the whitelist, or if the
@ -143,8 +143,20 @@ public class Retrier implements Serializable {
moreRetryableErrors);
}
/** Retries a unit of work in the face of transient errors. */
@SafeVarargs
public final void callWithRetry(
VoidCallable callable,
Class<? extends Throwable> retryableError,
Class<? extends Throwable>... moreRetryableErrors) {
callWithRetry(
callable.asCallable(),
retryableError,
moreRetryableErrors);
}
/**
* Retries a unit of work in the face of transient errors.
* Retries a unit of work in the face of transient errors and returns the result.
*
* <p>Retrying is done a fixed number of times, with exponential backoff, if the exception that is
* thrown is on a whitelist of retryable errors. If the error is not on the whitelist, or if the

View file

@ -0,0 +1,34 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.util;
import java.util.concurrent.Callable;
/**
* A functional interface for a version of {@link Callable} that returns no value.
*/
@FunctionalInterface
public interface VoidCallable {
void call() throws Exception;
/** Returns the VoidCallable as a {@link Callable} that returns null. */
public default Callable<Void> asCallable() {
return () -> {
call();
return null;
};
}
}