Check for entity nonexistence in SqlBatchWriter (#1824)

Passing in an already-existing instance is an antipattern because it can
lead to race conditions: something else may have modified the object
between the time the pipeline loaded it and the time it is saved. The
Write action should only write new entities.

We cannot check IDs for the objects (some IDs are not autogenerated so
they might exist already). We also cannot call `insert` on the objects
because the underlying JPA `persist` call adds the input object to the
persistence context, meaning that any modifications (e.g.
updateTimestamp) are reflected in the input object. Beam doesn't allow
modification of input objects.
This commit is contained in:
gbrodman 2022-10-27 14:46:26 -04:00 committed by GitHub
parent ca60ca159f
commit 9c6c210e21
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 55 additions and 3 deletions

View file

@ -14,6 +14,7 @@
package google.registry.beam.common;
import static com.google.common.base.Preconditions.checkArgument;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static org.apache.beam.sdk.values.TypeDescriptors.integers;
@ -405,7 +406,13 @@ public final class RegistryJpaIO {
.filter(Objects::nonNull)
.collect(ImmutableList.toImmutableList());
try {
jpaTm().transact(() -> jpaTm().putAll(entities));
jpaTm()
.transact(
() -> {
// Don't modify existing objects as it could lead to race conditions
entities.forEach(this::verifyObjectNonexistence);
jpaTm().putAll(entities);
});
counter.inc(entities.size());
} catch (RuntimeException e) {
processSingly(entities);
@ -419,7 +426,13 @@ public final class RegistryJpaIO {
private void processSingly(ImmutableList<Object> entities) {
for (Object entity : entities) {
try {
jpaTm().transact(() -> jpaTm().put(entity));
jpaTm()
.transact(
() -> {
// Don't modify existing objects as it could lead to race conditions
verifyObjectNonexistence(entity);
jpaTm().put(entity);
});
counter.inc();
} catch (RuntimeException e) {
throw new RuntimeException(toEntityKeyString(entity), e);
@ -445,5 +458,16 @@ public final class RegistryJpaIO {
return "Non-SqlEntity: " + entity;
}
}
/**
 * Rejects entities that are already present in the database.
 *
 * <p>SqlBatchWriter only creates new entities; rewriting an existing one risks a race with
 * whatever else changed it after the pipeline loaded it.
 *
 * @throws IllegalArgumentException if {@code obj} already exists
 */
private void verifyObjectNonexistence(Object obj) {
  // A plain "insert" is not an option here: the underlying JPA persist call attaches the input
  // object to the persistence context, so changes such as updateTimestamp show up on the input
  // object, and Beam does not allow input objects to be modified.
  // TODO(go/non-datastore-allocateid): also check that all the objects have IDs
  boolean alreadyExists = jpaTm().exists(obj);
  if (alreadyExists) {
    throw new IllegalArgumentException(
        "Entities created in SqlBatchWriter must not already exist");
  }
}
}
}

View file

@ -17,9 +17,12 @@ package google.registry.beam.common;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.testing.DatabaseHelper.loadAllOf;
import static google.registry.testing.DatabaseHelper.newContact;
import static org.junit.jupiter.api.Assertions.assertThrows;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import google.registry.beam.TestPipelineExtension;
import google.registry.model.contact.Contact;
import google.registry.persistence.transaction.JpaTestExtensions;
@ -28,6 +31,7 @@ import google.registry.testing.AppEngineExtension;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import java.io.Serializable;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.transforms.Create;
import org.joda.time.DateTime;
import org.junit.jupiter.api.Order;
@ -65,8 +69,32 @@ class RegistryJpaWriteTest implements Serializable {
.apply(RegistryJpaIO.<Contact>write().withName("Contact").withBatchSize(4).withShards(2));
testPipeline.run().waitUntilFinish();
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(Contact.class)))
assertThat(loadAllOf(Contact.class))
.comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp"))
.containsExactlyElementsIn(contacts);
}
@Test
void testFailure_writeExistingEntity() {
  // Writing an entity that is already stored must fail: it may have been changed by something
  // else between being loaded and being written, which would be a race condition.
  jpaTm()
      .transact(
          () -> {
            jpaTm().put(AppEngineExtension.makeRegistrar2());
            jpaTm().put(newContact("contact"));
          });
  Contact existingContact = Iterables.getOnlyElement(loadAllOf(Contact.class));
  testPipeline
      .apply(Create.of(existingContact))
      .apply(RegistryJpaIO.<Contact>write().withName("Contact"));
  // Expected chain: PipelineExecutionException -> RuntimeException -> IllegalArgumentException
  PipelineExecutionException thrown =
      assertThrows(PipelineExecutionException.class, () -> testPipeline.run().waitUntilFinish());
  assertThat(thrown).hasCauseThat().hasCauseThat().isInstanceOf(IllegalArgumentException.class);
}
}