From b38574a9fcf2980dfa02d5548d45851752ce24ca Mon Sep 17 00:00:00 2001 From: Weimin Yu Date: Wed, 5 May 2021 15:45:03 -0400 Subject: [PATCH] Add a BEAM read connector for JPA entities (#1132) * Add a BEAM read connector for JPA entities Added a Read connector to load JPA entities from Cloud SQL. Also attempted a fix to the null threadfactory problem. --- .../registry/beam/common/RegistryJpaIO.java | 141 ++++++++++++++++++ .../beam/common/RegistryJpaReadTest.java | 119 +++++++++++++++ .../beam/common/RegistryJpaWriteTest.java | 6 - 3 files changed, 260 insertions(+), 6 deletions(-) create mode 100644 core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index dbeb14843..2f169c0ef 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -23,11 +23,18 @@ import com.google.common.collect.Streams; import google.registry.backup.AppEngineEnvironment; import google.registry.model.ofy.ObjectifyService; import google.registry.persistence.transaction.JpaTransactionManager; +import google.registry.persistence.transaction.QueryComposer; import google.registry.persistence.transaction.TransactionManagerFactory; +import java.io.Serializable; import java.util.Objects; import java.util.concurrent.ThreadLocalRandom; +import javax.persistence.criteria.CriteriaQuery; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Deduplicate; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.PTransform; @@ -36,6 +43,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; /** @@ -51,10 +59,143 @@ public final class RegistryJpaIO { private RegistryJpaIO() {} + public static Read read(QueryComposerFactory queryFactory) { + return Read.builder().queryFactory(queryFactory).build(); + } + + public static Read read( + QueryComposerFactory queryFactory, SerializableFunction resultMapper) { + return Read.builder().queryFactory(queryFactory).resultMapper(resultMapper).build(); + } + public static Write write() { return Write.builder().build(); } + // TODO(mmuller): Consider detached JpaQueryComposer that works with any JpaTransactionManager + // instance, i.e., change composer.buildQuery() to composer.buildQuery(JpaTransactionManager). + // This way QueryComposer becomes reusable and serializable (at least with Hibernate), and this + // interface would no longer be necessary. + public interface QueryComposerFactory + extends SerializableFunction> {} + + /** + * A {@link PTransform transform} that executes a JPA {@link CriteriaQuery} and adds the results + * to the BEAM pipeline. Users have the option to transform the results before sending them to the + * next stages. + * + *
+   * <p>The BEAM pipeline may execute this transform multiple times due to transient failures,
+   * loading duplicate results into the pipeline. Until we add deduplication support, the easiest
+   * workaround is to map results to {@link KV} pairs and apply the {@link Deduplicate} transform
+   * to the output of this transform:
+   *
+   * <pre>{@code
+   * PCollection<String> contactIds =
+   *     pipeline
+   *         .apply(RegistryJpaIO.read(
+   *             (JpaTransactionManager tm) -> tm.createQueryComposer...,
+   *             contact -> KV.of(contact.getRepoId(), contact.getContactId()))
+   *         .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
+   *         .apply(Deduplicate.keyedValues())
+   *         .apply(Values.create());
+   * }</pre>
+ */ + @AutoValue + public abstract static class Read extends PTransform> { + + public static final String DEFAULT_NAME = "RegistryJpaIO.Read"; + + abstract String name(); + + abstract RegistryJpaIO.QueryComposerFactory queryFactory(); + + abstract SerializableFunction resultMapper(); + + abstract TransactionMode transactionMode(); + + abstract Coder coder(); + + abstract Builder toBuilder(); + + @Override + public PCollection expand(PBegin input) { + return input + .apply("Starting " + name(), Create.of((Void) null)) + .apply( + "Run query for " + name(), + ParDo.of(new QueryRunner<>(queryFactory(), resultMapper()))) + .setCoder(coder()); + } + + public Read withName(String name) { + return toBuilder().name(name).build(); + } + + public Read withResultMapper(SerializableFunction mapper) { + return toBuilder().resultMapper(mapper).build(); + } + + public Read withTransactionMode(TransactionMode transactionMode) { + return toBuilder().transactionMode(transactionMode).build(); + } + + public Read withCoder(Coder coder) { + return toBuilder().coder(coder).build(); + } + + static Builder builder() { + return new AutoValue_RegistryJpaIO_Read.Builder() + .name(DEFAULT_NAME) + .resultMapper(x -> x) + .transactionMode(TransactionMode.TRANSACTIONAL) + .coder(SerializableCoder.of(Serializable.class)); + } + + @AutoValue.Builder + public abstract static class Builder { + + abstract Builder name(String name); + + abstract Builder queryFactory(RegistryJpaIO.QueryComposerFactory queryFactory); + + abstract Builder resultMapper(SerializableFunction mapper); + + abstract Builder transactionMode(TransactionMode transactionMode); + + abstract Builder coder(Coder coder); + + abstract Read build(); + } + + static class QueryRunner extends DoFn { + private final QueryComposerFactory querySupplier; + private final SerializableFunction resultMapper; + + QueryRunner(QueryComposerFactory querySupplier, SerializableFunction resultMapper) { + this.querySupplier = querySupplier; + this.resultMapper = resultMapper; + } + + @ProcessElement + public void processElement(OutputReceiver outputReceiver) { + // TODO(b/187210388): JpaTransactionManager should support non-transactional query. + // TODO(weiminyu): add deduplication + jpaTm() + .transactNoRetry( + () -> + querySupplier.apply(jpaTm()).stream() + .map(resultMapper::apply) + .forEach(outputReceiver::output)); + // TODO(weiminyu): improve performance by reshuffle. + } + } + + public enum TransactionMode { + NOT_TRANSACTIONAL, + TRANSACTIONAL; + } + } + /** * A {@link PTransform transform} that writes a PCollection of entities to the SQL database using * the {@link JpaTransactionManager}. diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java new file mode 100644 index 000000000..fa153209b --- /dev/null +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaReadTest.java @@ -0,0 +1,119 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.common; + +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; + +import com.google.common.collect.ImmutableList; +import google.registry.beam.TestPipelineExtension; +import google.registry.beam.common.RegistryJpaIO.Read; +import google.registry.model.contact.ContactBase; +import google.registry.model.contact.ContactResource; +import google.registry.model.registrar.Registrar; +import google.registry.persistence.transaction.JpaTestRules; +import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension; +import google.registry.persistence.transaction.JpaTransactionManager; +import google.registry.testing.AppEngineExtension; +import google.registry.testing.DatabaseHelper; +import google.registry.testing.DatastoreEntityExtension; +import google.registry.testing.FakeClock; +import google.registry.testing.SystemPropertyExtension; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Deduplicate; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.DateTime; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Unit tests for {@link RegistryJpaIO.Read}. */ +public class RegistryJpaReadTest { + + private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); + + private final FakeClock fakeClock = new FakeClock(START_TIME); + + @RegisterExtension + @Order(Order.DEFAULT - 1) + final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension(); + + // The pipeline runner on Kokoro sometimes mistakes the platform as appengine, resulting in + // a null thread factory. The cause is unknown but it may be due to the interaction with + // the DatastoreEntityExtension above. To work around the problem, we explicitly unset the + // relevant property before test starts. 
+ @RegisterExtension + final transient SystemPropertyExtension systemPropertyExtension = + new SystemPropertyExtension().setProperty("com.google.appengine.runtime.environment", null); + + @RegisterExtension + final transient JpaIntegrationTestExtension database = + new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule(); + + @RegisterExtension + final transient TestPipelineExtension testPipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); + + private transient ImmutableList contacts; + + @BeforeEach + void beforeEach() { + Registrar ofyRegistrar = AppEngineExtension.makeRegistrar2(); + jpaTm().transact(() -> jpaTm().put(ofyRegistrar)); + + ImmutableList.Builder builder = new ImmutableList.Builder<>(); + + for (int i = 0; i < 3; i++) { + ContactResource contact = DatabaseHelper.newContactResource("contact_" + i); + builder.add(contact); + } + contacts = builder.build(); + jpaTm().transact(() -> jpaTm().putAll(contacts)); + } + + @Test + void nonTransactionalQuery_noDedupe() { + Read read = + RegistryJpaIO.read( + (JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class), + ContactBase::getContactId); + PCollection repoIds = testPipeline.apply(read); + + PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2"); + testPipeline.run(); + } + + @Test + void nonTransactionalQuery_dedupe() { + // This method only serves as an example of deduplication. Duplicates are not actually added. + Read> read = + RegistryJpaIO.read( + (JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class), + contact -> KV.of(contact.getRepoId(), contact.getContactId())) + .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + PCollection repoIds = + testPipeline + .apply(read) + .apply("Deduplicate", Deduplicate.keyedValues()) + .apply("Get values", Values.create()); + + PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2"); + testPipeline.run(); + } +} diff --git a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java index c14cdc343..ce64f468b 100644 --- a/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java +++ b/core/src/test/java/google/registry/beam/common/RegistryJpaWriteTest.java @@ -37,7 +37,6 @@ import google.registry.testing.DatastoreEntityExtension; import google.registry.testing.FakeClock; import google.registry.testing.InjectExtension; import java.io.Serializable; -import java.nio.file.Path; import java.util.stream.Collectors; import org.apache.beam.sdk.transforms.Create; import org.joda.time.DateTime; @@ -45,7 +44,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.jupiter.api.io.TempDir; /** Unit test for {@link RegistryJpaIO.Write}. */ class RegistryJpaWriteTest implements Serializable { @@ -64,10 +62,6 @@ class RegistryJpaWriteTest implements Serializable { final transient JpaIntegrationTestExtension database = new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule(); - @SuppressWarnings("WeakerAccess") - @TempDir - transient Path tmpDir; - @RegisterExtension final transient TestPipelineExtension testPipeline = TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
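
A minimal usage sketch, not part of the patch: it wires the new RegistryJpaIO.Read connector into a caller pipeline the way the class javadoc and RegistryJpaReadTest demonstrate, reading ContactResource entities, mapping each to a (repoId, contactId) pair, and deduplicating before extracting the contact IDs. The wrapper class and method names are hypothetical, and the surrounding pipeline and JPA runtime setup are assumed to already exist.

package google.registry.beam.common;

// Hypothetical sketch; only RegistryJpaIO, ContactResource, and the Beam APIs already shown in
// the patch are used.
import google.registry.model.contact.ContactResource;
import google.registry.persistence.transaction.JpaTransactionManager;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class ContactIdReadSketch {

  /** Reads every contact ID from Cloud SQL, removing duplicates caused by query re-execution. */
  static PCollection<String> readContactIds(Pipeline pipeline) {
    return pipeline
        .apply(
            "Read contact ids",
            RegistryJpaIO.read(
                    (JpaTransactionManager tm) -> tm.createQueryComposer(ContactResource.class),
                    contact -> KV.of(contact.getRepoId(), contact.getContactId()))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
        .apply(Deduplicate.keyedValues())
        .apply(Values.create());
  }
}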