diff --git a/java/google/registry/backup/DeleteOldCommitLogsAction.java b/java/google/registry/backup/DeleteOldCommitLogsAction.java index ed589cbff..c3647a905 100644 --- a/java/google/registry/backup/DeleteOldCommitLogsAction.java +++ b/java/google/registry/backup/DeleteOldCommitLogsAction.java @@ -143,12 +143,7 @@ public final class DeleteOldCommitLogsAction implements Runnable { // // We want to make sure we retry any load individually to reduce the chance of the entire // shard failing, hence we wrap it in a transactNew. - Object object = ofy().transactNew(new Work() { - @Override - public Object run() { - return ofy().load().key(key).now(); - } - }); + Object object = ofy().transactNew(() -> ofy().load().key(key).now()); checkNotNull(object, "Received a key to a missing object. key: %s", key); checkState( object instanceof EppResource, diff --git a/java/google/registry/backup/RestoreCommitLogsAction.java b/java/google/registry/backup/RestoreCommitLogsAction.java index 02a5a323f..c06d16474 100644 --- a/java/google/registry/backup/RestoreCommitLogsAction.java +++ b/java/google/registry/backup/RestoreCommitLogsAction.java @@ -153,25 +153,27 @@ public class RestoreCommitLogsAction implements Runnable { try { deleteResult.now(); } catch (Exception e) { - retry(() -> deleteAsync(manifest.getDeletions()).now()); + retrier.callWithRetry( + () -> deleteAsync(manifest.getDeletions()).now(), RuntimeException.class); } return manifest; } - private void saveRaw(final List entitiesToSave) { + private void saveRaw(List entitiesToSave) { if (dryRun) { logger.infofmt("Would have saved entities: %s", entitiesToSave); return; } - retry(() -> datastoreService.put(entitiesToSave)); + retrier.callWithRetry(() -> datastoreService.put(entitiesToSave), RuntimeException.class); } - private void saveOfy(final Iterable objectsToSave) { + private void saveOfy(Iterable objectsToSave) { if (dryRun) { logger.infofmt("Would have saved entities: %s", objectsToSave); return; } - retry(() -> ofy().saveWithoutBackup().entities(objectsToSave).now()); + retrier.callWithRetry( + () -> ofy().saveWithoutBackup().entities(objectsToSave).now(), RuntimeException.class); } private Result deleteAsync(Set> keysToDelete) { @@ -183,13 +185,4 @@ public class RestoreCommitLogsAction implements Runnable { : ofy().deleteWithoutBackup().keys(keysToDelete); } - /** Retrier for saves and deletes, since we can't proceed with any failures. */ - private void retry(final Runnable runnable) { - retrier.callWithRetry( - () -> { - runnable.run(); - return null; - }, - RuntimeException.class); - } } diff --git a/java/google/registry/batch/DeleteContactsAndHostsAction.java b/java/google/registry/batch/DeleteContactsAndHostsAction.java index e96395ec5..8cf9bd7d6 100644 --- a/java/google/registry/batch/DeleteContactsAndHostsAction.java +++ b/java/google/registry/batch/DeleteContactsAndHostsAction.java @@ -59,7 +59,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Multiset; import com.googlecode.objectify.Key; -import com.googlecode.objectify.Work; import google.registry.batch.DeleteContactsAndHostsAction.DeletionResult.Type; import google.registry.dns.DnsQueue; import google.registry.flows.async.AsyncFlowMetrics; @@ -179,18 +178,13 @@ public class DeleteContactsAndHostsAction implements Runnable { } final List tasks = deletionRequests.stream().map(DeletionRequest::task).collect(toImmutableList()); - retrier.callWithRetry( - () -> { - queue.deleteTask(tasks); - return null; - }, - TransientFailureException.class); - for (DeletionRequest deletionRequest : deletionRequests) { - asyncFlowMetrics.recordAsyncFlowResult( - deletionRequest.getMetricOperationType(), - OperationResult.STALE, - deletionRequest.requestedTime()); - } + retrier.callWithRetry(() -> queue.deleteTask(tasks), TransientFailureException.class); + deletionRequests.forEach( + deletionRequest -> + asyncFlowMetrics.recordAsyncFlowResult( + deletionRequest.getMetricOperationType(), + OperationResult.STALE, + deletionRequest.requestedTime())); } private void runMapreduce(ImmutableList deletionRequests) { @@ -280,15 +274,15 @@ public class DeleteContactsAndHostsAction implements Runnable { public void reduce(final DeletionRequest deletionRequest, ReducerInput values) { final boolean hasNoActiveReferences = !Iterators.contains(values, true); logger.infofmt("Processing async deletion request for %s", deletionRequest.key()); - DeletionResult result = ofy().transactNew(new Work() { - @Override - @SuppressWarnings("unchecked") - public DeletionResult run() { - DeletionResult deletionResult = - attemptToDeleteResource(deletionRequest, hasNoActiveReferences); - getQueue(QUEUE_ASYNC_DELETE).deleteTask(deletionRequest.task()); - return deletionResult; - }}); + DeletionResult result = + ofy() + .transactNew( + () -> { + DeletionResult deletionResult = + attemptToDeleteResource(deletionRequest, hasNoActiveReferences); + getQueue(QUEUE_ASYNC_DELETE).deleteTask(deletionRequest.task()); + return deletionResult; + }); asyncFlowMetrics.recordAsyncFlowResult( deletionRequest.getMetricOperationType(), result.getMetricOperationResult(), @@ -369,10 +363,7 @@ public class DeleteContactsAndHostsAction implements Runnable { } else { resourceToSaveBuilder = resource.asBuilder(); } - resourceToSave = resourceToSaveBuilder - .setDeletionTime(now) - .setStatusValues(null) - .build(); + resourceToSave = resourceToSaveBuilder.setDeletionTime(now).setStatusValues(null).build(); performDeleteTasks(resource, resourceToSave, now, historyEntry); updateForeignKeyIndexDeletionTime(resourceToSave); } else { diff --git a/java/google/registry/batch/DeleteProberDataAction.java b/java/google/registry/batch/DeleteProberDataAction.java index a62206b73..bfcae3496 100644 --- a/java/google/registry/batch/DeleteProberDataAction.java +++ b/java/google/registry/batch/DeleteProberDataAction.java @@ -31,8 +31,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.googlecode.objectify.Key; -import com.googlecode.objectify.VoidWork; -import com.googlecode.objectify.Work; import google.registry.config.RegistryConfig.Config; import google.registry.dns.DnsQueue; import google.registry.mapreduce.MapreduceRunner; @@ -204,9 +202,7 @@ public class DeleteProberDataAction implements Runnable { final Key eppIndex = Key.create(EppResourceIndex.create(domainKey)); final Key> fki = ForeignKeyIndex.createKey(domain); - int entitiesDeleted = ofy().transact(new Work() { - @Override - public Integer run() { + int entitiesDeleted = ofy().transact(() -> { // This ancestor query selects all descendant HistoryEntries, BillingEvents, PollMessages, // and TLD-specific entities, as well as the domain itself. List> domainAndDependentKeys = ofy().load().ancestor(domainKey).keys().list(); @@ -221,16 +217,13 @@ public class DeleteProberDataAction implements Runnable { ofy().deleteWithoutBackup().keys(allKeys); } return allKeys.size(); - } - }); + }); getContext().incrementCounter("domains hard-deleted"); getContext().incrementCounter("total entities hard-deleted", entitiesDeleted); } private void softDeleteDomain(final DomainResource domain) { - ofy().transactNew(new VoidWork() { - @Override - public void vrun() { + ofy().transactNew(() -> { DomainResource deletedDomain = domain .asBuilder() .setDeletionTime(ofy().getTransactionTime()) @@ -251,7 +244,7 @@ public class DeleteProberDataAction implements Runnable { updateForeignKeyIndexDeletionTime(deletedDomain); dnsQueue.addDomainRefreshTask(deletedDomain.getFullyQualifiedDomainName()); } - }); + ); } } } diff --git a/java/google/registry/batch/RefreshDnsOnHostRenameAction.java b/java/google/registry/batch/RefreshDnsOnHostRenameAction.java index 580928459..cedfbd2d0 100644 --- a/java/google/registry/batch/RefreshDnsOnHostRenameAction.java +++ b/java/google/registry/batch/RefreshDnsOnHostRenameAction.java @@ -178,10 +178,7 @@ public class RefreshDnsOnHostRenameAction implements Runnable { } if (referencingHostKey != null) { retrier.callWithRetry( - () -> { - dnsQueue.addDomainRefreshTask(domain.getFullyQualifiedDomainName()); - return null; - }, + () -> dnsQueue.addDomainRefreshTask(domain.getFullyQualifiedDomainName()), TransientFailureException.class); logger.infofmt( "Enqueued DNS refresh for domain %s referenced by host %s.", @@ -242,15 +239,9 @@ public class RefreshDnsOnHostRenameAction implements Runnable { } final List tasks = refreshRequests.stream().map(DnsRefreshRequest::task).collect(toImmutableList()); - retrier.callWithRetry( - () -> { - queue.deleteTask(tasks); - return null; - }, - TransientFailureException.class); - for (DnsRefreshRequest refreshRequest : refreshRequests) { - asyncFlowMetrics.recordAsyncFlowResult(DNS_REFRESH, result, refreshRequest.requestedTime()); - } + retrier.callWithRetry(() -> queue.deleteTask(tasks), TransientFailureException.class); + refreshRequests.forEach( + r -> asyncFlowMetrics.recordAsyncFlowResult(DNS_REFRESH, result, r.requestedTime())); } /** A class that encapsulates the values of a request to refresh DNS for a renamed host. */ diff --git a/java/google/registry/export/SyncGroupMembersAction.java b/java/google/registry/export/SyncGroupMembersAction.java index 227f903ec..5635c2747 100644 --- a/java/google/registry/export/SyncGroupMembersAction.java +++ b/java/google/registry/export/SyncGroupMembersAction.java @@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import com.google.common.collect.Streams; -import com.googlecode.objectify.VoidWork; import google.registry.config.RegistryConfig.Config; import google.registry.groups.GroupsConnection; import google.registry.groups.GroupsConnection.Role; @@ -134,12 +133,7 @@ public final class SyncGroupMembersAction implements Runnable { new ImmutableMap.Builder<>(); for (final Registrar registrar : dirtyRegistrars) { try { - retrier.callWithRetry( - () -> { - syncRegistrarContacts(registrar); - return null; - }, - RuntimeException.class); + retrier.callWithRetry(() -> syncRegistrarContacts(registrar), RuntimeException.class); resultsBuilder.put(registrar, Optional.empty()); } catch (Throwable e) { logger.severe(e, e.getMessage()); @@ -171,11 +165,7 @@ public final class SyncGroupMembersAction implements Runnable { registrarsToSave.add(result.getKey().asBuilder().setContactsRequireSyncing(false).build()); } } - ofy().transactNew(new VoidWork() { - @Override - public void vrun() { - ofy().save().entities(registrarsToSave.build()); - }}); + ofy().transactNew(() -> ofy().save().entities(registrarsToSave.build())); return errors; } diff --git a/java/google/registry/flows/async/AsyncFlowEnqueuer.java b/java/google/registry/flows/async/AsyncFlowEnqueuer.java index 7b592b0c9..1d30ba42b 100644 --- a/java/google/registry/flows/async/AsyncFlowEnqueuer.java +++ b/java/google/registry/flows/async/AsyncFlowEnqueuer.java @@ -106,11 +106,6 @@ public final class AsyncFlowEnqueuer { * enqueuing a task. */ private void addTaskToQueueWithRetry(final Queue queue, final TaskOptions task) { - retrier.callWithRetry( - () -> { - queue.add(task); - return null; - }, - TransientFailureException.class); + retrier.callWithRetry(() -> queue.add(task), TransientFailureException.class); } } diff --git a/java/google/registry/model/common/GaeUserIdConverter.java b/java/google/registry/model/common/GaeUserIdConverter.java index f2b8f7c28..a4fc98305 100644 --- a/java/google/registry/model/common/GaeUserIdConverter.java +++ b/java/google/registry/model/common/GaeUserIdConverter.java @@ -19,8 +19,6 @@ import static google.registry.model.ofy.ObjectifyService.ofy; import com.google.appengine.api.users.User; import com.google.common.base.Splitter; -import com.googlecode.objectify.VoidWork; -import com.googlecode.objectify.Work; import com.googlecode.objectify.annotation.Entity; import com.googlecode.objectify.annotation.Id; import google.registry.model.ImmutableObject; @@ -54,24 +52,13 @@ public class GaeUserIdConverter extends ImmutableObject { try { // Perform these operations in a transactionless context to avoid enlisting in some outer // transaction (if any). - ofy().doTransactionless(new VoidWork() { - @Override - public void vrun() { - ofy().saveWithoutBackup().entity(gaeUserIdConverter).now(); - }}); + ofy().doTransactionless(() -> ofy().saveWithoutBackup().entity(gaeUserIdConverter).now()); // The read must be done in its own transaction to avoid reading from the session cache. - return ofy().transactNew(new Work() { - @Override - public String run() { - return ofy().load().entity(gaeUserIdConverter).safe().user.getUserId(); - }}); + return ofy() + .transactNew(() -> ofy().load().entity(gaeUserIdConverter).safe().user.getUserId()); } finally { - ofy().doTransactionless(new VoidWork() { - @Override - public void vrun() { - ofy().deleteWithoutBackup().entity(gaeUserIdConverter).now(); - }}); + ofy().doTransactionless(() -> ofy().deleteWithoutBackup().entity(gaeUserIdConverter).now()); } } } diff --git a/java/google/registry/model/ofy/Ofy.java b/java/google/registry/model/ofy/Ofy.java index a071cdb66..4ea39c7f5 100644 --- a/java/google/registry/model/ofy/Ofy.java +++ b/java/google/registry/model/ofy/Ofy.java @@ -200,6 +200,20 @@ public class Ofy { return inTransaction() ? work.run() : transactNew(work); } + /** + * Execute a transaction. + * + *

This overload is used for transactions that don't return a value, formerly implemented using + * VoidWork. + */ + public void transact(Runnable work) { + transact( + () -> { + work.run(); + return null; + }); + } + /** Pause the current transaction (if any) and complete this one before returning to it. */ public R transactNew(Work work) { // Wrap the Work in a CommitLoggedWork so that we can give transactions a frozen view of time @@ -207,6 +221,20 @@ public class Ofy { return transactCommitLoggedWork(new CommitLoggedWork<>(work, getClock())); } + /** + * Pause the current transaction (if any) and complete this one before returning to it. + * + *

This overload is used for transactions that don't return a value, formerly implemented using + * VoidWork. + */ + public void transactNew(Runnable work) { + transactNew( + () -> { + work.run(); + return null; + }); + } + /** * Transact with commit logs and retry with exponential backoff. * diff --git a/java/google/registry/rde/RdeReporter.java b/java/google/registry/rde/RdeReporter.java index e2011456d..d95fdc952 100644 --- a/java/google/registry/rde/RdeReporter.java +++ b/java/google/registry/rde/RdeReporter.java @@ -44,7 +44,6 @@ import java.io.ByteArrayInputStream; import java.net.MalformedURLException; import java.net.SocketTimeoutException; import java.net.URL; -import java.util.concurrent.Callable; import javax.inject.Inject; /** @@ -81,21 +80,20 @@ public class RdeReporter { req.addHeader(new HTTPHeader(AUTHORIZATION, "Basic " + token)); req.setPayload(reportBytes); logger.infofmt("Sending report:\n%s", new String(reportBytes, UTF_8)); - HTTPResponse rsp = retrier.callWithRetry( - new Callable() { - @Override - public HTTPResponse call() throws Exception { - HTTPResponse rsp = urlFetchService.fetch(req); - switch (rsp.getResponseCode()) { - case SC_OK: - case SC_BAD_REQUEST: - break; - default: - throw new UrlFetchException("PUT failed", req, rsp); - } - return rsp; - } - }, SocketTimeoutException.class); + HTTPResponse rsp = + retrier.callWithRetry( + () -> { + HTTPResponse rsp1 = urlFetchService.fetch(req); + switch (rsp1.getResponseCode()) { + case SC_OK: + case SC_BAD_REQUEST: + break; + default: + throw new UrlFetchException("PUT failed", req, rsp1); + } + return rsp1; + }, + SocketTimeoutException.class); // Ensure the XML response is valid. XjcIirdeaResult result = parseResult(rsp); diff --git a/java/google/registry/rde/RdeUploadAction.java b/java/google/registry/rde/RdeUploadAction.java index ac0ea67f9..2bdbab131 100644 --- a/java/google/registry/rde/RdeUploadAction.java +++ b/java/google/registry/rde/RdeUploadAction.java @@ -18,6 +18,7 @@ import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl; import static com.google.common.base.Verify.verify; import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8; import static com.jcraft.jsch.ChannelSftp.OVERWRITE; +import static google.registry.model.common.Cursor.CursorType.RDE_UPLOAD_SFTP; import static google.registry.model.common.Cursor.getCursorTimeOrStartOfTime; import static google.registry.model.ofy.ObjectifyService.ofy; import static google.registry.model.rde.RdeMode.FULL; @@ -29,7 +30,6 @@ import com.google.appengine.api.taskqueue.Queue; import com.google.appengine.tools.cloudstorage.GcsFilename; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; -import com.googlecode.objectify.VoidWork; import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSchException; import dagger.Lazy; @@ -139,7 +139,7 @@ public final class RdeUploadAction implements Runnable, EscrowTask { throw new ServiceUnavailableException("Waiting for RdeStagingAction to complete"); } DateTime sftpCursorTime = getCursorTimeOrStartOfTime( - ofy().load().key(Cursor.createKey(CursorType.RDE_UPLOAD_SFTP, Registry.get(tld))).now()); + ofy().load().key(Cursor.createKey(RDE_UPLOAD_SFTP, Registry.get(tld))).now()); if (sftpCursorTime.plus(sftpCooldown).isAfter(clock.nowUtc())) { // Fail the task good and hard so it retries until the cooldown passes. logger.infofmt("tld=%s cursor=%s sftpCursor=%s", tld, watermark, sftpCursorTime); @@ -161,15 +161,15 @@ public final class RdeUploadAction implements Runnable, EscrowTask { return null; }, JSchException.class); - ofy().transact(new VoidWork() { - @Override - public void vrun() { - Cursor cursor = - Cursor.create( - CursorType.RDE_UPLOAD_SFTP, ofy().getTransactionTime(), Registry.get(tld)); - ofy().save().entity(cursor).now(); - } - }); + ofy() + .transact( + () -> + ofy() + .save() + .entity( + Cursor.create( + RDE_UPLOAD_SFTP, ofy().getTransactionTime(), Registry.get(tld))) + .now()); response.setContentType(PLAIN_TEXT_UTF_8); response.setPayload(String.format("OK %s %s\n", tld, watermark)); } diff --git a/java/google/registry/util/Retrier.java b/java/google/registry/util/Retrier.java index 1ddea1f9a..ca2fc4f13 100644 --- a/java/google/registry/util/Retrier.java +++ b/java/google/registry/util/Retrier.java @@ -119,7 +119,7 @@ public class Retrier implements Serializable { }; /** - * Retries a unit of work in the face of transient errors. + * Retries a unit of work in the face of transient errors and returns the result. * *

Retrying is done a fixed number of times, with exponential backoff, if the exception that is * thrown is on a whitelist of retryable errors. If the error is not on the whitelist, or if the @@ -143,8 +143,20 @@ public class Retrier implements Serializable { moreRetryableErrors); } + /** Retries a unit of work in the face of transient errors. */ + @SafeVarargs + public final void callWithRetry( + VoidCallable callable, + Class retryableError, + Class... moreRetryableErrors) { + callWithRetry( + callable.asCallable(), + retryableError, + moreRetryableErrors); + } + /** - * Retries a unit of work in the face of transient errors. + * Retries a unit of work in the face of transient errors and returns the result. * *

Retrying is done a fixed number of times, with exponential backoff, if the exception that is * thrown is on a whitelist of retryable errors. If the error is not on the whitelist, or if the diff --git a/java/google/registry/util/VoidCallable.java b/java/google/registry/util/VoidCallable.java new file mode 100644 index 000000000..b019351d6 --- /dev/null +++ b/java/google/registry/util/VoidCallable.java @@ -0,0 +1,34 @@ +// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.util; + +import java.util.concurrent.Callable; + +/** + * A functional interface for a version of {@link Callable} that returns no value. + */ +@FunctionalInterface +public interface VoidCallable { + + void call() throws Exception; + + /** Returns the VoidCallable as a {@link Callable} that returns null. */ + public default Callable asCallable() { + return () -> { + call(); + return null; + }; + } +}