Streamline how to fake an App Engine environment (#1348)

Both `DatastoreEntityExtension.PlaceholderEnvironment` and `AppEngineEnvironment` does the same thing, so there is no point having both of them exist. To use `AppEngineEnvionrment` as an autoclosable requires the user to be mindful of where a fake App Engine environment is required. It is better to set this either in the `DatastoreEntityExtension` for tests, or in the worker initializer in Beam. It also makes it easier to remove the fake environment when we are completely datastore free.

Also made a change to how `IdService` allocate Ids in Beam.
<!-- Reviewable:start -->
This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/google/nomulus/1348)
<!-- Reviewable:end -->
This commit is contained in:
Lai Jiang 2021-10-01 16:46:46 -04:00 committed by GitHub
parent f0a9073d4e
commit 2be5eff1f5
16 changed files with 140 additions and 190 deletions

View file

@ -57,8 +57,7 @@ public final class CommitLogImports {
*/
public static ImmutableList<ImmutableList<VersionedEntity>> loadEntitiesByTransaction(
InputStream inputStream) {
try (AppEngineEnvironment appEngineEnvironment = new AppEngineEnvironment();
InputStream input = new BufferedInputStream(inputStream)) {
try (InputStream input = new BufferedInputStream(inputStream)) {
Iterator<ImmutableObject> commitLogs = createDeserializingIterator(input, false);
checkState(commitLogs.hasNext());
checkState(commitLogs.next() instanceof CommitLogCheckpoint);

View file

@ -17,7 +17,6 @@ package google.registry.beam.common;
import static com.google.common.base.Verify.verify;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import google.registry.backup.AppEngineEnvironment;
import google.registry.model.contact.ContactResource;
import google.registry.persistence.transaction.CriteriaQueryBuilder;
import google.registry.persistence.transaction.JpaTransactionManager;
@ -59,18 +58,16 @@ public class JpaDemoPipeline implements Serializable {
public void processElement() {
// AppEngineEnvironment is needed as long as JPA entity classes still depends
// on Objectify.
try (AppEngineEnvironment allowOfyEntity = new AppEngineEnvironment()) {
int result =
(Integer)
jpaTm()
.transact(
() ->
jpaTm()
.getEntityManager()
.createNativeQuery("select 1;")
.getSingleResult());
verify(result == 1, "Expecting 1, got %s.", result);
}
int result =
(Integer)
jpaTm()
.transact(
() ->
jpaTm()
.getEntityManager()
.createNativeQuery("select 1;")
.getSingleResult());
verify(result == 1, "Expecting 1, got %s.", result);
counter.inc();
}
}));

View file

@ -20,11 +20,9 @@ import static org.apache.beam.sdk.values.TypeDescriptors.integers;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import google.registry.backup.AppEngineEnvironment;
import google.registry.beam.common.RegistryQuery.CriteriaQuerySupplier;
import google.registry.model.UpdateAutoTimestamp;
import google.registry.model.UpdateAutoTimestamp.DisableAutoUpdateResource;
import google.registry.model.ofy.ObjectifyService;
import google.registry.model.replay.SqlEntity;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManagerFactory;
@ -211,14 +209,9 @@ public final class RegistryJpaIO {
@ProcessElement
public void processElement(OutputReceiver<T> outputReceiver) {
// AppEngineEnvironment is need for handling VKeys, which involve Ofy keys. Unlike
// SqlBatchWriter, it is unnecessary to initialize ObjectifyService in this class.
try (AppEngineEnvironment env = new AppEngineEnvironment()) {
// TODO(b/187210388): JpaTransactionManager should support non-transactional query.
jpaTm()
.transactNoRetry(
() -> query.stream().map(resultMapper::apply).forEach(outputReceiver::output));
}
jpaTm()
.transactNoRetry(
() -> query.stream().map(resultMapper::apply).forEach(outputReceiver::output));
}
}
}
@ -364,16 +357,6 @@ public final class RegistryJpaIO {
this.withAutoTimestamp = withAutoTimestamp;
}
@Setup
public void setup() {
// AppEngineEnvironment is needed as long as Objectify keys are still involved in the handling
// of SQL entities (e.g., in VKeys). ObjectifyService needs to be initialized when conversion
// between Ofy entity and Datastore entity is needed.
try (AppEngineEnvironment env = new AppEngineEnvironment()) {
ObjectifyService.initOfy();
}
}
@ProcessElement
public void processElement(@Element KV<ShardedKey<Integer>, Iterable<T>> kv) {
if (withAutoTimestamp) {
@ -386,19 +369,17 @@ public final class RegistryJpaIO {
}
private void actuallyProcessElement(@Element KV<ShardedKey<Integer>, Iterable<T>> kv) {
try (AppEngineEnvironment env = new AppEngineEnvironment()) {
ImmutableList<Object> entities =
Streams.stream(kv.getValue())
.map(this.jpaConverter::apply)
// TODO(b/177340730): post migration delete the line below.
.filter(Objects::nonNull)
.collect(ImmutableList.toImmutableList());
try {
jpaTm().transact(() -> jpaTm().putAll(entities));
counter.inc(entities.size());
} catch (RuntimeException e) {
processSingly(entities);
}
ImmutableList<Object> entities =
Streams.stream(kv.getValue())
.map(this.jpaConverter::apply)
// TODO(b/177340730): post migration delete the line below.
.filter(Objects::nonNull)
.collect(ImmutableList.toImmutableList());
try {
jpaTm().transact(() -> jpaTm().putAll(entities));
counter.inc(entities.size());
} catch (RuntimeException e) {
processSingly(entities);
}
}

View file

@ -20,6 +20,8 @@ import com.google.auto.service.AutoService;
import com.google.common.flogger.FluentLogger;
import dagger.Lazy;
import google.registry.config.RegistryEnvironment;
import google.registry.config.SystemPropertySetter;
import google.registry.model.AppEngineEnvironment;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManagerFactory;
import org.apache.beam.sdk.harness.JvmInitializer;
@ -35,18 +37,26 @@ import org.apache.beam.sdk.options.PipelineOptions;
@AutoService(JvmInitializer.class)
public class RegistryPipelineWorkerInitializer implements JvmInitializer {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public static final String PROPERTY = "google.registry.beam";
@Override
public void beforeProcessing(PipelineOptions options) {
RegistryPipelineOptions registryOptions = options.as(RegistryPipelineOptions.class);
RegistryEnvironment environment = registryOptions.getRegistryEnvironment();
if (environment == null || environment.equals(RegistryEnvironment.UNITTEST)) {
return;
throw new RuntimeException(
"A registry environment must be specified in the pipeline options.");
}
logger.atInfo().log("Setting up RegistryEnvironment %s.", environment);
environment.setup();
Lazy<JpaTransactionManager> transactionManagerLazy =
toRegistryPipelineComponent(registryOptions).getJpaTransactionManager();
TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get);
// Masquarade all threads as App Engine threads so we can create Ofy keys in the pipeline. Also
// loads all ofy entities.
new AppEngineEnvironment("Beam").setEnvironmentForAllThreads();
// Set the system property so that we can call IdService.allocateId() without access to
// datastore.
SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true");
}
}

View file

@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key;
import google.registry.backup.AppEngineEnvironment;
import google.registry.backup.VersionedEntity;
import google.registry.beam.common.RegistryJpaIO;
import google.registry.beam.initsql.Transforms.RemoveDomainBaseForeignKeys;
@ -230,9 +229,7 @@ public class InitSqlPipeline implements Serializable {
}
private static ImmutableList<String> toKindStrings(Collection<Class<?>> entityClasses) {
try (AppEngineEnvironment env = new AppEngineEnvironment()) {
return entityClasses.stream().map(Key::getKind).collect(ImmutableList.toImmutableList());
}
return entityClasses.stream().map(Key::getKind).collect(ImmutableList.toImmutableList());
}
public static void main(String[] args) {

View file

@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.backup;
package google.registry.model;
import com.google.apphosting.api.ApiProxy;
import com.google.apphosting.api.ApiProxy.Environment;
import com.google.common.collect.ImmutableMap;
import java.io.Closeable;
import google.registry.model.ofy.ObjectifyService;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
@ -38,26 +39,39 @@ import java.lang.reflect.Proxy;
* <p>Note that conversion from Objectify objects to Datastore {@code Entities} still requires the
* Datastore service.
*/
public class AppEngineEnvironment implements Closeable {
public class AppEngineEnvironment {
private boolean isPlaceHolderNeeded;
private Environment environment;
public AppEngineEnvironment() {
this("PlaceholderAppId");
}
public AppEngineEnvironment(String appId) {
isPlaceHolderNeeded = ApiProxy.getCurrentEnvironment() == null;
// isPlaceHolderNeeded may be true when we are invoked in a test with AppEngineExtension.
if (isPlaceHolderNeeded) {
ApiProxy.setEnvironmentForCurrentThread(createAppEngineEnvironment(appId));
}
environment = createAppEngineEnvironment(appId);
}
@Override
public void close() {
if (isPlaceHolderNeeded) {
ApiProxy.setEnvironmentForCurrentThread(null);
public void setEnvironmentForCurrentThread() {
ApiProxy.setEnvironmentForCurrentThread(environment);
ObjectifyService.initOfy();
}
public void setEnvironmentForAllThreads() {
setEnvironmentForCurrentThread();
ApiProxy.setEnvironmentFactory(() -> environment);
}
public void unsetEnvironmentForCurrentThread() {
ApiProxy.clearEnvironmentForCurrentThread();
}
public void unsetEnvironmentForAllThreads() {
try {
Method method = ApiProxy.class.getDeclaredMethod("clearEnvironmentFactory");
method.setAccessible(true);
method.invoke(null);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}

View file

@ -18,13 +18,14 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.common.annotations.VisibleForTesting;
import google.registry.beam.common.RegistryPipelineWorkerInitializer;
import google.registry.config.RegistryEnvironment;
import java.util.concurrent.atomic.AtomicLong;
/**
* Allocates a globally unique {@link Long} number to use as an Ofy {@code @Id}.
*
* <p>In non-test environments the Id is generated by Datastore, whereas in tests it's from an
* <p>In non-test, non-beam environments the Id is generated by Datastore, otherwise it's from an
* atomic long number that's incremented every time this method is called.
*/
public final class IdService {
@ -35,13 +36,25 @@ public final class IdService {
*/
private static final String APP_WIDE_ALLOCATION_KIND = "common";
/** Counts of used ids for use in unit tests. Outside tests this is never used. */
private static final AtomicLong nextTestId = new AtomicLong(1); // ids cannot be zero
/**
* Counts of used ids for use in unit tests or Beam.
*
* <p>Note that one should only use self-allocate Ids in Beam for entities whose Ids are not
* important and are not persisted back to the database, i. e. nowhere the uniqueness of the ID is
* required.
*/
private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero
private static final boolean isSelfAllocated() {
return RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())
|| "true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false"));
}
/** Allocates an id. */
// TODO(b/201547855): Find a way to allocate a unique ID without datastore.
public static long allocateId() {
return RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())
? nextTestId.getAndIncrement()
return isSelfAllocated()
? nextSelfAllocatedId.getAndIncrement()
: DatastoreServiceFactory.getDatastoreService()
.allocateIds(APP_WIDE_ALLOCATION_KIND, 1)
.iterator()
@ -49,13 +62,11 @@ public final class IdService {
.getId();
}
/** Resets the global test id counter (i.e. sets the next id to 1). */
/** Resets the global self-allocated id counter (i.e. sets the next id to 1). */
@VisibleForTesting
public static void resetNextTestId() {
public static void resetSelfAllocatedId() {
checkState(
RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get()),
"Can't call resetTestIdCounts() from RegistryEnvironment.%s",
RegistryEnvironment.get());
nextTestId.set(1); // ids cannot be zero
isSelfAllocated(), "Can only call resetSelfAllocatedId() in unit tests or Beam pipelines");
nextSelfAllocatedId.set(1); // ids cannot be zero
}
}

View file

@ -62,7 +62,8 @@ public class RegistryJpaReadTest {
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();
final transient DatastoreEntityExtension datastore =
new DatastoreEntityExtension().allThreads(true);
@RegisterExtension
final transient JpaIntegrationTestExtension database =

View file

@ -71,7 +71,7 @@ class CommitLogTransformsTest implements Serializable {
@RegisterExtension
@Order(value = 1)
final transient DatastoreEntityExtension datastoreEntityExtension =
new DatastoreEntityExtension();
new DatastoreEntityExtension().allThreads(true);
@RegisterExtension
final transient TestPipelineExtension testPipeline =

View file

@ -30,7 +30,6 @@ import static google.registry.util.DateTimeUtils.START_OF_TIME;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key;
import google.registry.backup.AppEngineEnvironment;
import google.registry.beam.TestPipelineExtension;
import google.registry.flows.domain.DomainFlowUtils;
import google.registry.model.billing.BillingEvent;
@ -106,7 +105,8 @@ class InitSqlPipelineTest {
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();
final transient DatastoreEntityExtension datastore =
new DatastoreEntityExtension("test").allThreads(true);
@RegisterExtension final transient InjectExtension injectExtension = new InjectExtension();
@ -331,20 +331,18 @@ class InitSqlPipelineTest {
.as(InitSqlPipelineOptions.class);
InitSqlPipeline initSqlPipeline = new InitSqlPipeline(options);
initSqlPipeline.run(testPipeline).waitUntilFinish();
try (AppEngineEnvironment env = new AppEngineEnvironment("test")) {
assertHostResourceEquals(
jpaTm().transact(() -> jpaTm().loadByKey(hostResource.createVKey())), hostResource);
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Registrar.class)))
.comparingElementsUsing(immutableObjectCorrespondence("lastUpdateTime"))
.containsExactly(registrar1, registrar2);
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class)))
.comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp"))
.containsExactly(contact1, contact2);
assertDomainEquals(jpaTm().transact(() -> jpaTm().loadByKey(domain.createVKey())), domain);
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Cursor.class)))
.comparingElementsUsing(immutableObjectCorrespondence())
.containsExactly(globalCursor, tldCursor);
}
assertHostResourceEquals(
jpaTm().transact(() -> jpaTm().loadByKey(hostResource.createVKey())), hostResource);
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Registrar.class)))
.comparingElementsUsing(immutableObjectCorrespondence("lastUpdateTime"))
.containsExactly(registrar1, registrar2);
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class)))
.comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp"))
.containsExactly(contact1, contact2);
assertDomainEquals(jpaTm().transact(() -> jpaTm().loadByKey(domain.createVKey())), domain);
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Cursor.class)))
.comparingElementsUsing(immutableObjectCorrespondence())
.containsExactly(globalCursor, tldCursor);
}
private static void assertHostResourceEquals(HostResource actual, HostResource expected) {

View file

@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import com.google.common.truth.Truth;
import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
import google.registry.backup.AppEngineEnvironment;
import google.registry.backup.VersionedEntity;
import java.io.Serializable;
import java.util.Collection;
@ -124,20 +123,18 @@ public final class InitSqlTestUtils {
public void processElement(
@Element KV<String, Iterable<VersionedEntity>> input,
OutputReceiver<String> out) {
try (AppEngineEnvironment env = new AppEngineEnvironment()) {
ImmutableList<KV<Long, Object>> actual =
Streams.stream(input.getValue())
.map(InitSqlTestUtils::rawEntityToOfyWithTimestamp)
.collect(ImmutableList.toImmutableList());
try {
Truth.assertThat(actual)
.containsExactlyElementsIn(
Stream.of(expected)
.map(InitSqlTestUtils::expectedToOfyWithTimestamp)
.collect(ImmutableList.toImmutableList()));
} catch (AssertionError e) {
out.output(e.toString());
}
ImmutableList<KV<Long, Object>> actual =
Streams.stream(input.getValue())
.map(InitSqlTestUtils::rawEntityToOfyWithTimestamp)
.collect(ImmutableList.toImmutableList());
try {
Truth.assertThat(actual)
.containsExactlyElementsIn(
Stream.of(expected)
.map(InitSqlTestUtils::expectedToOfyWithTimestamp)
.collect(ImmutableList.toImmutableList()));
} catch (AssertionError e) {
out.output(e.toString());
}
}
}));

View file

@ -95,7 +95,7 @@ class LoadDatastoreSnapshotTest {
@RegisterExtension
@Order(value = 1)
final transient DatastoreEntityExtension datastoreEntityExtension =
new DatastoreEntityExtension();
new DatastoreEntityExtension().allThreads(true);
@RegisterExtension
final transient TestPipelineExtension testPipeline =

View file

@ -227,7 +227,8 @@ class InvoicingPipelineTest {
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();
final transient DatastoreEntityExtension datastore =
new DatastoreEntityExtension().allThreads(true);
@RegisterExtension
final TestPipelineExtension pipeline =

View file

@ -116,7 +116,8 @@ class Spec11PipelineTest {
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();
final transient DatastoreEntityExtension datastore =
new DatastoreEntityExtension().allThreads(true);
@TempDir Path tmpDir;

View file

@ -460,7 +460,7 @@ public final class AppEngineExtension implements BeforeEachCallback, AfterEachCa
if (withDatastore) {
ObjectifyService.initOfy();
// Reset id allocation in ObjectifyService so that ids are deterministic in tests.
IdService.resetNextTestId();
IdService.resetSelfAllocatedId();
this.ofyTestEntities.forEach(AppEngineExtension::register);
}
}

View file

@ -14,17 +14,10 @@
package google.registry.testing;
import static google.registry.model.ofy.ObjectifyService.auditedOfy;
import com.google.apphosting.api.ApiProxy;
import com.google.apphosting.api.ApiProxy.Environment;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import google.registry.model.AppEngineEnvironment;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
/**
* Allows instantiation of Datastore {@code Entity}s without the heavyweight {@link
@ -41,13 +34,23 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
* google.registry.model.domain.DomainBaseSqlTest} for example, and to <a
* href="https://junit.org/junit5/docs/current/user-guide/#extensions-registration-programmatic">
* JUnit 5 User Guide</a> for details of extension ordering.
*
* @see AppEngineEnvironment
*/
public class DatastoreEntityExtension implements BeforeEachCallback, AfterEachCallback {
private static final Environment PLACEHOLDER_ENV = new PlaceholderEnvironment();
private final AppEngineEnvironment environment;
private boolean allThreads = false;
public DatastoreEntityExtension(String appId) {
environment = new AppEngineEnvironment(appId);
}
public DatastoreEntityExtension() {
environment = new AppEngineEnvironment();
}
/**
* Whether all threads should be masqueraded as GAE threads.
*
@ -69,79 +72,19 @@ public class DatastoreEntityExtension implements BeforeEachCallback, AfterEachCa
@Override
public void beforeEach(ExtensionContext context) {
ApiProxy.setEnvironmentForCurrentThread(PLACEHOLDER_ENV);
// In order to create keys for entities they must be registered with Ofy. Calling this method
// will load the ObjectifyService class, whose static initialization block registers all Ofy
// entities.
auditedOfy();
if (allThreads) {
ApiProxy.setEnvironmentFactory(() -> PLACEHOLDER_ENV);
environment.setEnvironmentForAllThreads();
} else {
environment.setEnvironmentForCurrentThread();
}
}
@Override
public void afterEach(ExtensionContext context)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// Clear the cached instance.
ApiProxy.clearEnvironmentForCurrentThread();
public void afterEach(ExtensionContext context) {
if (allThreads) {
Method method = ApiProxy.class.getDeclaredMethod("clearEnvironmentFactory");
method.setAccessible(true);
method.invoke(null);
}
}
private static final class PlaceholderEnvironment implements Environment {
@Override
public String getAppId() {
return "PlaceholderAppId";
}
@Override
public Map<String, Object> getAttributes() {
return ImmutableMap.of();
}
@Override
public String getModuleId() {
throw new UnsupportedOperationException();
}
@Override
public String getVersionId() {
throw new UnsupportedOperationException();
}
@Override
public String getEmail() {
throw new UnsupportedOperationException();
}
@Override
public boolean isLoggedIn() {
throw new UnsupportedOperationException();
}
@Override
public boolean isAdmin() {
throw new UnsupportedOperationException();
}
@Override
public String getAuthDomain() {
throw new UnsupportedOperationException();
}
@SuppressWarnings("deprecation")
@Override
public String getRequestNamespace() {
throw new UnsupportedOperationException();
}
@Override
public long getRemainingMillis() {
throw new UnsupportedOperationException();
environment.unsetEnvironmentForAllThreads();
} else {
environment.unsetEnvironmentForCurrentThread();
}
}
}