diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index 03b3501a0..bde73fb89 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -14,6 +14,7 @@ package google.registry.beam.common; +import static com.google.common.base.Preconditions.checkArgument; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static org.apache.beam.sdk.values.TypeDescriptors.integers; @@ -405,7 +406,13 @@ public final class RegistryJpaIO { .filter(Objects::nonNull) .collect(ImmutableList.toImmutableList()); try { - jpaTm().transact(() -> jpaTm().putAll(entities)); + jpaTm() + .transact( + () -> { + // Don't modify existing objects as it could lead to race conditions + entities.forEach(this::verifyObjectNonexistence); + jpaTm().putAll(entities); + }); counter.inc(entities.size()); } catch (RuntimeException e) { processSingly(entities); @@ -419,7 +426,13 @@ public final class RegistryJpaIO { private void processSingly(ImmutableList entities) { for (Object entity : entities) { try { - jpaTm().transact(() -> jpaTm().put(entity)); + jpaTm() + .transact( + () -> { + // Don't modify existing objects as it could lead to race conditions + verifyObjectNonexistence(entity); + jpaTm().put(entity); + }); counter.inc(); } catch (RuntimeException e) { throw new RuntimeException(toEntityKeyString(entity), e); @@ -445,5 +458,16 @@ public final class RegistryJpaIO { return "Non-SqlEntity: " + entity; } } + + /** SqlBatchWriter should not re-write existing entities due to potential race conditions. */ + private void verifyObjectNonexistence(Object obj) { + // We cannot rely on calling "insert" on the objects because the underlying JPA persist call + // adds the input object to the persistence context, meaning that any modifications (e.g. + // updateTimestamp) are reflected in the input object. Beam doesn't allow modification of + // input objects, so this throws an exception. + // TODO(go/non-datastore-allocateid): also check that all the objects have IDs + checkArgument( + !jpaTm().exists(obj), "Entities created in SqlBatchWriter must not already exist"); + } } } diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java index c483ab4ac..4e2ebd419 100644 --- a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java @@ -17,9 +17,12 @@ package google.registry.beam.common; import static com.google.common.truth.Truth.assertThat; import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static google.registry.testing.DatabaseHelper.loadAllOf; import static google.registry.testing.DatabaseHelper.newContact; +import static org.junit.jupiter.api.Assertions.assertThrows; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import google.registry.beam.TestPipelineExtension; import google.registry.model.contact.Contact; import google.registry.persistence.transaction.JpaTestExtensions; @@ -28,6 +31,7 @@ import google.registry.testing.AppEngineExtension; import google.registry.testing.DatastoreEntityExtension; import google.registry.testing.FakeClock; import java.io.Serializable; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.transforms.Create; import org.joda.time.DateTime; import org.junit.jupiter.api.Order; @@ -65,8 +69,32 @@ class RegistryJpaWriteTest implements Serializable { .apply(RegistryJpaIO.write().withName("Contact").withBatchSize(4).withShards(2)); testPipeline.run().waitUntilFinish(); - assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Contact.class))) + assertThat(loadAllOf(Contact.class)) .comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp")) .containsExactlyElementsIn(contacts); } + + @Test + void testFailure_writeExistingEntity() { + // RegistryJpaIO.Write actions should not write existing objects to the database because the + // object could have been mutated in between creation and when the Write actually occurs, + // causing a race condition + jpaTm() + .transact( + () -> { + jpaTm().put(AppEngineExtension.makeRegistrar2()); + jpaTm().put(newContact("contact")); + }); + Contact contact = Iterables.getOnlyElement(loadAllOf(Contact.class)); + testPipeline + .apply(Create.of(contact)) + .apply(RegistryJpaIO.write().withName("Contact")); + // PipelineExecutionException caused by a RuntimeException caused by an IllegalArgumentException + assertThat( + assertThrows( + PipelineExecutionException.class, () -> testPipeline.run().waitUntilFinish())) + .hasCauseThat() + .hasCauseThat() + .isInstanceOf(IllegalArgumentException.class); + } }