Rewrite the JPA output connector for BEAM (#995)
Following BEAM's IO connector style, added a RegistryJpaIO class to hold IO connectors, and implemented the Write connector as a static inner class in it. The JpaTransactionManager used by the Write connector retrieves SQL credentials from the SecretManager. Cleaned up SQL-related pipeline parameters. Converted the InitSqlPipeline to use RegistryJpaIO.
parent a52a8695e3
commit c7c03874c0

15 changed files with 1563 additions and 1065 deletions
RegistryJpaIO.java (new file)
@@ -0,0 +1,229 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.beam.common;

import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static org.apache.beam.sdk.values.TypeDescriptors.integers;

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import google.registry.backup.AppEngineEnvironment;
import google.registry.model.ofy.ObjectifyService;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManagerFactory;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * Contains IO {@link PTransform transforms} for a BEAM pipeline that interacts with a single
 * database through a {@link JpaTransactionManager}.
 *
 * <p>The {@code JpaTransactionManager} is instantiated once on each pipeline worker VM (through
 * {@link RegistryPipelineWorkerInitializer}), made available through the static method {@link
 * TransactionManagerFactory#jpaTm()}, and is shared by all threads on the VM. Configuration is
 * through {@link RegistryPipelineOptions}.
 */
public final class RegistryJpaIO {

  private RegistryJpaIO() {}

  public static <T> Write<T> write() {
    return Write.<T>builder().build();
  }

  /**
   * A {@link PTransform transform} that writes a PCollection of entities to the SQL database using
   * the {@link JpaTransactionManager}.
   *
   * <p>Unlike typical BEAM {@link Write} transforms, the output type of this transform is {@code
   * PCollection<Void>} instead of {@link org.apache.beam.sdk.values.PDone}. This deviation allows
   * the sequencing of multiple {@code PCollections}: we have use cases where one collection of data
   * must be completely written before another can start (due to foreign key constraints in the
   * latter).
   *
   * @param <T> type of the entities to be written
   */
  @AutoValue
  public abstract static class Write<T> extends PTransform<PCollection<T>, PCollection<Void>> {

    public static final String DEFAULT_NAME = "RegistryJpaIO.Write";

    public static final int DEFAULT_BATCH_SIZE = 1;

    /** The default number of write shards. Please refer to {@link #shards} for more information. */
    public static final int DEFAULT_SHARDS = 1;

    public abstract String name();

    /** Number of elements to be written in one call. */
    public abstract int batchSize();

    /**
     * The number of shards the output should be split into.
     *
     * <p>This value is a hint to the pipeline runner on the level of parallelism, and should be
     * significantly greater than the number of threads working on this transformation (see next
     * paragraph for more information). On the other hand, it should not be too large to the point
     * that the number of elements per shard is lower than {@link #batchSize()}. As a rule of thumb,
     * the following constraint should hold: {@code shards * batchSize * nThreads <=
     * inputElementCount}. Although it is not always possible to determine the number of threads
     * working on this transform, when the pipeline run is IO-bound, it most likely is close to the
     * total number of threads in the pipeline, which is explained below.
     *
     * <p>With Cloud Dataflow runner, the total number of worker threads in a batch pipeline (which
     * includes all existing Registry pipelines) is the number of vCPUs used by the pipeline, and
     * can be set by the {@code --maxNumWorkers} and {@code --workerMachineType} parameters. The
     * number of worker threads in a streaming pipeline can be set by the {@code --maxNumWorkers}
     * and {@code --numberOfWorkerHarnessThreads} parameters.
     *
     * <p>Note that connections on the database server are a limited resource, therefore the number
     * of threads that interact with the database should be set to an appropriate limit. Again, we
     * cannot control this number, but can influence it by controlling the total number of threads.
     */
    public abstract int shards();

    public abstract SerializableFunction<T, Object> jpaConverter();

    public Write<T> withName(String name) {
      return toBuilder().name(name).build();
    }

    public Write<T> withBatchSize(int batchSize) {
      return toBuilder().batchSize(batchSize).build();
    }

    public Write<T> withShards(int shards) {
      return toBuilder().shards(shards).build();
    }

    /**
     * An optional function that converts the input entities to a form that can be written into the
     * database.
     */
    public Write<T> withJpaConverter(SerializableFunction<T, Object> jpaConverter) {
      return toBuilder().jpaConverter(jpaConverter).build();
    }

    abstract Builder<T> toBuilder();

    @Override
    public PCollection<Void> expand(PCollection<T> input) {
      return input
          .apply(
              "Shard data " + name(),
              WithKeys.<Integer, T>of(e -> ThreadLocalRandom.current().nextInt(shards()))
                  .withKeyType(integers()))
          // The call to withShardedKey() is performance critical. The resulting transform ensures
          // that data is spread evenly across all worker threads.
          .apply(
              "Group into batches " + name(),
              GroupIntoBatches.<Integer, T>ofSize(batchSize()).withShardedKey())
          .apply(
              "Write in batch for " + name(),
              ParDo.of(new SqlBatchWriter<>(name(), jpaConverter())));
    }

    static <T> Builder<T> builder() {
      return new AutoValue_RegistryJpaIO_Write.Builder<T>()
          .name(DEFAULT_NAME)
          .batchSize(DEFAULT_BATCH_SIZE)
          .shards(DEFAULT_SHARDS)
          .jpaConverter(x -> x);
    }

    @AutoValue.Builder
    abstract static class Builder<T> {

      abstract Builder<T> name(String name);

      abstract Builder<T> batchSize(int batchSize);

      abstract Builder<T> shards(int jdbcNumConnsHint);

      abstract Builder<T> jpaConverter(SerializableFunction<T, Object> jpaConverter);

      abstract Write<T> build();
    }
  }

  /** Writes a batch of entities to a SQL database through a {@link JpaTransactionManager}. */
  private static class SqlBatchWriter<T> extends DoFn<KV<ShardedKey<Integer>, Iterable<T>>, Void> {
    private final Counter counter;
    private final SerializableFunction<T, Object> jpaConverter;

    SqlBatchWriter(String type, SerializableFunction<T, Object> jpaConverter) {
      counter = Metrics.counter("SQL_WRITE", type);
      this.jpaConverter = jpaConverter;
    }

    @Setup
    public void setup() {
      // Below is needed as long as Objectify keys are still involved in the handling of SQL
      // entities (e.g., in VKeys).
      try (AppEngineEnvironment env = new AppEngineEnvironment()) {
        ObjectifyService.initOfy();
      }
    }

    @ProcessElement
    public void processElement(@Element KV<ShardedKey<Integer>, Iterable<T>> kv) {
      try (AppEngineEnvironment env = new AppEngineEnvironment()) {
        ImmutableList<Object> ofyEntities =
            Streams.stream(kv.getValue())
                .map(this.jpaConverter::apply)
                // TODO(b/177340730): post migration delete the line below.
                .filter(Objects::nonNull)
                .collect(ImmutableList.toImmutableList());
        try {
          jpaTm().transact(() -> jpaTm().putAll(ofyEntities));
          counter.inc(ofyEntities.size());
        } catch (RuntimeException e) {
          processSingly(ofyEntities);
        }
      }
    }

    /**
     * Writes entities in a failed batch one by one to identify the first bad entity and throws a
     * {@link RuntimeException} on it.
     */
    private void processSingly(ImmutableList<Object> ofyEntities) {
      for (Object ofyEntity : ofyEntities) {
        try {
          jpaTm().transact(() -> jpaTm().put(ofyEntity));
          counter.inc();
        } catch (RuntimeException e) {
          throw new RuntimeException(toOfyKey(ofyEntity).toString(), e);
        }
      }
    }

    private com.googlecode.objectify.Key<?> toOfyKey(Object ofyEntity) {
      return com.googlecode.objectify.Key.create(ofyEntity);
    }
  }
}
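As a back-of-the-envelope illustration of the rule of thumb in the shards() Javadoc above, the sketch below checks a candidate shard count against it. The worker shape (5 workers with 4 vCPUs each) and the input size are illustrative assumptions; the batch size of 20 and shard count of 100 match the defaults used by RegistryPipelineOptions further down in this change.

// Sketch only: validates a shard count against the rule of thumb in Write.shards()'s Javadoc.
final class ShardCountSketch {

  // True if shards is above the thread count and shards * batchSize * nThreads <= inputElementCount.
  static boolean isReasonableShardCount(
      int shards, int batchSize, int workerThreads, long inputElementCount) {
    return shards > workerThreads
        && (long) shards * batchSize * workerThreads <= inputElementCount;
  }

  public static void main(String[] args) {
    int workerThreads = 5 * 4; // assume --maxNumWorkers=5 on 4-vCPU machines: ~20 worker threads
    int batchSize = 20; // default in RegistryPipelineOptions
    int shards = 100; // default in RegistryPipelineOptions
    long inputElementCount = 1_000_000L; // illustrative input size
    // 100 * 20 * 20 = 40,000 <= 1,000,000, and 100 > 20, so the defaults pass the check.
    System.out.println(isReasonableShardCount(shards, batchSize, workerThreads, inputElementCount));
  }
}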
RegistryPipelineComponent.java
@@ -14,6 +14,7 @@
package google.registry.beam.common;

import dagger.BindsInstance;
import dagger.Component;
import dagger.Lazy;
import google.registry.config.CredentialModule;
@@ -21,9 +22,11 @@ import google.registry.config.RegistryConfig.Config;
import google.registry.config.RegistryConfig.ConfigModule;
import google.registry.persistence.PersistenceModule;
import google.registry.persistence.PersistenceModule.BeamJpaTm;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.privileges.secretmanager.SecretManagerModule;
import google.registry.util.UtilsModule;
import javax.annotation.Nullable;
import javax.inject.Singleton;

/** Component that provides everything needed on a Pipeline worker. */
@@ -44,4 +47,18 @@ public interface RegistryPipelineComponent {

  @BeamJpaTm
  Lazy<JpaTransactionManager> getJpaTransactionManager();

  @Component.Builder
  interface Builder {

    /**
     * Optionally overrides the default transaction isolation level. This applies to ALL
     * transactions executed in the pipeline.
     */
    @BindsInstance
    Builder isolationOverride(
        @Nullable @Config("beamIsolationOverride") TransactionIsolationLevel isolationOverride);

    RegistryPipelineComponent build();
  }
}
RegistryPipelineOptions.java
@@ -14,13 +14,23 @@
package google.registry.beam.common;

+import google.registry.beam.common.RegistryJpaIO.Write;
import google.registry.config.RegistryEnvironment;
+import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

-/** Defines Nomulus-specific pipeline options. */
+/**
+ * Defines Nomulus-specific pipeline options, e.g. JPA configurations.
+ *
+ * <p>When using the Cloud Dataflow runner, users are recommended to set an upper bound on active
+ * database connections by setting the pipeline worker options including {@code --maxNumWorkers},
+ * {@code workerMachineType}, and {@code numberOfWorkerHarnessThreads}. Please refer to {@link
+ * Write#shards()} for more information.
+ */
public interface RegistryPipelineOptions extends GcpOptions {

  @Description("The Registry environment.")
@@ -29,6 +39,32 @@ public interface RegistryPipelineOptions extends GcpOptions {

  void setRegistryEnvironment(RegistryEnvironment environment);

+  @Description("The desired SQL transaction isolation level.")
+  @Nullable
+  TransactionIsolationLevel getIsolationOverride();
+
+  void setIsolationOverride(TransactionIsolationLevel isolationOverride);
+
+  @Description("The number of entities to write to the SQL database in one operation.")
+  @Default.Integer(20)
+  int getSqlWriteBatchSize();
+
+  void setSqlWriteBatchSize(int sqlWriteBatchSize);
+
+  @Description(
+      "Number of shards to create out of the data before writing to the SQL database. Please refer "
+          + "to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.")
+  @Default.Integer(100)
+  int getSqlWriteShards();
+
+  void setSqlWriteShards(int maxConcurrentSqlWriters);
+
+  static RegistryPipelineComponent toRegistryPipelineComponent(RegistryPipelineOptions options) {
+    return DaggerRegistryPipelineComponent.builder()
+        .isolationOverride(options.getIsolationOverride())
+        .build();
+  }
+
  /**
   * Validates the GCP project and Registry environment settings in {@code option}. If project is
   * undefined, it is set according to the Registry environment; if project is defined but
@@ -38,18 +74,18 @@ public interface RegistryPipelineOptions extends GcpOptions {
   * in {@link RegistryEnvironment}). Tests calling this method must restore the original
   * environment on completion.
   */
-  static void validateRegistryPipelineOptions(RegistryPipelineOptions option) {
-    RegistryEnvironment environment = option.getRegistryEnvironment();
+  static void validateRegistryPipelineOptions(RegistryPipelineOptions options) {
+    RegistryEnvironment environment = options.getRegistryEnvironment();
    if (environment == null) {
      return;
    }
    environment.setup();
-    String projectByEnv = DaggerRegistryPipelineComponent.create().getProjectId();
-    if (Objects.equals(option.getProject(), projectByEnv)) {
+    String projectByEnv = toRegistryPipelineComponent(options).getProjectId();
+    if (Objects.equals(options.getProject(), projectByEnv)) {
      return;
    }
-    if (option.getProject() == null) {
-      option.setProject(projectByEnv);
+    if (options.getProject() == null) {
+      options.setProject(projectByEnv);
      return;
    }
    throw new IllegalArgumentException(
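A minimal sketch of how these options might be supplied on the command line and validated, modeled on RegistryPipelineOptionsTest later in this change. Flag names follow Beam's convention of deriving them from the option getter names; the specific values are illustrative.

import google.registry.beam.common.RegistryPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Sketch only: parse and validate Registry pipeline options from flags.
public class RegistryPipelineOptionsSketch {
  public static void main(String[] args) {
    RegistryPipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--registryEnvironment=ALPHA",
                "--isolationOverride=TRANSACTION_SERIALIZABLE",
                "--sqlWriteBatchSize=20",
                "--sqlWriteShards=100")
            .withValidation()
            .as(RegistryPipelineOptions.class);
    // Fills in the GCP project from the Registry environment when it is not given explicitly.
    RegistryPipelineOptions.validateRegistryPipelineOptions(options);
    // Database connections are bounded indirectly through worker settings such as
    // --maxNumWorkers, --workerMachineType, and --numberOfWorkerHarnessThreads (see Javadoc above).
  }
}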
RegistryPipelineWorkerInitializer.java
@@ -14,6 +14,8 @@
package google.registry.beam.common;

+import static google.registry.beam.common.RegistryPipelineOptions.toRegistryPipelineComponent;
+
import com.google.auto.service.AutoService;
import com.google.common.flogger.FluentLogger;
import dagger.Lazy;
@@ -24,7 +26,8 @@ import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;

/**
- * Sets up Nomulus environment and initializes JPA on each pipeline worker.
+ * Sets up Nomulus environment and initializes JPA on each pipeline worker. It is assumed that the
+ * pipeline only works with one SQL database.
 *
 * <p>This class only takes effect in portable beam pipeline runners (including the Cloud Dataflow
 * runner). It is not invoked in test pipelines.
@@ -35,15 +38,15 @@ public class RegistryPipelineWorkerInitializer implements JvmInitializer {

  @Override
  public void beforeProcessing(PipelineOptions options) {
-    RegistryEnvironment environment =
-        options.as(RegistryPipelineOptions.class).getRegistryEnvironment();
+    RegistryPipelineOptions registryOptions = options.as(RegistryPipelineOptions.class);
+    RegistryEnvironment environment = registryOptions.getRegistryEnvironment();
    if (environment == null || environment.equals(RegistryEnvironment.UNITTEST)) {
      return;
    }
    logger.atInfo().log("Setting up RegistryEnvironment: %s", environment);
    environment.setup();
    Lazy<JpaTransactionManager> transactionManagerLazy =
-        DaggerRegistryPipelineComponent.create().getJpaTransactionManager();
+        toRegistryPipelineComponent(registryOptions).getJpaTransactionManager();
    TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get);
  }
}
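For orientation, a sketch of what this initializer enables on the worker: once setJpaTmOnBeamWorker() has run, any DoFn can obtain the shared transaction manager through jpaTm(). The element type and the one-transaction-per-element handling are illustrative; SqlBatchWriter above batches writes instead.

import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;

import org.apache.beam.sdk.transforms.DoFn;

// Sketch only: a DoFn that relies on the worker-initialized JpaTransactionManager.
class JpaPersistSketchFn<T> extends DoFn<T, Void> {
  @ProcessElement
  public void processElement(@Element T entity) {
    // Persists each element in its own transaction; real transforms (e.g. SqlBatchWriter)
    // group elements into batches for throughput.
    jpaTm().transact(() -> jpaTm().put(entity));
  }
}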
InitSqlPipeline.java
@@ -22,7 +22,7 @@ import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key;
import google.registry.backup.AppEngineEnvironment;
import google.registry.backup.VersionedEntity;
-import google.registry.beam.initsql.BeamJpaModule.JpaTransactionManagerComponent;
+import google.registry.beam.common.RegistryJpaIO;
import google.registry.beam.initsql.Transforms.RemoveDomainBaseForeignKeys;
import google.registry.model.billing.BillingEvent;
import google.registry.model.contact.ContactResource;
@@ -35,7 +35,6 @@ import google.registry.model.registrar.RegistrarContact;
import google.registry.model.registry.Registry;
import google.registry.model.reporting.HistoryEntry;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
-import google.registry.persistence.transaction.JpaTransactionManager;
import java.io.Serializable;
import java.util.Collection;
import java.util.Optional;
@@ -43,7 +42,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -124,20 +122,15 @@ public class InitSqlPipeline implements Serializable {

  private final Pipeline pipeline;

-  private final SerializableFunction<JpaTransactionManagerComponent, JpaTransactionManager>
-      jpaGetter;
-
  InitSqlPipeline(InitSqlPipelineOptions options) {
    this.options = options;
    pipeline = Pipeline.create(options);
-    jpaGetter = JpaTransactionManagerComponent::cloudSqlJpaTransactionManager;
  }

  @VisibleForTesting
  InitSqlPipeline(InitSqlPipelineOptions options, Pipeline pipeline) {
    this.options = options;
    this.pipeline = pipeline;
-    jpaGetter = JpaTransactionManagerComponent::localDbJpaTransactionManager;
  }

  public PipelineResult run() {
@@ -147,6 +140,7 @@ public class InitSqlPipeline implements Serializable {

  @VisibleForTesting
  void setupPipeline() {
+    options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_UNCOMMITTED);
    PCollectionTuple datastoreSnapshot =
        pipeline.apply(
            "Load Datastore snapshot",
@@ -223,22 +217,13 @@ public class InitSqlPipeline implements Serializable {
  }

  private PCollection<Void> writeToSql(String transformId, PCollection<VersionedEntity> data) {
-    String credentialFileUrl =
-        options.getSqlCredentialUrlOverride() != null
-            ? options.getSqlCredentialUrlOverride()
-            : BackupPaths.getCloudSQLCredentialFilePatterns(options.getEnvironment()).get(0);
-
    return data.apply(
-        "Write to sql: " + transformId,
-        Transforms.writeToSql(
-            transformId,
-            options.getMaxConcurrentSqlWriters(),
-            options.getSqlWriteBatchSize(),
-            new JpaSupplierFactory(
-                credentialFileUrl,
-                options.getCloudKmsProjectId(),
-                jpaGetter,
-                TransactionIsolationLevel.TRANSACTION_READ_UNCOMMITTED)));
+        "Write to Sql: " + transformId,
+        RegistryJpaIO.<VersionedEntity>write()
+            .withName(transformId)
+            .withBatchSize(options.getSqlWriteBatchSize())
+            .withShards(options.getSqlWriteShards())
+            .withJpaConverter(Transforms::convertVersionedEntityToSqlEntity));
  }

  private static ImmutableList<String> toKindStrings(Collection<Class<?>> entityClasses) {
InitSqlPipelineOptions.java
@@ -14,21 +14,12 @@
package google.registry.beam.initsql;

-import javax.annotation.Nullable;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.Default;
+import google.registry.beam.common.RegistryPipelineOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;

/** Pipeline options for {@link InitSqlPipeline} */
-public interface InitSqlPipelineOptions extends GcpOptions {
-
-  @Description(
-      "Overrides the URL to the SQL credential file. " + "Required if environment is not provided.")
-  @Nullable
-  String getSqlCredentialUrlOverride();
-
-  void setSqlCredentialUrlOverride(String credentialUrlOverride);
+public interface InitSqlPipelineOptions extends RegistryPipelineOptions {

  @Description("The root directory of the export to load.")
  String getDatastoreExportDir();
@@ -51,41 +42,4 @@ public interface InitSqlPipelineOptions extends GcpOptions {
  String getCommitLogEndTimestamp();

  void setCommitLogEndTimestamp(String commitLogEndTimestamp);
-
-  @Description(
-      "The deployed environment, alpha, crash, sandbox, or production. "
-          + "Not required only if sqlCredentialUrlOverride is provided.")
-  @Nullable
-  String getEnvironment();
-
-  void setEnvironment(String environment);
-
-  @Description(
-      "The GCP project that contains the keyring used for decrypting the " + "SQL credential file.")
-  @Nullable
-  String getCloudKmsProjectId();
-
-  void setCloudKmsProjectId(String cloudKmsProjectId);
-
-  @Description(
-      "The maximum JDBC connection pool size on a VM. "
-          + "This value should be equal to or greater than the number of cores on the VM.")
-  @Default.Integer(4)
-  int getJdbcMaxPoolSize();
-
-  void setJdbcMaxPoolSize(int jdbcMaxPoolSize);
-
-  @Description(
-      "A hint to the pipeline runner of the maximum number of concurrent SQL writers to create. "
-          + "Note that multiple writers may run on the same VM and share the connection pool.")
-  @Default.Integer(4)
-  int getMaxConcurrentSqlWriters();
-
-  void setMaxConcurrentSqlWriters(int maxConcurrentSqlWriters);
-
-  @Description("The number of entities to be written to the SQL database in one transaction.")
-  @Default.Integer(20)
-  int getSqlWriteBatchSize();
-
-  void setSqlWriteBatchSize(int sqlWriteBatchSize);
}
Transforms.java
@@ -368,7 +368,7 @@ public final class Transforms {
   * to make Optional work with BEAM)
   */
  @Nullable
-  private static Object convertVersionedEntityToSqlEntity(VersionedEntity dsEntity) {
+  public static Object convertVersionedEntityToSqlEntity(VersionedEntity dsEntity) {
    return dsEntity
        .getEntity()
        .filter(Transforms::isMigratable)
PersistenceModule.java
@@ -124,18 +124,6 @@ public abstract class PersistenceModule {
  @Config("beamIsolationOverride")
  abstract TransactionIsolationLevel bindBeamIsolationOverride();

-  /**
-   * Optionally overrides the maximum connection pool size for JPA.
-   *
-   * <p>If present, this binding overrides the {@code HIKARI_MAXIMUM_POOL_SIZE} value set in {@link
-   * #provideDefaultDatabaseConfigs()}. The default value is tuned for the Registry server on
-   * AppEngine. Other applications such as the Nomulus tool and the BEAM pipeline, should override
-   * it.
-   */
-  @BindsOptionalOf
-  @Config("jpaMaxPoolSizeOverride")
-  abstract Integer bindJpaMaxPoolSizeOverride();
-
  /**
   * Optionally overrides the Cloud SQL database instance's connection name.
   *
@@ -201,7 +189,6 @@ public abstract class PersistenceModule {
      SqlCredentialStore credentialStore,
      @Config("instanceConnectionNameOverride")
          Optional<Provider<String>> instanceConnectionNameOverride,
-      @Config("jpaMaxPoolSizeOverride") Optional<Integer> jpaMaxConnectionPoolSizeOverride,
      @Config("beamIsolationOverride")
          Optional<Provider<TransactionIsolationLevel>> isolationOverride,
      @PartialCloudSqlConfigs ImmutableMap<String, String> cloudSqlConfigs,
@@ -211,13 +198,21 @@ public abstract class PersistenceModule {
    SqlCredential credential = credentialStore.getCredential(new RobotUser(RobotId.NOMULUS));
    overrides.put(Environment.USER, credential.login());
    overrides.put(Environment.PASS, credential.password());
+    // Override the default minimum which is tuned for the Registry server. A worker VM should
+    // release all connections if it no longer interacts with the database.
+    overrides.put(HIKARI_MINIMUM_IDLE, "0");
+    /**
+     * Disable Hikari's maxPoolSize limit check by setting it to an absurdly large number. The
+     * effective (and desirable) limit is the number of pipeline threads on the pipeline worker,
+     * which can be configured using pipeline options. See {@link RegistryPipelineOptions} for more
+     * information.
+     */
+    overrides.put(HIKARI_MAXIMUM_POOL_SIZE, String.valueOf(Integer.MAX_VALUE));
    instanceConnectionNameOverride
        .map(Provider::get)
        .ifPresent(
            instanceConnectionName ->
                overrides.put(HIKARI_DS_CLOUD_SQL_INSTANCE, instanceConnectionName));
-    jpaMaxConnectionPoolSizeOverride.ifPresent(
-        maxPoolSize -> overrides.put(HIKARI_MAXIMUM_POOL_SIZE, String.valueOf(maxPoolSize)));
    isolationOverride
        .map(Provider::get)
        .ifPresent(isolation -> overrides.put(Environment.ISOLATION, isolation.name()));
RegistryJpaWriteTest.java (new file)
@@ -0,0 +1,124 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package google.registry.beam.common;

import static com.google.common.truth.Truth.assertThat;
import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;

import com.google.appengine.api.datastore.Entity;
import com.google.common.collect.ImmutableList;
import google.registry.backup.VersionedEntity;
import google.registry.beam.TestPipelineExtension;
import google.registry.beam.initsql.BackupTestStore;
import google.registry.beam.initsql.InitSqlTestUtils;
import google.registry.beam.initsql.Transforms;
import google.registry.model.ImmutableObject;
import google.registry.model.contact.ContactResource;
import google.registry.model.ofy.Ofy;
import google.registry.model.registrar.Registrar;
import google.registry.persistence.transaction.JpaTestRules;
import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.DatabaseHelper;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.InjectExtension;
import java.io.Serializable;
import java.nio.file.Path;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.Create;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/** Unit test for {@link RegistryJpaIO.Write}. */
class RegistryJpaWriteTest implements Serializable {

  private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");

  private final FakeClock fakeClock = new FakeClock(START_TIME);

  @RegisterExtension
  @Order(Order.DEFAULT - 1)
  final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();

  @RegisterExtension final transient InjectExtension injectRule = new InjectExtension();

  @RegisterExtension
  final transient JpaIntegrationTestExtension database =
      new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule();

  @SuppressWarnings("WeakerAccess")
  @TempDir
  transient Path tmpDir;

  @RegisterExtension
  final transient TestPipelineExtension testPipeline =
      TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);

  private ImmutableList<Entity> contacts;

  @BeforeEach
  void beforeEach() throws Exception {
    try (BackupTestStore store = new BackupTestStore(fakeClock)) {
      injectRule.setStaticField(Ofy.class, "clock", fakeClock);

      // Required for contacts created below.
      Registrar ofyRegistrar = AppEngineExtension.makeRegistrar2();
      store.insertOrUpdate(ofyRegistrar);
      jpaTm().transact(() -> jpaTm().put(store.loadAsOfyEntity(ofyRegistrar)));

      ImmutableList.Builder<Entity> builder = new ImmutableList.Builder<>();

      for (int i = 0; i < 3; i++) {
        ContactResource contact = DatabaseHelper.newContactResource("contact_" + i);
        store.insertOrUpdate(contact);
        builder.add(store.loadAsDatastoreEntity(contact));
      }
      contacts = builder.build();
    }
  }

  @Test
  void writeToSql_twoWriters() {
    testPipeline
        .apply(
            Create.of(
                contacts.stream()
                    .map(InitSqlTestUtils::entityToBytes)
                    .map(bytes -> VersionedEntity.from(0L, bytes))
                    .collect(Collectors.toList())))
        .apply(
            RegistryJpaIO.<VersionedEntity>write()
                .withName("ContactResource")
                .withBatchSize(4)
                .withShards(2)
                .withJpaConverter(Transforms::convertVersionedEntityToSqlEntity));
    testPipeline.run().waitUntilFinish();

    ImmutableList<?> sqlContacts = jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class));
    assertThat(sqlContacts)
        .comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp"))
        .containsExactlyElementsIn(
            contacts.stream()
                .map(InitSqlTestUtils::datastoreToOfyEntity)
                .map(ImmutableObject.class::cast)
                .collect(ImmutableList.toImmutableList()));
  }
}
RegistryPipelineOptionsTest.java
@@ -19,6 +19,7 @@ import static google.registry.beam.common.RegistryPipelineOptions.validateRegistryPipelineOptions;
import static org.junit.jupiter.api.Assertions.assertThrows;

import google.registry.config.RegistryEnvironment;
+import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import google.registry.testing.SystemPropertyExtension;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.jupiter.api.BeforeEach;
@@ -43,29 +44,42 @@ class RegistryPipelineOptionsTest {

  @Test
  void environment_fromArgs() {
-    assertThat(
-            PipelineOptionsFactory.fromArgs("--registryEnvironment=ALPHA")
-                .as(RegistryPipelineOptions.class)
-                .getRegistryEnvironment())
-        .isSameInstanceAs(RegistryEnvironment.ALPHA);
+    RegistryPipelineOptions options =
+        PipelineOptionsFactory.fromArgs(
+                "--registryEnvironment=ALPHA", "--isolationOverride=TRANSACTION_SERIALIZABLE")
+            .withValidation()
+            .as(RegistryPipelineOptions.class);
+    assertThat(options.getRegistryEnvironment()).isSameInstanceAs(RegistryEnvironment.ALPHA);
+    assertThat(options.getIsolationOverride())
+        .isSameInstanceAs(TransactionIsolationLevel.TRANSACTION_SERIALIZABLE);
  }

  @Test
-  void environment_invalid() {
+  void environment_invalidEnvironment() {
    assertThrows(
        IllegalArgumentException.class,
        () ->
            PipelineOptionsFactory.fromArgs("--registryEnvironment=alpha")
                .withValidation()
                .as(RegistryPipelineOptions.class));
  }

+  @Test
+  void environment_invalidIsolation() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            PipelineOptionsFactory.fromArgs("--isolationOverride=something_wrong")
+                .withValidation()
+                .as(RegistryPipelineOptions.class));
+  }
+
  @Test
  void environment_undefined() {
-    assertThat(
-            PipelineOptionsFactory.create()
-                .as(RegistryPipelineOptions.class)
-                .getRegistryEnvironment())
-        .isNull();
+    RegistryPipelineOptions options =
+        PipelineOptionsFactory.fromArgs().withValidation().as(RegistryPipelineOptions.class);
+    assertThat(options.getRegistryEnvironment()).isNull();
+    assertThat(options.getIsolationOverride()).isNull();
  }

  @Test
BackupTestStore.java
@@ -47,7 +47,7 @@ import org.joda.time.format.DateTimeFormatter;
 * every transaction is invoked on this store, ensuring strictly increasing timestamps on causally
 * dependent transactions. In production, the same ordering is ensured by sleep and retry.
 */
-class BackupTestStore implements AutoCloseable {
+public final class BackupTestStore implements AutoCloseable {

  private static final DateTimeFormatter EXPORT_TIMESTAMP_FORMAT =
      DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss_SSS");
@@ -59,7 +59,7 @@ class BackupTestStore implements AutoCloseable {

  private CommitLogCheckpoint prevCommitLogCheckpoint;

-  BackupTestStore(FakeClock fakeClock) throws Exception {
+  public BackupTestStore(FakeClock fakeClock) throws Exception {
    this.fakeClock = fakeClock;
    this.appEngine =
        new AppEngineExtension.Builder()
@@ -88,7 +88,7 @@ class BackupTestStore implements AutoCloseable {
   * transaction.
   */
  @SafeVarargs
-  final long insertOrUpdate(Object... entities) {
+  public final long insertOrUpdate(Object... entities) {
    long timestamp = fakeClock.nowUtc().getMillis();
    tm().transact(() -> ofy().save().entities(entities).now());
    fakeClock.advanceOneMilli();
@@ -97,7 +97,7 @@ class BackupTestStore implements AutoCloseable {

  /** Deletes {@code entities} from the Datastore and returns the timestamp of this transaction. */
  @SafeVarargs
-  final long delete(Object... entities) {
+  public final long delete(Object... entities) {
    long timestamp = fakeClock.nowUtc().getMillis();
    tm().transact(() -> ofy().delete().entities(entities).now());
    fakeClock.advanceOneMilli();
@@ -111,7 +111,7 @@ class BackupTestStore implements AutoCloseable {
   * Objectify entity and want to find out the values of certain assign-on-persist properties. See
   * {@link VersionedEntity} for more information.
   */
-  Entity loadAsDatastoreEntity(Object ofyEntity) {
+  public Entity loadAsDatastoreEntity(Object ofyEntity) {
    try {
      return datastoreService.get(Key.create(ofyEntity).getRaw());
    } catch (EntityNotFoundException e) {
@@ -124,7 +124,7 @@ class BackupTestStore implements AutoCloseable {
   *
   * <p>See {@link #loadAsDatastoreEntity} and {@link VersionedEntity} for more information.
   */
-  Object loadAsOfyEntity(Object ofyEntity) {
+  public Object loadAsOfyEntity(Object ofyEntity) {
    try {
      return ofy().load().fromEntity(datastoreService.get(Key.create(ofyEntity).getRaw()));
    } catch (EntityNotFoundException e) {
InitSqlPipelineGraphTest.java
@@ -38,7 +38,7 @@ class InitSqlPipelineGraphTest {
        "--commitLogEndTimestamp=2000-01-02TZ",
        "--datastoreExportDir=/somedir",
        "--commitLogDir=/someotherdir",
-        "--environment=alpha"
+        "--registryEnvironment=ALPHA"
      };

  private static final transient InitSqlPipelineOptions options =
InitSqlPipelineTest.java
@@ -317,8 +317,6 @@ class InitSqlPipelineTest {
  void runPipeline() {
    InitSqlPipelineOptions options =
        PipelineOptionsFactory.fromArgs(
-            "--sqlCredentialUrlOverride="
-                + beamJpaExtension.getCredentialFile().getAbsolutePath(),
            "--commitLogStartTimestamp=" + START_TIME,
            "--commitLogEndTimestamp=" + fakeClock.nowUtc().plusMillis(1),
            "--datastoreExportDir=" + exportDir.getAbsolutePath(),
One further file diff is suppressed because it is too large. A binary file (1.1 MiB before and after) also changed and is not shown.