Replace TransactionManager.Work with Supplier (#397)

* Replace TransactionManager.Work with Supplier

Replace the "Work" class with the equivalent java.util.function.Supplier.
This commit is contained in:
Michael Muller 2019-12-04 12:00:13 -05:00 committed by GitHub
parent 3d3d390a9f
commit 1e1c8cdd80
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 127 additions and 106 deletions

View file

@ -109,7 +109,9 @@ public class FlowRunner {
} }
} }
/** Exception for explicitly propagating an EppException out of the transactional {@code Work}. */ /**
* Exception for explicitly propagating an EppException out of the transactional {@code Supplier}.
*/
private static class EppRuntimeException extends RuntimeException { private static class EppRuntimeException extends RuntimeException {
EppRuntimeException(EppException cause) { EppRuntimeException(EppException cause) {
super(cause); super(cause);

View file

@ -30,28 +30,28 @@ import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key; import com.googlecode.objectify.Key;
import google.registry.model.BackupGroupRoot; import google.registry.model.BackupGroupRoot;
import google.registry.model.ImmutableObject; import google.registry.model.ImmutableObject;
import google.registry.model.transaction.TransactionManager.Work;
import google.registry.util.Clock; import google.registry.util.Clock;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.function.Supplier;
import org.joda.time.DateTime; import org.joda.time.DateTime;
/** Wrapper for {@link Work} that associates a time with each attempt. */ /** Wrapper for {@link Supplier} that associates a time with each attempt. */
class CommitLoggedWork<R> implements Runnable { class CommitLoggedWork<R> implements Runnable {
private final Work<R> work; private final Supplier<R> work;
private final Clock clock; private final Clock clock;
/** /**
* Temporary place to store the result of a non-void work. * Temporary place to store the result of a non-void work.
* *
* <p>We don't want to return the result directly because we are going to try to recover from a * <p>We don't want to return the result directly because we are going to try to recover from a
* {@link com.google.appengine.api.datastore.DatastoreTimeoutException} deep inside Objectify * {@link com.google.appengine.api.datastore.DatastoreTimeoutException} deep inside Objectify when
* when it tries to commit the transaction. When an exception is thrown the return value would be * it tries to commit the transaction. When an exception is thrown the return value would be lost,
* lost, but sometimes we will be able to determine that we actually succeeded despite the * but sometimes we will be able to determine that we actually succeeded despite the timeout, and
* timeout, and we'll want to get the result. * we'll want to get the result.
*/ */
private R result; private R result;
@ -76,7 +76,7 @@ class CommitLoggedWork<R> implements Runnable {
/** Lifecycle marker to track whether {@link #run} has been called. */ /** Lifecycle marker to track whether {@link #run} has been called. */
private boolean runCalled; private boolean runCalled;
CommitLoggedWork(Work<R> work, Clock clock) { CommitLoggedWork(Supplier<R> work, Clock clock) {
this.work = work; this.work = work;
this.clock = clock; this.clock = clock;
} }
@ -111,7 +111,7 @@ class CommitLoggedWork<R> implements Runnable {
// Set the time to be used for "now" within the transaction. // Set the time to be used for "now" within the transaction.
try { try {
Ofy.TRANSACTION_INFO.set(createNewTransactionInfo()); Ofy.TRANSACTION_INFO.set(createNewTransactionInfo());
result = work.run(); result = work.get();
saveCommitLog(Ofy.TRANSACTION_INFO.get()); saveCommitLog(Ofy.TRANSACTION_INFO.get());
} finally { } finally {
Ofy.TRANSACTION_INFO.set(previous); Ofy.TRANSACTION_INFO.set(previous);

View file

@ -17,6 +17,7 @@ package google.registry.model.ofy;
import static google.registry.model.ofy.ObjectifyService.ofy; import static google.registry.model.ofy.ObjectifyService.ofy;
import google.registry.model.transaction.TransactionManager; import google.registry.model.transaction.TransactionManager;
import java.util.function.Supplier;
import org.joda.time.DateTime; import org.joda.time.DateTime;
/** Datastore implementation of {@link TransactionManager}. */ /** Datastore implementation of {@link TransactionManager}. */
@ -44,7 +45,7 @@ public class DatastoreTransactionManager implements TransactionManager {
} }
@Override @Override
public <T> T transact(Work<T> work) { public <T> T transact(Supplier<T> work) {
return getOfy().transact(work); return getOfy().transact(work);
} }
@ -54,7 +55,7 @@ public class DatastoreTransactionManager implements TransactionManager {
} }
@Override @Override
public <T> T transactNew(Work<T> work) { public <T> T transactNew(Supplier<T> work) {
return getOfy().transactNew(work); return getOfy().transactNew(work);
} }
@ -64,7 +65,7 @@ public class DatastoreTransactionManager implements TransactionManager {
} }
@Override @Override
public <R> R transactNewReadOnly(Work<R> work) { public <R> R transactNewReadOnly(Supplier<R> work) {
return getOfy().transactNewReadOnly(work); return getOfy().transactNewReadOnly(work);
} }
@ -74,7 +75,7 @@ public class DatastoreTransactionManager implements TransactionManager {
} }
@Override @Override
public <R> R doTransactionless(Work<R> work) { public <R> R doTransactionless(Supplier<R> work) {
return getOfy().doTransactionless(work); return getOfy().doTransactionless(work);
} }

View file

@ -38,7 +38,6 @@ import com.googlecode.objectify.cmd.Saver;
import google.registry.model.annotations.NotBackedUp; import google.registry.model.annotations.NotBackedUp;
import google.registry.model.annotations.VirtualEntity; import google.registry.model.annotations.VirtualEntity;
import google.registry.model.ofy.ReadOnlyWork.KillTransactionException; import google.registry.model.ofy.ReadOnlyWork.KillTransactionException;
import google.registry.model.transaction.TransactionManager.Work;
import google.registry.util.Clock; import google.registry.util.Clock;
import google.registry.util.NonFinalForTesting; import google.registry.util.NonFinalForTesting;
import google.registry.util.Sleeper; import google.registry.util.Sleeper;
@ -46,6 +45,7 @@ import google.registry.util.SystemClock;
import google.registry.util.SystemSleeper; import google.registry.util.SystemSleeper;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
import java.util.Objects; import java.util.Objects;
import java.util.function.Supplier;
import javax.inject.Inject; import javax.inject.Inject;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
@ -194,9 +194,9 @@ public class Ofy {
} }
/** Execute a transaction. */ /** Execute a transaction. */
<R> R transact(Work<R> work) { <R> R transact(Supplier<R> work) {
// If we are already in a transaction, don't wrap in a CommitLoggedWork. // If we are already in a transaction, don't wrap in a CommitLoggedWork.
return inTransaction() ? work.run() : transactNew(work); return inTransaction() ? work.get() : transactNew(work);
} }
/** /**
@ -214,7 +214,7 @@ public class Ofy {
} }
/** Pause the current transaction (if any) and complete this one before returning to it. */ /** Pause the current transaction (if any) and complete this one before returning to it. */
<R> R transactNew(Work<R> work) { <R> R transactNew(Supplier<R> work) {
// Wrap the Work in a CommitLoggedWork so that we can give transactions a frozen view of time // Wrap the Work in a CommitLoggedWork so that we can give transactions a frozen view of time
// and maintain commit logs for them. // and maintain commit logs for them.
return transactCommitLoggedWork(new CommitLoggedWork<>(work, getClock())); return transactCommitLoggedWork(new CommitLoggedWork<>(work, getClock()));
@ -298,7 +298,7 @@ public class Ofy {
} }
/** A read-only transaction is useful to get strongly consistent reads at a shared timestamp. */ /** A read-only transaction is useful to get strongly consistent reads at a shared timestamp. */
<R> R transactNewReadOnly(Work<R> work) { <R> R transactNewReadOnly(Supplier<R> work) {
ReadOnlyWork<R> readOnlyWork = new ReadOnlyWork<>(work, getClock()); ReadOnlyWork<R> readOnlyWork = new ReadOnlyWork<>(work, getClock());
try { try {
ofy().transactNew(() -> { ofy().transactNew(() -> {
@ -324,11 +324,11 @@ public class Ofy {
} }
/** Execute some work in a transactionless context. */ /** Execute some work in a transactionless context. */
<R> R doTransactionless(Work<R> work) { <R> R doTransactionless(Supplier<R> work) {
try { try {
com.googlecode.objectify.ObjectifyService.push( com.googlecode.objectify.ObjectifyService.push(
com.googlecode.objectify.ObjectifyService.ofy().transactionless()); com.googlecode.objectify.ObjectifyService.ofy().transactionless());
return work.run(); return work.get();
} finally { } finally {
com.googlecode.objectify.ObjectifyService.pop(); com.googlecode.objectify.ObjectifyService.pop();
} }
@ -342,11 +342,11 @@ public class Ofy {
* Note that unlike a transaction's fresh session cache, the contents of this cache will be * Note that unlike a transaction's fresh session cache, the contents of this cache will be
* discarded once the work completes, rather than being propagated into the enclosing session. * discarded once the work completes, rather than being propagated into the enclosing session.
*/ */
public <R> R doWithFreshSessionCache(Work<R> work) { public <R> R doWithFreshSessionCache(Supplier<R> work) {
try { try {
com.googlecode.objectify.ObjectifyService.push( com.googlecode.objectify.ObjectifyService.push(
com.googlecode.objectify.ObjectifyService.factory().begin()); com.googlecode.objectify.ObjectifyService.factory().begin());
return work.run(); return work.get();
} finally { } finally {
com.googlecode.objectify.ObjectifyService.pop(); com.googlecode.objectify.ObjectifyService.pop();
} }

View file

@ -14,13 +14,13 @@
package google.registry.model.ofy; package google.registry.model.ofy;
import google.registry.model.transaction.TransactionManager.Work;
import google.registry.util.Clock; import google.registry.util.Clock;
import java.util.function.Supplier;
/** Wrapper for {@link Work} that disallows mutations and fails the transaction at the end. */ /** Wrapper for {@link Supplier} that disallows mutations and fails the transaction at the end. */
class ReadOnlyWork<R> extends CommitLoggedWork<R> { class ReadOnlyWork<R> extends CommitLoggedWork<R> {
ReadOnlyWork(Work<R> work, Clock clock) { ReadOnlyWork(Supplier<R> work, Clock clock) {
super(work, clock); super(work, clock);
} }

View file

@ -16,6 +16,7 @@ package google.registry.model.transaction;
import com.google.common.flogger.FluentLogger; import com.google.common.flogger.FluentLogger;
import google.registry.util.Clock; import google.registry.util.Clock;
import java.util.function.Supplier;
import javax.persistence.EntityManager; import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory; import javax.persistence.EntityManagerFactory;
import javax.persistence.EntityTransaction; import javax.persistence.EntityTransaction;
@ -63,11 +64,11 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
} }
@Override @Override
public <T> T transact(Work<T> work) { public <T> T transact(Supplier<T> work) {
// TODO(shicong): Investigate removing transactNew functionality after migration as it may // TODO(shicong): Investigate removing transactNew functionality after migration as it may
// be same as this one. // be same as this one.
if (inTransaction()) { if (inTransaction()) {
return work.run(); return work.get();
} }
TransactionInfo txnInfo = transactionInfo.get(); TransactionInfo txnInfo = transactionInfo.get();
txnInfo.entityManager = emf.createEntityManager(); txnInfo.entityManager = emf.createEntityManager();
@ -76,7 +77,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
txn.begin(); txn.begin();
txnInfo.inTransaction = true; txnInfo.inTransaction = true;
txnInfo.transactionTime = clock.nowUtc(); txnInfo.transactionTime = clock.nowUtc();
T result = work.run(); T result = work.get();
txn.commit(); txn.commit();
return result; return result;
} catch (RuntimeException e) { } catch (RuntimeException e) {
@ -102,7 +103,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
} }
@Override @Override
public <T> T transactNew(Work<T> work) { public <T> T transactNew(Supplier<T> work) {
// TODO(shicong): Implements the functionality to start a new transaction. // TODO(shicong): Implements the functionality to start a new transaction.
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@ -114,7 +115,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
} }
@Override @Override
public <T> T transactNewReadOnly(Work<T> work) { public <T> T transactNewReadOnly(Supplier<T> work) {
// TODO(shicong): Implements read only transaction. // TODO(shicong): Implements read only transaction.
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@ -126,7 +127,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
} }
@Override @Override
public <T> T doTransactionless(Work<T> work) { public <T> T doTransactionless(Supplier<T> work) {
// TODO(shicong): Implements doTransactionless. // TODO(shicong): Implements doTransactionless.
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View file

@ -14,6 +14,7 @@
package google.registry.model.transaction; package google.registry.model.transaction;
import java.util.function.Supplier;
import org.joda.time.DateTime; import org.joda.time.DateTime;
/** /**
@ -21,12 +22,6 @@ import org.joda.time.DateTime;
*/ */
public interface TransactionManager { public interface TransactionManager {
/** This functional interface defines a method to execute a work and return the result. */
@FunctionalInterface
interface Work<R> {
R run();
}
/** Returns {@code true} if the caller is in a transaction. /** Returns {@code true} if the caller is in a transaction.
* *
* <p>Note that this function is kept for backward compatibility. We will review the use case * <p>Note that this function is kept for backward compatibility. We will review the use case
@ -42,18 +37,19 @@ public interface TransactionManager {
void assertInTransaction(); void assertInTransaction();
/** Executes the work in a transaction and returns the result. */ /** Executes the work in a transaction and returns the result. */
<T> T transact(Work<T> work); <T> T transact(Supplier<T> work);
/** Executes the work in a transaction. */ /** Executes the work in a transaction. */
void transact(Runnable work); void transact(Runnable work);
/** Pauses the current transaction (if any), executes the work in a new transaction /**
* and returns the result. * Pauses the current transaction (if any), executes the work in a new transaction and returns the
* result.
* *
* <p>Note that this function is kept for backward compatibility. We will review the use case * <p>Note that this function is kept for backward compatibility. We will review the use case
* later when adding the cloud sql implementation. * later when adding the cloud sql implementation.
*/ */
<T> T transactNew(Work<T> work); <T> T transactNew(Supplier<T> work);
/** Pauses the current transaction (if any) and executes the work in a new transaction. /** Pauses the current transaction (if any) and executes the work in a new transaction.
* *
@ -62,12 +58,13 @@ public interface TransactionManager {
*/ */
void transactNew(Runnable work); void transactNew(Runnable work);
/** Executes the work in a read-only transaction and returns the result. /**
* Executes the work in a read-only transaction and returns the result.
* *
* <p>Note that this function is kept for backward compatibility. We will review the use case * <p>Note that this function is kept for backward compatibility. We will review the use case
* later when adding the cloud sql implementation. * later when adding the cloud sql implementation.
*/ */
<R> R transactNewReadOnly(Work<R> work); <R> R transactNewReadOnly(Supplier<R> work);
/** Executes the work in a read-only transaction. /** Executes the work in a read-only transaction.
* *
@ -77,7 +74,7 @@ public interface TransactionManager {
void transactNewReadOnly(Runnable work); void transactNewReadOnly(Runnable work);
/** Executes the work in a transactionless context. */ /** Executes the work in a transactionless context. */
<R> R doTransactionless(Work<R> work); <R> R doTransactionless(Supplier<R> work);
/** Returns the time associated with the start of this particular transaction attempt. */ /** Returns the time associated with the start of this particular transaction attempt. */
DateTime getTransactionTime(); DateTime getTransactionTime();

View file

@ -45,12 +45,12 @@ import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainBase; import google.registry.model.domain.DomainBase;
import google.registry.model.eppcommon.Trid; import google.registry.model.eppcommon.Trid;
import google.registry.model.reporting.HistoryEntry; import google.registry.model.reporting.HistoryEntry;
import google.registry.model.transaction.TransactionManager.Work;
import google.registry.testing.AppEngineRule; import google.registry.testing.AppEngineRule;
import google.registry.testing.DatastoreHelper; import google.registry.testing.DatastoreHelper;
import google.registry.testing.FakeClock; import google.registry.testing.FakeClock;
import google.registry.util.SystemClock; import google.registry.util.SystemClock;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
import java.util.function.Supplier;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
@ -239,72 +239,88 @@ public class OfyTest {
@Test @Test
public void testTransact_transientFailureException_retries() { public void testTransact_transientFailureException_retries() {
assertThat(tm().transact(new Work<Integer>() { assertThat(
tm().transact(
new Supplier<Integer>() {
int count = 0; int count = 0;
@Override @Override
public Integer run() { public Integer get() {
count++; count++;
if (count == 3) { if (count == 3) {
return count; return count;
} }
throw new TransientFailureException(""); throw new TransientFailureException("");
}})).isEqualTo(3); }
}))
.isEqualTo(3);
} }
@Test @Test
public void testTransact_datastoreTimeoutException_noManifest_retries() { public void testTransact_datastoreTimeoutException_noManifest_retries() {
assertThat(tm().transact(new Work<Integer>() { assertThat(
tm().transact(
new Supplier<Integer>() {
int count = 0; int count = 0;
@Override @Override
public Integer run() { public Integer get() {
// We don't write anything in this transaction, so there is no commit log manifest. // We don't write anything in this transaction, so there is no commit log
// Therefore it's always safe to retry since nothing got written. // manifest.
count++; // Therefore it's always safe to retry since nothing got written.
if (count == 3) { count++;
return count; if (count == 3) {
} return count;
throw new DatastoreTimeoutException(""); }
}})).isEqualTo(3); throw new DatastoreTimeoutException("");
}
}))
.isEqualTo(3);
} }
@Test @Test
public void testTransact_datastoreTimeoutException_manifestNotWrittenToDatastore_retries() { public void testTransact_datastoreTimeoutException_manifestNotWrittenToDatastore_retries() {
assertThat(tm().transact(new Work<Integer>() { assertThat(
tm().transact(
new Supplier<Integer>() {
int count = 0; int count = 0;
@Override @Override
public Integer run() { public Integer get() {
// There will be something in the manifest now, but it won't be committed if we throw. // There will be something in the manifest now, but it won't be committed if
ofy().save().entity(someObject); // we throw.
count++; ofy().save().entity(someObject);
if (count == 3) { count++;
return count; if (count == 3) {
} return count;
throw new DatastoreTimeoutException(""); }
}})).isEqualTo(3); throw new DatastoreTimeoutException("");
}
}))
.isEqualTo(3);
} }
@Test @Test
public void testTransact_datastoreTimeoutException_manifestWrittenToDatastore_returnsSuccess() { public void testTransact_datastoreTimeoutException_manifestWrittenToDatastore_returnsSuccess() {
// A work unit that throws if it is ever retried. // A work unit that throws if it is ever retried.
Work work = new Work<Void>() { Supplier work =
boolean firstCallToVrun = true; new Supplier<Void>() {
boolean firstCallToVrun = true;
@Override @Override
public Void run() { public Void get() {
if (firstCallToVrun) { if (firstCallToVrun) {
firstCallToVrun = false; firstCallToVrun = false;
ofy().save().entity(someObject); ofy().save().entity(someObject);
return null; return null;
} }
fail("Shouldn't have retried."); fail("Shouldn't have retried.");
return null; return null;
}}; }
};
// A commit logged work that throws on the first attempt to get its result. // A commit logged work that throws on the first attempt to get its result.
CommitLoggedWork<Void> commitLoggedWork = new CommitLoggedWork<Void>(work, new SystemClock()) { CommitLoggedWork<Void> commitLoggedWork = new CommitLoggedWork<Void>(work, new SystemClock()) {
boolean firstCallToGetResult = true; boolean firstCallToGetResult = true;
@ -323,18 +339,22 @@ public class OfyTest {
} }
void doReadOnlyRetryTest(final RuntimeException e) { void doReadOnlyRetryTest(final RuntimeException e) {
assertThat(tm().transactNewReadOnly(new Work<Integer>() { assertThat(
tm().transactNewReadOnly(
new Supplier<Integer>() {
int count = 0; int count = 0;
@Override @Override
public Integer run() { public Integer get() {
count++; count++;
if (count == 3) { if (count == 3) {
return count; return count;
} }
throw e; throw new TransientFailureException("");
}})).isEqualTo(3); }
}))
.isEqualTo(3);
} }
@Test @Test