Add a BEAM read connector for JPA entities (#1132)

* Add a BEAM read connector for JPA entities

Added a Read connector to load JPA entities from Cloud SQL.

Also attempted a fix to the null threadfactory problem.
This commit is contained in:
Weimin Yu 2021-05-05 15:45:03 -04:00 committed by GitHub
parent 3f6ec8f1b0
commit b38574a9fc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 260 additions and 6 deletions

View file

@ -23,11 +23,18 @@ import com.google.common.collect.Streams;
import google.registry.backup.AppEngineEnvironment;
import google.registry.model.ofy.ObjectifyService;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.QueryComposer;
import google.registry.persistence.transaction.TransactionManagerFactory;
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import javax.persistence.criteria.CriteriaQuery;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
@ -36,6 +43,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
/**
@ -51,10 +59,143 @@ public final class RegistryJpaIO {
private RegistryJpaIO() {}
public static <R> Read<R, R> read(QueryComposerFactory<R> queryFactory) {
return Read.<R, R>builder().queryFactory(queryFactory).build();
}
public static <R, T> Read<R, T> read(
QueryComposerFactory<R> queryFactory, SerializableFunction<R, T> resultMapper) {
return Read.<R, T>builder().queryFactory(queryFactory).resultMapper(resultMapper).build();
}
public static <T> Write<T> write() {
return Write.<T>builder().build();
}
// TODO(mmuller): Consider detached JpaQueryComposer that works with any JpaTransactionManager
// instance, i.e., change composer.buildQuery() to composer.buildQuery(JpaTransactionManager).
// This way QueryComposer becomes reusable and serializable (at least with Hibernate), and this
// interface would no longer be necessary.
public interface QueryComposerFactory<T>
extends SerializableFunction<JpaTransactionManager, QueryComposer<T>> {}
/**
* A {@link PTransform transform} that executes a JPA {@link CriteriaQuery} and adds the results
* to the BEAM pipeline. Users have the option to transform the results before sending them to the
* next stages.
*
* <p>The BEAM pipeline may execute this transform multiple times due to transient failures,
* loading duplicate results into the pipeline. Before we add dedepuplication support, the easiest
* workaround is to map results to {@link KV} pairs, and apply the {@link Deduplicate} transform
* to the output of this transform:
*
* <pre>{@code
* PCollection<String> contactIds =
* pipeline
* .apply(RegistryJpaIO.read(
* (JpaTransactionManager tm) -> tm.createQueryComposer...,
* contact -> KV.of(contact.getRepoId(), contact.getContactId()))
* .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
* .apply(Deduplicate.keyedValues())
* .apply(Values.create());
* }</pre>
*/
@AutoValue
public abstract static class Read<R, T> extends PTransform<PBegin, PCollection<T>> {
public static final String DEFAULT_NAME = "RegistryJpaIO.Read";
abstract String name();
abstract RegistryJpaIO.QueryComposerFactory<R> queryFactory();
abstract SerializableFunction<R, T> resultMapper();
abstract TransactionMode transactionMode();
abstract Coder<T> coder();
abstract Builder<R, T> toBuilder();
@Override
public PCollection<T> expand(PBegin input) {
return input
.apply("Starting " + name(), Create.of((Void) null))
.apply(
"Run query for " + name(),
ParDo.of(new QueryRunner<>(queryFactory(), resultMapper())))
.setCoder(coder());
}
public Read<R, T> withName(String name) {
return toBuilder().name(name).build();
}
public Read<R, T> withResultMapper(SerializableFunction<R, T> mapper) {
return toBuilder().resultMapper(mapper).build();
}
public Read<R, T> withTransactionMode(TransactionMode transactionMode) {
return toBuilder().transactionMode(transactionMode).build();
}
public Read<R, T> withCoder(Coder<T> coder) {
return toBuilder().coder(coder).build();
}
static <R, T> Builder<R, T> builder() {
return new AutoValue_RegistryJpaIO_Read.Builder()
.name(DEFAULT_NAME)
.resultMapper(x -> x)
.transactionMode(TransactionMode.TRANSACTIONAL)
.coder(SerializableCoder.of(Serializable.class));
}
@AutoValue.Builder
public abstract static class Builder<R, T> {
abstract Builder<R, T> name(String name);
abstract Builder<R, T> queryFactory(RegistryJpaIO.QueryComposerFactory<R> queryFactory);
abstract Builder<R, T> resultMapper(SerializableFunction<R, T> mapper);
abstract Builder<R, T> transactionMode(TransactionMode transactionMode);
abstract Builder<R, T> coder(Coder coder);
abstract Read<R, T> build();
}
static class QueryRunner<R, T> extends DoFn<Void, T> {
private final QueryComposerFactory<R> querySupplier;
private final SerializableFunction<R, T> resultMapper;
QueryRunner(QueryComposerFactory<R> querySupplier, SerializableFunction<R, T> resultMapper) {
this.querySupplier = querySupplier;
this.resultMapper = resultMapper;
}
@ProcessElement
public void processElement(OutputReceiver<T> outputReceiver) {
// TODO(b/187210388): JpaTransactionManager should support non-transactional query.
// TODO(weiminyu): add deduplication
jpaTm()
.transactNoRetry(
() ->
querySupplier.apply(jpaTm()).stream()
.map(resultMapper::apply)
.forEach(outputReceiver::output));
// TODO(weiminyu): improve performance by reshuffle.
}
}
public enum TransactionMode {
NOT_TRANSACTIONAL,
TRANSACTIONAL;
}
}
/**
* A {@link PTransform transform} that writes a PCollection of entities to the SQL database using
* the {@link JpaTransactionManager}.

View file

@ -0,0 +1,119 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.common;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.google.common.collect.ImmutableList;
import google.registry.beam.TestPipelineExtension;
import google.registry.beam.common.RegistryJpaIO.Read;
import google.registry.model.contact.ContactBase;
import google.registry.model.contact.ContactResource;
import google.registry.model.registrar.Registrar;
import google.registry.persistence.transaction.JpaTestRules;
import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.DatabaseHelper;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.SystemPropertyExtension;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link RegistryJpaIO.Read}. */
public class RegistryJpaReadTest {
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
private final FakeClock fakeClock = new FakeClock(START_TIME);
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();
// The pipeline runner on Kokoro sometimes mistakes the platform as appengine, resulting in
// a null thread factory. The cause is unknown but it may be due to the interaction with
// the DatastoreEntityExtension above. To work around the problem, we explicitly unset the
// relevant property before test starts.
@RegisterExtension
final transient SystemPropertyExtension systemPropertyExtension =
new SystemPropertyExtension().setProperty("com.google.appengine.runtime.environment", null);
@RegisterExtension
final transient JpaIntegrationTestExtension database =
new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule();
@RegisterExtension
final transient TestPipelineExtension testPipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
private transient ImmutableList<ContactResource> contacts;
@BeforeEach
void beforeEach() {
Registrar ofyRegistrar = AppEngineExtension.makeRegistrar2();
jpaTm().transact(() -> jpaTm().put(ofyRegistrar));
ImmutableList.Builder<ContactResource> builder = new ImmutableList.Builder<>();
for (int i = 0; i < 3; i++) {
ContactResource contact = DatabaseHelper.newContactResource("contact_" + i);
builder.add(contact);
}
contacts = builder.build();
jpaTm().transact(() -> jpaTm().putAll(contacts));
}
@Test
void nonTransactionalQuery_noDedupe() {
Read<ContactResource, String> read =
RegistryJpaIO.read(
(JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),
ContactBase::getContactId);
PCollection<String> repoIds = testPipeline.apply(read);
PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2");
testPipeline.run();
}
@Test
void nonTransactionalQuery_dedupe() {
// This method only serves as an example of deduplication. Duplicates are not actually added.
Read<ContactResource, KV<String, String>> read =
RegistryJpaIO.read(
(JpaTransactionManager jpaTm) -> jpaTm.createQueryComposer(ContactResource.class),
contact -> KV.of(contact.getRepoId(), contact.getContactId()))
.withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
PCollection<String> repoIds =
testPipeline
.apply(read)
.apply("Deduplicate", Deduplicate.keyedValues())
.apply("Get values", Values.create());
PAssert.that(repoIds).containsInAnyOrder("contact_0", "contact_1", "contact_2");
testPipeline.run();
}
}

View file

@ -37,7 +37,6 @@ import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.InjectExtension;
import java.io.Serializable;
import java.nio.file.Path;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.Create;
import org.joda.time.DateTime;
@ -45,7 +44,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
/** Unit test for {@link RegistryJpaIO.Write}. */
class RegistryJpaWriteTest implements Serializable {
@ -64,10 +62,6 @@ class RegistryJpaWriteTest implements Serializable {
final transient JpaIntegrationTestExtension database =
new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule();
@SuppressWarnings("WeakerAccess")
@TempDir
transient Path tmpDir;
@RegisterExtension
final transient TestPipelineExtension testPipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);