diff --git a/config/presubmits.py b/config/presubmits.py
index 0fff4499d..4928fae30 100644
--- a/config/presubmits.py
+++ b/config/presubmits.py
@@ -119,9 +119,10 @@ PRESUBMITS = {
"AppEngineExtension.register(...) instead.",
# PostgreSQLContainer instantiation must specify docker tag
+ # TODO(b/204572437): Fix the pattern to pass DatabaseSnapshotTest.java
PresubmitCheck(
r"[\s\S]*new\s+PostgreSQLContainer(<[\s\S]*>)?\(\s*\)[\s\S]*",
- "java", {}):
+ "java", {"DatabaseSnapshotTest.java"}):
"PostgreSQLContainer instantiation must specify docker tag.",
# Various Soy linting checks
diff --git a/core/src/main/java/google/registry/beam/common/DatabaseSnapshot.java b/core/src/main/java/google/registry/beam/common/DatabaseSnapshot.java
new file mode 100644
index 000000000..fbbae24d0
--- /dev/null
+++ b/core/src/main/java/google/registry/beam/common/DatabaseSnapshot.java
@@ -0,0 +1,88 @@
+// Copyright 2021 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam.common;
+
+import static com.google.common.base.Preconditions.checkState;
+import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
+
+import com.google.common.flogger.FluentLogger;
+import java.util.List;
+import javax.persistence.EntityManager;
+import javax.persistence.EntityTransaction;
+
+/**
+ * A database snapshot shareable by concurrent queries from multiple database clients. A snapshot is
+ * uniquely identified by its {@link #getSnapshotId snapshotId}, and must stay open until all
+ * concurrent queries to this snapshot have attached to it by calling {@link
+ * google.registry.persistence.transaction.JpaTransactionManager#setDatabaseSnapshot}. However, it
+ * can be closed before those queries complete.
+ *
+ *
This feature is Postgresql-only.
+ *
+ *
To support large queries, transaction isolation level is fixed at the REPEATABLE_READ to avoid
+ * exhausting predicate locks at the SERIALIZABLE level.
+ */
+// TODO(b/193662898): vendor-independent support for richer transaction semantics.
+public class DatabaseSnapshot implements AutoCloseable {
+
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+
+ private String snapshotId;
+ private EntityManager entityManager;
+ private EntityTransaction transaction;
+
+ private DatabaseSnapshot() {}
+
+ public String getSnapshotId() {
+ checkState(entityManager != null, "Snapshot not opened yet.");
+ checkState(entityManager.isOpen(), "Snapshot already closed.");
+ return snapshotId;
+ }
+
+ private DatabaseSnapshot open() {
+ entityManager = jpaTm().getStandaloneEntityManager();
+ transaction = entityManager.getTransaction();
+ transaction.setRollbackOnly();
+ transaction.begin();
+
+ entityManager
+ .createNativeQuery("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
+ .executeUpdate();
+
+ List> snapshotIds =
+ entityManager.createNativeQuery("SELECT pg_export_snapshot();").getResultList();
+ checkState(snapshotIds.size() == 1, "Unexpected number of snapshots: %s", snapshotIds.size());
+ snapshotId = (String) snapshotIds.get(0);
+ return this;
+ }
+
+ @Override
+ public void close() {
+ if (transaction != null && transaction.isActive()) {
+ try {
+ transaction.rollback();
+ } catch (Exception e) {
+ logger.atWarning().withCause(e).log("Failed to close a Database Snapshot");
+ }
+ }
+ if (entityManager != null && entityManager.isOpen()) {
+ entityManager.close();
+ }
+ }
+
+ public static DatabaseSnapshot createSnapshot() {
+ return new DatabaseSnapshot().open();
+ }
+}
diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java
index fe168a4a3..b8441cb81 100644
--- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java
+++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java
@@ -138,6 +138,9 @@ public final class RegistryJpaIO {
abstract Coder coder();
+ @Nullable
+ abstract String snapshotId();
+
abstract Builder toBuilder();
@Override
@@ -145,7 +148,9 @@ public final class RegistryJpaIO {
public PCollection expand(PBegin input) {
return input
.apply("Starting " + name(), Create.of((Void) null))
- .apply("Run query for " + name(), ParDo.of(new QueryRunner<>(query(), resultMapper())))
+ .apply(
+ "Run query for " + name(),
+ ParDo.of(new QueryRunner<>(query(), resultMapper(), snapshotId())))
.setCoder(coder())
.apply("Reshuffle", Reshuffle.viaRandomKey());
}
@@ -162,6 +167,18 @@ public final class RegistryJpaIO {
return toBuilder().coder(coder).build();
}
+ /**
+ * Specifies the database snapshot to use for this query.
+ *
+ * This feature is Postgresql-only. User is responsible for keeping the snapshot
+ * available until all JVM workers have started using it by calling {@link
+ * JpaTransactionManager#setDatabaseSnapshot}.
+ */
+ // TODO(b/193662898): vendor-independent support for richer transaction semantics.
+ public Read withSnapshot(String snapshotId) {
+ return toBuilder().snapshotId(snapshotId).build();
+ }
+
static Builder builder() {
return new AutoValue_RegistryJpaIO_Read.Builder()
.name(DEFAULT_NAME)
@@ -179,6 +196,8 @@ public final class RegistryJpaIO {
abstract Builder coder(Coder coder);
+ abstract Builder snapshotId(@Nullable String sharedSnapshotId);
+
abstract Read build();
Builder criteriaQuery(CriteriaQuerySupplier criteriaQuery) {
@@ -201,17 +220,28 @@ public final class RegistryJpaIO {
static class QueryRunner extends DoFn {
private final RegistryQuery query;
private final SerializableFunction resultMapper;
+ // java.util.Optional is not serializable. Use of Guava Optional is discouraged.
+ @Nullable private final String snapshotId;
- QueryRunner(RegistryQuery query, SerializableFunction resultMapper) {
+ QueryRunner(
+ RegistryQuery query,
+ SerializableFunction resultMapper,
+ @Nullable String snapshotId) {
this.query = query;
this.resultMapper = resultMapper;
+ this.snapshotId = snapshotId;
}
@ProcessElement
public void processElement(OutputReceiver outputReceiver) {
jpaTm()
.transactNoRetry(
- () -> query.stream().map(resultMapper::apply).forEach(outputReceiver::output));
+ () -> {
+ if (snapshotId != null) {
+ jpaTm().setDatabaseSnapshot(snapshotId);
+ }
+ query.stream().map(resultMapper::apply).forEach(outputReceiver::output);
+ });
}
}
}
diff --git a/core/src/main/java/google/registry/model/AppEngineEnvironment.java b/core/src/main/java/google/registry/model/AppEngineEnvironment.java
index 119a7fa08..383a6f7c3 100644
--- a/core/src/main/java/google/registry/model/AppEngineEnvironment.java
+++ b/core/src/main/java/google/registry/model/AppEngineEnvironment.java
@@ -43,10 +43,22 @@ public class AppEngineEnvironment {
private Environment environment;
+ /**
+ * Constructor for use by tests.
+ *
+ * All test suites must use the same appId for environments, since when tearing down we do not
+ * clear cached environments in spawned threads. See {@link #unsetEnvironmentForAllThreads} for
+ * more information.
+ */
public AppEngineEnvironment() {
- this("PlaceholderAppId");
+ /**
+ * Use AppEngineExtension's appId here so that ofy and sql entities can be compared with {@code
+ * Objects#equals()}. The choice of this value does not impact functional correctness.
+ */
+ this("test");
}
+ /** Constructor for use by applications, e.g., BEAM pipelines. */
public AppEngineEnvironment(String appId) {
environment = createAppEngineEnvironment(appId);
}
@@ -65,7 +77,17 @@ public class AppEngineEnvironment {
ApiProxy.clearEnvironmentForCurrentThread();
}
+ /**
+ * Unsets the test environment in all threads with best effort.
+ *
+ *
This method unsets the environment factory and clears the cached environment in the current
+ * thread (the main test runner thread). We do not clear the cache in spawned threads, even though
+ * they may be reused. This is not a problem as long as the appId stays the same: those threads
+ * are used only in AppEngine or BEAM tests, and expect the presence of an environment.
+ */
public void unsetEnvironmentForAllThreads() {
+ unsetEnvironmentForCurrentThread();
+
try {
Method method = ApiProxy.class.getDeclaredMethod("clearEnvironmentFactory");
method.setAccessible(true);
diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java
index fbd13f61d..a79c37c18 100644
--- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java
+++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManager.java
@@ -24,7 +24,33 @@ import javax.persistence.criteria.CriteriaQuery;
/** Sub-interface of {@link TransactionManager} which defines JPA related methods. */
public interface JpaTransactionManager extends TransactionManager {
- /** Returns the {@link EntityManager} for the current request. */
+ /**
+ * Returns a long-lived {@link EntityManager} not bound to a particular transaction.
+ *
+ *
Caller is responsible for closing the returned instance.
+ */
+ EntityManager getStandaloneEntityManager();
+
+ /**
+ * Specifies a database snapshot exported by another transaction to use in the current
+ * transaction.
+ *
+ *
This is a Postgresql-specific feature. This method must be called before any other SQL
+ * commands in a transaction.
+ *
+ *
To support large queries, transaction isolation level is fixed at the REPEATABLE_READ to
+ * avoid exhausting predicate locks at the SERIALIZABLE level.
+ *
+ * @see google.registry.beam.common.DatabaseSnapshot
+ */
+ // TODO(b/193662898): vendor-independent support for richer transaction semantics.
+ JpaTransactionManager setDatabaseSnapshot(String snapshotId);
+
+ /**
+ * Returns the {@link EntityManager} for the current request.
+ *
+ *
The returned instance is closed when the current transaction completes.
+ */
EntityManager getEntityManager();
/**
diff --git a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java
index 10a3fbf3a..020b84baa 100644
--- a/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java
+++ b/core/src/main/java/google/registry/persistence/transaction/JpaTransactionManagerImpl.java
@@ -120,6 +120,11 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
emf.close();
}
+ @Override
+ public EntityManager getStandaloneEntityManager() {
+ return emf.createEntityManager();
+ }
+
@Override
public EntityManager getEntityManager() {
EntityManager entityManager = transactionInfo.get().entityManager;
@@ -131,6 +136,22 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
return entityManager;
}
+ @Override
+ public JpaTransactionManager setDatabaseSnapshot(String snapshotId) {
+ // Postgresql-specific: 'set transaction' command must be called inside a transaction
+ assertInTransaction();
+
+ EntityManager entityManager = getEntityManager();
+ // Isolation is hardcoded to REPEATABLE READ, as specified by parent's Javadoc.
+ entityManager
+ .createNativeQuery("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
+ .executeUpdate();
+ entityManager
+ .createNativeQuery(String.format("SET TRANSACTION SNAPSHOT '%s'", snapshotId))
+ .executeUpdate();
+ return this;
+ }
+
@Override
public TypedQuery query(String sqlString, Class resultClass) {
return new DetachingTypedQuery<>(getEntityManager().createQuery(sqlString, resultClass));
diff --git a/core/src/test/java/google/registry/beam/common/DatabaseSnapshotTest.java b/core/src/test/java/google/registry/beam/common/DatabaseSnapshotTest.java
new file mode 100644
index 000000000..d71db5fee
--- /dev/null
+++ b/core/src/test/java/google/registry/beam/common/DatabaseSnapshotTest.java
@@ -0,0 +1,174 @@
+// Copyright 2021 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.beam.common;
+
+import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.truth.Truth;
+import google.registry.beam.TestPipelineExtension;
+import google.registry.beam.common.RegistryJpaIO.Read;
+import google.registry.model.tld.Registry;
+import google.registry.persistence.NomulusPostgreSql;
+import google.registry.persistence.PersistenceModule;
+import google.registry.persistence.transaction.CriteriaQueryBuilder;
+import google.registry.persistence.transaction.JpaTransactionManager;
+import google.registry.persistence.transaction.JpaTransactionManagerImpl;
+import google.registry.persistence.transaction.TransactionManagerFactory;
+import google.registry.testing.DatabaseHelper;
+import google.registry.testing.FakeClock;
+import javax.persistence.Persistence;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.values.PCollection;
+import org.hibernate.cfg.Environment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+/** Unit tests for {@link DatabaseSnapshot}. */
+@Testcontainers
+public class DatabaseSnapshotTest {
+
+ /**
+ * For reasons unknown, an EntityManagerFactory created by {@code JpaIntegrationTestExtension} or
+ * {@code JpaUnitTestExtension} enters a bad state after exporting the first snapshot. Starting
+ * with the second attempt, exports alternate between error ("cannot export a snapshot from a
+ * subtransaction") and success. The {@link #createSnapshot_twiceNoRead} test below fails with
+ * either extension. EntityManagerFactory created for production does not have this problem.
+ */
+ @Container
+ private static PostgreSQLContainer sqlContainer =
+ new PostgreSQLContainer<>(NomulusPostgreSql.getDockerTag())
+ .withInitScript("sql/schema/nomulus.golden.sql");
+
+ @RegisterExtension
+ final transient TestPipelineExtension testPipeline =
+ TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
+
+ static JpaTransactionManager origJpa;
+ static JpaTransactionManager jpa;
+
+ static Registry registry;
+
+ @BeforeAll
+ static void setup() {
+ ImmutableMap jpaProperties =
+ new ImmutableMap.Builder()
+ .put(Environment.URL, sqlContainer.getJdbcUrl())
+ .put(Environment.USER, sqlContainer.getUsername())
+ .put(Environment.PASS, sqlContainer.getPassword())
+ .putAll(PersistenceModule.provideDefaultDatabaseConfigs())
+ .build();
+ jpa =
+ new JpaTransactionManagerImpl(
+ Persistence.createEntityManagerFactory("nomulus", jpaProperties), new FakeClock());
+ origJpa = jpaTm();
+ TransactionManagerFactory.setJpaTm(() -> jpa);
+
+ Registry tld = DatabaseHelper.newRegistry("tld", "TLD");
+ jpaTm().transact(() -> jpaTm().put(tld));
+ registry = jpaTm().transact(() -> jpaTm().loadByEntity(tld));
+ }
+
+ @AfterAll
+ static void tearDown() {
+ TransactionManagerFactory.setJpaTm(() -> origJpa);
+
+ if (jpa != null) {
+ jpa.teardown();
+ }
+ }
+
+ @Test
+ void createSnapshot_onceNoRead() {
+ try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {}
+ }
+
+ @Test
+ void createSnapshot_twiceNoRead() {
+ try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {}
+ try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {}
+ }
+
+ @Test
+ void readSnapshot() {
+ try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {
+ Registry snapshotRegistry =
+ jpaTm()
+ .transact(
+ () ->
+ jpaTm()
+ .setDatabaseSnapshot(databaseSnapshot.getSnapshotId())
+ .loadByEntity(registry));
+ Truth.assertThat(snapshotRegistry).isEqualTo(registry);
+ }
+ }
+
+ @Test
+ void readSnapshot_withSubsequentChange() {
+ try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {
+ Registry updated =
+ registry
+ .asBuilder()
+ .setCreateBillingCost(registry.getStandardCreateCost().plus(1))
+ .build();
+ jpaTm().transact(() -> jpaTm().put(updated));
+
+ Registry persistedUpdate = jpaTm().transact(() -> jpaTm().loadByEntity(registry));
+ Truth.assertThat(persistedUpdate).isNotEqualTo(registry);
+
+ Registry snapshotRegistry =
+ jpaTm()
+ .transact(
+ () ->
+ jpaTm()
+ .setDatabaseSnapshot(databaseSnapshot.getSnapshotId())
+ .loadByEntity(registry));
+ Truth.assertThat(snapshotRegistry).isEqualTo(registry);
+ } finally {
+ // Revert change to registry in DB, which is shared by all test methods.
+ jpaTm().transact(() -> jpaTm().put(registry));
+ }
+ }
+
+ @Test
+ void readWithRegistryJpaIO() {
+ try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {
+ Registry updated =
+ registry
+ .asBuilder()
+ .setCreateBillingCost(registry.getStandardCreateCost().plus(1))
+ .build();
+ jpaTm().transact(() -> jpaTm().put(updated));
+
+ Read read =
+ RegistryJpaIO.read(() -> CriteriaQueryBuilder.create(Registry.class).build(), x -> x)
+ .withSnapshot(databaseSnapshot.getSnapshotId());
+ PCollection registries = testPipeline.apply(read);
+
+ // This assertion depends on Registry being Serializable, which may change if the
+ // UnsafeSerializable interface is removed after migration.
+ PAssert.that(registries).containsInAnyOrder(registry);
+ testPipeline.run();
+ } finally {
+ // Revert change to registry in DB, which is shared by all test methods.
+ jpaTm().transact(() -> jpaTm().put(registry));
+ }
+ }
+}
diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java
index 3ba1fff20..e740822e3 100644
--- a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java
+++ b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java
@@ -106,7 +106,7 @@ class InitSqlPipelineTest {
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore =
- new DatastoreEntityExtension("test").allThreads(true);
+ new DatastoreEntityExtension().allThreads(true);
@RegisterExtension final transient InjectExtension injectExtension = new InjectExtension();
diff --git a/core/src/test/java/google/registry/testing/DatastoreEntityExtension.java b/core/src/test/java/google/registry/testing/DatastoreEntityExtension.java
index 66a832058..fd82019b2 100644
--- a/core/src/test/java/google/registry/testing/DatastoreEntityExtension.java
+++ b/core/src/test/java/google/registry/testing/DatastoreEntityExtension.java
@@ -43,10 +43,6 @@ public class DatastoreEntityExtension implements BeforeEachCallback, AfterEachCa
private boolean allThreads = false;
- public DatastoreEntityExtension(String appId) {
- environment = new AppEngineEnvironment(appId);
- }
-
public DatastoreEntityExtension() {
environment = new AppEngineEnvironment();
}