diff --git a/config/presubmits.py b/config/presubmits.py
index 128f6feaa..829c93b43 100644
--- a/config/presubmits.py
+++ b/config/presubmits.py
@@ -109,13 +109,6 @@ PRESUBMITS = {
"System.(out|err).println is only allowed in tools/ packages. Please "
"use a logger instead.",
- # PostgreSQLContainer instantiation must specify docker tag
- # TODO(b/204572437): Fix the pattern to pass DatabaseSnapshotTest.java
- PresubmitCheck(
- r"[\s\S]*new\s+PostgreSQLContainer(<[\s\S]*>)?\(\s*\)[\s\S]*",
- "java", {"DatabaseSnapshotTest.java"}):
- "PostgreSQLContainer instantiation must specify docker tag.",
-
# Various Soy linting checks
PresubmitCheck(
r".* (/\*)?\* {?@param ",
diff --git a/core/src/main/java/google/registry/beam/common/DatabaseSnapshot.java b/core/src/main/java/google/registry/beam/common/DatabaseSnapshot.java
deleted file mode 100644
index 7f385cc2f..000000000
--- a/core/src/main/java/google/registry/beam/common/DatabaseSnapshot.java
+++ /dev/null
@@ -1,88 +0,0 @@
-// Copyright 2021 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam.common;
-
-import static com.google.common.base.Preconditions.checkState;
-import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
-
-import com.google.common.flogger.FluentLogger;
-import java.util.List;
-import javax.persistence.EntityManager;
-import javax.persistence.EntityTransaction;
-
-/**
- * A database snapshot shareable by concurrent queries from multiple database clients. A snapshot is
- * uniquely identified by its {@link #getSnapshotId snapshotId}, and must stay open until all
- * concurrent queries to this snapshot have attached to it by calling {@link
- * google.registry.persistence.transaction.JpaTransactionManager#setDatabaseSnapshot}. However, it
- * can be closed before those queries complete.
- *
- *
This feature is Postgresql-only.
- *
- *
To support large queries, transaction isolation level is fixed at the REPEATABLE_READ to avoid
- * exhausting predicate locks at the SERIALIZABLE level.
- */
-// TODO(b/193662898): vendor-independent support for richer transaction semantics.
-public class DatabaseSnapshot implements AutoCloseable {
-
- private static final FluentLogger logger = FluentLogger.forEnclosingClass();
-
- private String snapshotId;
- private EntityManager entityManager;
- private EntityTransaction transaction;
-
- private DatabaseSnapshot() {}
-
- public String getSnapshotId() {
- checkState(entityManager != null, "Snapshot not opened yet.");
- checkState(entityManager.isOpen(), "Snapshot already closed.");
- return snapshotId;
- }
-
- private DatabaseSnapshot open() {
- entityManager = tm().getStandaloneEntityManager();
- transaction = entityManager.getTransaction();
- transaction.setRollbackOnly();
- transaction.begin();
-
- entityManager
- .createNativeQuery("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
- .executeUpdate();
-
- List> snapshotIds =
- entityManager.createNativeQuery("SELECT pg_export_snapshot();").getResultList();
- checkState(snapshotIds.size() == 1, "Unexpected number of snapshots: %s", snapshotIds.size());
- snapshotId = (String) snapshotIds.get(0);
- return this;
- }
-
- @Override
- public void close() {
- if (transaction != null && transaction.isActive()) {
- try {
- transaction.rollback();
- } catch (Exception e) {
- logger.atWarning().withCause(e).log("Failed to close a Database Snapshot");
- }
- }
- if (entityManager != null && entityManager.isOpen()) {
- entityManager.close();
- }
- }
-
- public static DatabaseSnapshot createSnapshot() {
- return new DatabaseSnapshot().open();
- }
-}
diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java
index 39fd70f1c..599ae866a 100644
--- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java
+++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java
@@ -122,6 +122,7 @@ public final class RegistryJpaIO {
@AutoValue
public abstract static class Read extends PTransform> {
+ private static final long serialVersionUID = 6906842877429561700L;
public static final String DEFAULT_NAME = "RegistryJpaIO.Read";
abstract String name();
@@ -133,9 +134,6 @@ public final class RegistryJpaIO {
@Nullable
abstract Coder coder();
- @Nullable
- abstract String snapshotId();
-
abstract Builder toBuilder();
@Override
@@ -145,8 +143,7 @@ public final class RegistryJpaIO {
input
.apply("Starting " + name(), Create.of((Void) null))
.apply(
- "Run query for " + name(),
- ParDo.of(new QueryRunner<>(query(), resultMapper(), snapshotId())));
+ "Run query for " + name(), ParDo.of(new QueryRunner<>(query(), resultMapper())));
if (coder() != null) {
output = output.setCoder(coder());
}
@@ -165,18 +162,6 @@ public final class RegistryJpaIO {
return toBuilder().coder(coder).build();
}
- /**
- * Specifies the database snapshot to use for this query.
- *
- *
This feature is Postgresql-only. User is responsible for keeping the snapshot
- * available until all JVM workers have started using it by calling {@link
- * JpaTransactionManager#setDatabaseSnapshot}.
- */
- // TODO(b/193662898): vendor-independent support for richer transaction semantics.
- public Read withSnapshot(@Nullable String snapshotId) {
- return toBuilder().snapshotId(snapshotId).build();
- }
-
static Builder builder() {
return new AutoValue_RegistryJpaIO_Read.Builder().name(DEFAULT_NAME);
}
@@ -192,8 +177,6 @@ public final class RegistryJpaIO {
abstract Builder coder(Coder coder);
- abstract Builder snapshotId(@Nullable String sharedSnapshotId);
-
abstract Read build();
Builder criteriaQuery(CriteriaQuerySupplier criteriaQuery) {
@@ -214,27 +197,20 @@ public final class RegistryJpaIO {
}
static class QueryRunner extends DoFn {
+
+ private static final long serialVersionUID = 7293891513058653334L;
private final RegistryQuery query;
private final SerializableFunction resultMapper;
- // java.util.Optional is not serializable. Use of Guava Optional is discouraged.
- @Nullable private final String snapshotId;
- QueryRunner(
- RegistryQuery query,
- SerializableFunction resultMapper,
- @Nullable String snapshotId) {
+ QueryRunner(RegistryQuery query, SerializableFunction resultMapper) {
this.query = query;
this.resultMapper = resultMapper;
- this.snapshotId = snapshotId;
}
@ProcessElement
public void processElement(OutputReceiver outputReceiver) {
tm().transactNoRetry(
() -> {
- if (snapshotId != null) {
- tm().setDatabaseSnapshot(snapshotId);
- }
query.stream().map(resultMapper::apply).forEach(outputReceiver::output);
});
}
@@ -256,8 +232,8 @@ public final class RegistryJpaIO {
@AutoValue
public abstract static class Write extends PTransform, PCollection> {
+ private static final long serialVersionUID = -4023583243078410323L;
public static final String DEFAULT_NAME = "RegistryJpaIO.Write";
-
public static final int DEFAULT_BATCH_SIZE = 1;
public abstract String name();
@@ -321,6 +297,8 @@ public final class RegistryJpaIO {
/** Writes a batch of entities to a SQL database through a {@link JpaTransactionManager}. */
private static class SqlBatchWriter extends DoFn, Iterable>, Void> {
+
+ private static final long serialVersionUID = -7519944406319472690L;
private final Counter counter;
private final SerializableFunction jpaConverter;
@@ -337,7 +315,7 @@ public final class RegistryJpaIO {
private void actuallyProcessElement(@Element KV, Iterable> kv) {
ImmutableList