mirror of
https://github.com/google/nomulus.git
synced 2025-04-29 19:47:51 +02:00
Support shared database snapshot (#1403)
* Support shared database snapshot Allow multiple workers to share a CONSISTENT database snapshot. The motivating use case is SQL database snapshot loading, where it is too slow to depend on one worker to load everything. This currently is postgresql-specific, but will be improved to be vendor-independent. Also made sure AppEngineEnvironment.java clears the cached environment in all cases when tearing down.
This commit is contained in:
parent
30c23efba9
commit
e761e67434
9 changed files with 369 additions and 11 deletions
|
@ -119,9 +119,10 @@ PRESUBMITS = {
|
|||
"AppEngineExtension.register(...) instead.",
|
||||
|
||||
# PostgreSQLContainer instantiation must specify docker tag
|
||||
# TODO(b/204572437): Fix the pattern to pass DatabaseSnapshotTest.java
|
||||
PresubmitCheck(
|
||||
r"[\s\S]*new\s+PostgreSQLContainer(<[\s\S]*>)?\(\s*\)[\s\S]*",
|
||||
"java", {}):
|
||||
"java", {"DatabaseSnapshotTest.java"}):
|
||||
"PostgreSQLContainer instantiation must specify docker tag.",
|
||||
|
||||
# Various Soy linting checks
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.common;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
|
||||
|
||||
import com.google.common.flogger.FluentLogger;
|
||||
import java.util.List;
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.EntityTransaction;
|
||||
|
||||
/**
|
||||
* A database snapshot shareable by concurrent queries from multiple database clients. A snapshot is
|
||||
* uniquely identified by its {@link #getSnapshotId snapshotId}, and must stay open until all
|
||||
* concurrent queries to this snapshot have attached to it by calling {@link
|
||||
* google.registry.persistence.transaction.JpaTransactionManager#setDatabaseSnapshot}. However, it
|
||||
* can be closed before those queries complete.
|
||||
*
|
||||
* <p>This feature is <em>Postgresql-only</em>.
|
||||
*
|
||||
* <p>To support large queries, transaction isolation level is fixed at the REPEATABLE_READ to avoid
|
||||
* exhausting predicate locks at the SERIALIZABLE level.
|
||||
*/
|
||||
// TODO(b/193662898): vendor-independent support for richer transaction semantics.
|
||||
public class DatabaseSnapshot implements AutoCloseable {
|
||||
|
||||
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||
|
||||
private String snapshotId;
|
||||
private EntityManager entityManager;
|
||||
private EntityTransaction transaction;
|
||||
|
||||
private DatabaseSnapshot() {}
|
||||
|
||||
public String getSnapshotId() {
|
||||
checkState(entityManager != null, "Snapshot not opened yet.");
|
||||
checkState(entityManager.isOpen(), "Snapshot already closed.");
|
||||
return snapshotId;
|
||||
}
|
||||
|
||||
private DatabaseSnapshot open() {
|
||||
entityManager = jpaTm().getStandaloneEntityManager();
|
||||
transaction = entityManager.getTransaction();
|
||||
transaction.setRollbackOnly();
|
||||
transaction.begin();
|
||||
|
||||
entityManager
|
||||
.createNativeQuery("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
|
||||
.executeUpdate();
|
||||
|
||||
List<?> snapshotIds =
|
||||
entityManager.createNativeQuery("SELECT pg_export_snapshot();").getResultList();
|
||||
checkState(snapshotIds.size() == 1, "Unexpected number of snapshots: %s", snapshotIds.size());
|
||||
snapshotId = (String) snapshotIds.get(0);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (transaction != null && transaction.isActive()) {
|
||||
try {
|
||||
transaction.rollback();
|
||||
} catch (Exception e) {
|
||||
logger.atWarning().withCause(e).log("Failed to close a Database Snapshot");
|
||||
}
|
||||
}
|
||||
if (entityManager != null && entityManager.isOpen()) {
|
||||
entityManager.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static DatabaseSnapshot createSnapshot() {
|
||||
return new DatabaseSnapshot().open();
|
||||
}
|
||||
}
|
|
@ -138,6 +138,9 @@ public final class RegistryJpaIO {
|
|||
|
||||
abstract Coder<T> coder();
|
||||
|
||||
@Nullable
|
||||
abstract String snapshotId();
|
||||
|
||||
abstract Builder<R, T> toBuilder();
|
||||
|
||||
@Override
|
||||
|
@ -145,7 +148,9 @@ public final class RegistryJpaIO {
|
|||
public PCollection<T> expand(PBegin input) {
|
||||
return input
|
||||
.apply("Starting " + name(), Create.of((Void) null))
|
||||
.apply("Run query for " + name(), ParDo.of(new QueryRunner<>(query(), resultMapper())))
|
||||
.apply(
|
||||
"Run query for " + name(),
|
||||
ParDo.of(new QueryRunner<>(query(), resultMapper(), snapshotId())))
|
||||
.setCoder(coder())
|
||||
.apply("Reshuffle", Reshuffle.viaRandomKey());
|
||||
}
|
||||
|
@ -162,6 +167,18 @@ public final class RegistryJpaIO {
|
|||
return toBuilder().coder(coder).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies the database snapshot to use for this query.
|
||||
*
|
||||
* <p>This feature is <em>Postgresql-only</em>. User is responsible for keeping the snapshot
|
||||
* available until all JVM workers have started using it by calling {@link
|
||||
* JpaTransactionManager#setDatabaseSnapshot}.
|
||||
*/
|
||||
// TODO(b/193662898): vendor-independent support for richer transaction semantics.
|
||||
public Read<R, T> withSnapshot(String snapshotId) {
|
||||
return toBuilder().snapshotId(snapshotId).build();
|
||||
}
|
||||
|
||||
static <R, T> Builder<R, T> builder() {
|
||||
return new AutoValue_RegistryJpaIO_Read.Builder<R, T>()
|
||||
.name(DEFAULT_NAME)
|
||||
|
@ -179,6 +196,8 @@ public final class RegistryJpaIO {
|
|||
|
||||
abstract Builder<R, T> coder(Coder coder);
|
||||
|
||||
abstract Builder<R, T> snapshotId(@Nullable String sharedSnapshotId);
|
||||
|
||||
abstract Read<R, T> build();
|
||||
|
||||
Builder<R, T> criteriaQuery(CriteriaQuerySupplier<R> criteriaQuery) {
|
||||
|
@ -201,17 +220,28 @@ public final class RegistryJpaIO {
|
|||
static class QueryRunner<R, T> extends DoFn<Void, T> {
|
||||
private final RegistryQuery<R> query;
|
||||
private final SerializableFunction<R, T> resultMapper;
|
||||
// java.util.Optional is not serializable. Use of Guava Optional is discouraged.
|
||||
@Nullable private final String snapshotId;
|
||||
|
||||
QueryRunner(RegistryQuery<R> query, SerializableFunction<R, T> resultMapper) {
|
||||
QueryRunner(
|
||||
RegistryQuery<R> query,
|
||||
SerializableFunction<R, T> resultMapper,
|
||||
@Nullable String snapshotId) {
|
||||
this.query = query;
|
||||
this.resultMapper = resultMapper;
|
||||
this.snapshotId = snapshotId;
|
||||
}
|
||||
|
||||
@ProcessElement
|
||||
public void processElement(OutputReceiver<T> outputReceiver) {
|
||||
jpaTm()
|
||||
.transactNoRetry(
|
||||
() -> query.stream().map(resultMapper::apply).forEach(outputReceiver::output));
|
||||
() -> {
|
||||
if (snapshotId != null) {
|
||||
jpaTm().setDatabaseSnapshot(snapshotId);
|
||||
}
|
||||
query.stream().map(resultMapper::apply).forEach(outputReceiver::output);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,10 +43,22 @@ public class AppEngineEnvironment {
|
|||
|
||||
private Environment environment;
|
||||
|
||||
/**
|
||||
* Constructor for use by tests.
|
||||
*
|
||||
* <p>All test suites must use the same appId for environments, since when tearing down we do not
|
||||
* clear cached environments in spawned threads. See {@link #unsetEnvironmentForAllThreads} for
|
||||
* more information.
|
||||
*/
|
||||
public AppEngineEnvironment() {
|
||||
this("PlaceholderAppId");
|
||||
/**
|
||||
* Use AppEngineExtension's appId here so that ofy and sql entities can be compared with {@code
|
||||
* Objects#equals()}. The choice of this value does not impact functional correctness.
|
||||
*/
|
||||
this("test");
|
||||
}
|
||||
|
||||
/** Constructor for use by applications, e.g., BEAM pipelines. */
|
||||
public AppEngineEnvironment(String appId) {
  // Build the environment for the given appId and keep it on this instance.
  this.environment = createAppEngineEnvironment(appId);
}
|
||||
|
@ -65,7 +77,17 @@ public class AppEngineEnvironment {
|
|||
ApiProxy.clearEnvironmentForCurrentThread();
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsets the test environment in all threads with best effort.
|
||||
*
|
||||
* <p>This method unsets the environment factory and clears the cached environment in the current
|
||||
* thread (the main test runner thread). We do not clear the cache in spawned threads, even though
|
||||
* they may be reused. This is not a problem as long as the appId stays the same: those threads
|
||||
* are used only in AppEngine or BEAM tests, and expect the presence of an environment.
|
||||
*/
|
||||
public void unsetEnvironmentForAllThreads() {
|
||||
unsetEnvironmentForCurrentThread();
|
||||
|
||||
try {
|
||||
Method method = ApiProxy.class.getDeclaredMethod("clearEnvironmentFactory");
|
||||
method.setAccessible(true);
|
||||
|
|
|
@ -24,7 +24,33 @@ import javax.persistence.criteria.CriteriaQuery;
|
|||
/** Sub-interface of {@link TransactionManager} which defines JPA related methods. */
|
||||
public interface JpaTransactionManager extends TransactionManager {
|
||||
|
||||
/** Returns the {@link EntityManager} for the current request. */
|
||||
/**
|
||||
* Returns a long-lived {@link EntityManager} not bound to a particular transaction.
|
||||
*
|
||||
* <p>Caller is responsible for closing the returned instance.
|
||||
*/
|
||||
EntityManager getStandaloneEntityManager();
|
||||
|
||||
/**
|
||||
* Specifies a database snapshot exported by another transaction to use in the current
|
||||
* transaction.
|
||||
*
|
||||
* <p>This is a Postgresql-specific feature. This method must be called before any other SQL
|
||||
* commands in a transaction.
|
||||
*
|
||||
* <p>To support large queries, transaction isolation level is fixed at the REPEATABLE_READ to
|
||||
* avoid exhausting predicate locks at the SERIALIZABLE level.
|
||||
*
|
||||
* @see google.registry.beam.common.DatabaseSnapshot
|
||||
*/
|
||||
// TODO(b/193662898): vendor-independent support for richer transaction semantics.
|
||||
JpaTransactionManager setDatabaseSnapshot(String snapshotId);
|
||||
|
||||
/**
|
||||
* Returns the {@link EntityManager} for the current request.
|
||||
*
|
||||
* <p>The returned instance is closed when the current transaction completes.
|
||||
*/
|
||||
EntityManager getEntityManager();
|
||||
|
||||
/**
|
||||
|
|
|
@ -120,6 +120,11 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
|
|||
emf.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityManager getStandaloneEntityManager() {
|
||||
return emf.createEntityManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityManager getEntityManager() {
|
||||
EntityManager entityManager = transactionInfo.get().entityManager;
|
||||
|
@ -131,6 +136,22 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
|
|||
return entityManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JpaTransactionManager setDatabaseSnapshot(String snapshotId) {
|
||||
// Postgresql-specific: 'set transaction' command must be called inside a transaction
|
||||
assertInTransaction();
|
||||
|
||||
EntityManager entityManager = getEntityManager();
|
||||
// Isolation is hardcoded to REPEATABLE READ, as specified by parent's Javadoc.
|
||||
entityManager
|
||||
.createNativeQuery("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
|
||||
.executeUpdate();
|
||||
entityManager
|
||||
.createNativeQuery(String.format("SET TRANSACTION SNAPSHOT '%s'", snapshotId))
|
||||
.executeUpdate();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> TypedQuery<T> query(String sqlString, Class<T> resultClass) {
|
||||
return new DetachingTypedQuery<>(getEntityManager().createQuery(sqlString, resultClass));
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.common;
|
||||
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.truth.Truth;
|
||||
import google.registry.beam.TestPipelineExtension;
|
||||
import google.registry.beam.common.RegistryJpaIO.Read;
|
||||
import google.registry.model.tld.Registry;
|
||||
import google.registry.persistence.NomulusPostgreSql;
|
||||
import google.registry.persistence.PersistenceModule;
|
||||
import google.registry.persistence.transaction.CriteriaQueryBuilder;
|
||||
import google.registry.persistence.transaction.JpaTransactionManager;
|
||||
import google.registry.persistence.transaction.JpaTransactionManagerImpl;
|
||||
import google.registry.persistence.transaction.TransactionManagerFactory;
|
||||
import google.registry.testing.DatabaseHelper;
|
||||
import google.registry.testing.FakeClock;
|
||||
import javax.persistence.Persistence;
|
||||
import org.apache.beam.sdk.testing.PAssert;
|
||||
import org.apache.beam.sdk.values.PCollection;
|
||||
import org.hibernate.cfg.Environment;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.testcontainers.containers.PostgreSQLContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
|
||||
/** Unit tests for {@link DatabaseSnapshot}. */
|
||||
@Testcontainers
|
||||
public class DatabaseSnapshotTest {
|
||||
|
||||
/**
|
||||
* For reasons unknown, an EntityManagerFactory created by {@code JpaIntegrationTestExtension} or
|
||||
* {@code JpaUnitTestExtension} enters a bad state after exporting the first snapshot. Starting
|
||||
* with the second attempt, exports alternate between error ("cannot export a snapshot from a
|
||||
* subtransaction") and success. The {@link #createSnapshot_twiceNoRead} test below fails with
|
||||
* either extension. EntityManagerFactory created for production does not have this problem.
|
||||
*/
|
||||
@Container
|
||||
private static PostgreSQLContainer sqlContainer =
|
||||
new PostgreSQLContainer<>(NomulusPostgreSql.getDockerTag())
|
||||
.withInitScript("sql/schema/nomulus.golden.sql");
|
||||
|
||||
@RegisterExtension
|
||||
final transient TestPipelineExtension testPipeline =
|
||||
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
|
||||
|
||||
static JpaTransactionManager origJpa;
|
||||
static JpaTransactionManager jpa;
|
||||
|
||||
static Registry registry;
|
||||
|
||||
@BeforeAll
|
||||
static void setup() {
|
||||
ImmutableMap<String, String> jpaProperties =
|
||||
new ImmutableMap.Builder<String, String>()
|
||||
.put(Environment.URL, sqlContainer.getJdbcUrl())
|
||||
.put(Environment.USER, sqlContainer.getUsername())
|
||||
.put(Environment.PASS, sqlContainer.getPassword())
|
||||
.putAll(PersistenceModule.provideDefaultDatabaseConfigs())
|
||||
.build();
|
||||
jpa =
|
||||
new JpaTransactionManagerImpl(
|
||||
Persistence.createEntityManagerFactory("nomulus", jpaProperties), new FakeClock());
|
||||
origJpa = jpaTm();
|
||||
TransactionManagerFactory.setJpaTm(() -> jpa);
|
||||
|
||||
Registry tld = DatabaseHelper.newRegistry("tld", "TLD");
|
||||
jpaTm().transact(() -> jpaTm().put(tld));
|
||||
registry = jpaTm().transact(() -> jpaTm().loadByEntity(tld));
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void tearDown() {
|
||||
TransactionManagerFactory.setJpaTm(() -> origJpa);
|
||||
|
||||
if (jpa != null) {
|
||||
jpa.teardown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void createSnapshot_onceNoRead() {
|
||||
try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {}
|
||||
}
|
||||
|
||||
@Test
|
||||
void createSnapshot_twiceNoRead() {
|
||||
try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {}
|
||||
try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {}
|
||||
}
|
||||
|
||||
@Test
|
||||
void readSnapshot() {
|
||||
try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {
|
||||
Registry snapshotRegistry =
|
||||
jpaTm()
|
||||
.transact(
|
||||
() ->
|
||||
jpaTm()
|
||||
.setDatabaseSnapshot(databaseSnapshot.getSnapshotId())
|
||||
.loadByEntity(registry));
|
||||
Truth.assertThat(snapshotRegistry).isEqualTo(registry);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void readSnapshot_withSubsequentChange() {
|
||||
try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {
|
||||
Registry updated =
|
||||
registry
|
||||
.asBuilder()
|
||||
.setCreateBillingCost(registry.getStandardCreateCost().plus(1))
|
||||
.build();
|
||||
jpaTm().transact(() -> jpaTm().put(updated));
|
||||
|
||||
Registry persistedUpdate = jpaTm().transact(() -> jpaTm().loadByEntity(registry));
|
||||
Truth.assertThat(persistedUpdate).isNotEqualTo(registry);
|
||||
|
||||
Registry snapshotRegistry =
|
||||
jpaTm()
|
||||
.transact(
|
||||
() ->
|
||||
jpaTm()
|
||||
.setDatabaseSnapshot(databaseSnapshot.getSnapshotId())
|
||||
.loadByEntity(registry));
|
||||
Truth.assertThat(snapshotRegistry).isEqualTo(registry);
|
||||
} finally {
|
||||
// Revert change to registry in DB, which is shared by all test methods.
|
||||
jpaTm().transact(() -> jpaTm().put(registry));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void readWithRegistryJpaIO() {
|
||||
try (DatabaseSnapshot databaseSnapshot = DatabaseSnapshot.createSnapshot()) {
|
||||
Registry updated =
|
||||
registry
|
||||
.asBuilder()
|
||||
.setCreateBillingCost(registry.getStandardCreateCost().plus(1))
|
||||
.build();
|
||||
jpaTm().transact(() -> jpaTm().put(updated));
|
||||
|
||||
Read<Registry, Registry> read =
|
||||
RegistryJpaIO.read(() -> CriteriaQueryBuilder.create(Registry.class).build(), x -> x)
|
||||
.withSnapshot(databaseSnapshot.getSnapshotId());
|
||||
PCollection<Registry> registries = testPipeline.apply(read);
|
||||
|
||||
// This assertion depends on Registry being Serializable, which may change if the
|
||||
// UnsafeSerializable interface is removed after migration.
|
||||
PAssert.that(registries).containsInAnyOrder(registry);
|
||||
testPipeline.run();
|
||||
} finally {
|
||||
// Revert change to registry in DB, which is shared by all test methods.
|
||||
jpaTm().transact(() -> jpaTm().put(registry));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -106,7 +106,7 @@ class InitSqlPipelineTest {
|
|||
@RegisterExtension
|
||||
@Order(Order.DEFAULT - 1)
|
||||
final transient DatastoreEntityExtension datastore =
|
||||
new DatastoreEntityExtension("test").allThreads(true);
|
||||
new DatastoreEntityExtension().allThreads(true);
|
||||
|
||||
@RegisterExtension final transient InjectExtension injectExtension = new InjectExtension();
|
||||
|
||||
|
|
|
@ -43,10 +43,6 @@ public class DatastoreEntityExtension implements BeforeEachCallback, AfterEachCa
|
|||
|
||||
private boolean allThreads = false;
|
||||
|
||||
public DatastoreEntityExtension(String appId) {
|
||||
environment = new AppEngineEnvironment(appId);
|
||||
}
|
||||
|
||||
public DatastoreEntityExtension() {
|
||||
environment = new AppEngineEnvironment();
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue