Properly set up JPA in BEAM workers (#965)

* Properly set up JPA in BEAM workers

Sets up a singleton JpaTransactionManger on each worker JVM for all
pipeline nodes to share.

Also added/updated relevant dependencies. The BEAM SDK version change
caused the InitSqlPipeline's graph to change.
This commit is contained in:
Weimin Yu 2021-02-22 18:11:32 -05:00 committed by GitHub
parent 8f90b5746a
commit ffe3124ee1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
159 changed files with 8896 additions and 7821 deletions

View file

@ -0,0 +1,81 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.common;
import static com.google.common.base.Verify.verify;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import google.registry.backup.AppEngineEnvironment;
import google.registry.persistence.transaction.JpaTransactionManager;
import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
/** Toy pipeline that demonstrates how to use {@link JpaTransactionManager} in BEAM pipelines. */
public class JpaDemoPipeline implements Serializable {
public static void main(String[] args) {
RegistryPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(RegistryPipelineOptions.class);
RegistryPipelineOptions.validateRegistryPipelineOptions(options);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("Start", Create.of((Void) null))
.apply(
"Generate Elements",
ParDo.of(
new DoFn<Void, Void>() {
@ProcessElement
public void processElement(OutputReceiver<Void> output) {
for (int i = 0; i < 500; i++) {
output.output(null);
}
}
}))
.apply(
"Make Query",
ParDo.of(
new DoFn<Void, Void>() {
private Counter counter = Metrics.counter("Demo", "Read");
@ProcessElement
public void processElement() {
// AppEngineEnvironment is needed as long as JPA entity classes still depends
// on Objectify.
try (AppEngineEnvironment allowOfyEntity = new AppEngineEnvironment()) {
int result =
(Integer)
jpaTm()
.transact(
() ->
jpaTm()
.getEntityManager()
.createNativeQuery("select 1;")
.getSingleResult());
verify(result == 1, "Expecting 1, got %s.", result);
}
counter.inc();
}
}));
pipeline.run();
}
}

View file

@ -0,0 +1,47 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.common;
import dagger.Component;
import dagger.Lazy;
import google.registry.config.CredentialModule;
import google.registry.config.RegistryConfig.Config;
import google.registry.config.RegistryConfig.ConfigModule;
import google.registry.persistence.PersistenceModule;
import google.registry.persistence.PersistenceModule.BeamJpaTm;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.privileges.secretmanager.SecretManagerModule;
import google.registry.util.UtilsModule;
import javax.inject.Singleton;
/** Component that provides everything needed on a Pipeline worker. */
@Singleton
@Component(
modules = {
ConfigModule.class,
CredentialModule.class,
PersistenceModule.class,
SecretManagerModule.class,
UtilsModule.class
})
public interface RegistryPipelineComponent {
/** Returns the GCP project ID. */
@Config("projectId")
String getProjectId();
@BeamJpaTm
Lazy<JpaTransactionManager> getJpaTransactionManager();
}

View file

@ -0,0 +1,58 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.common;
import google.registry.config.RegistryEnvironment;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.Description;
/** Defines Nomulus-specific pipeline options. */
public interface RegistryPipelineOptions extends GcpOptions {
@Description("The Registry environment.")
@Nullable
RegistryEnvironment getRegistryEnvironment();
void setRegistryEnvironment(RegistryEnvironment environment);
/**
* Validates the GCP project and Registry environment settings in {@code option}. If project is
* undefined, it is set according to the Registry environment; if project is defined but
* inconsistent with the Registry environment, an {@link IllegalArgumentException} will be thrown.
*
* <p>This method may modify the system property ("google.registry.environment" which is defined
* in {@link RegistryEnvironment}). Tests calling this method must restore the original
* environment on completion.
*/
static void validateRegistryPipelineOptions(RegistryPipelineOptions option) {
RegistryEnvironment environment = option.getRegistryEnvironment();
if (environment == null) {
return;
}
environment.setup();
String projectByEnv = DaggerRegistryPipelineComponent.create().getProjectId();
if (Objects.equals(option.getProject(), projectByEnv)) {
return;
}
if (option.getProject() == null) {
option.setProject(projectByEnv);
return;
}
throw new IllegalArgumentException(
"Arguments for --project and --registryEnvironment do not match.");
}
}

View file

@ -0,0 +1,49 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.common;
import com.google.auto.service.AutoService;
import com.google.common.flogger.FluentLogger;
import dagger.Lazy;
import google.registry.config.RegistryEnvironment;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManagerFactory;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;
/**
* Sets up Nomulus environment and initializes JPA on each pipeline worker.
*
* <p>This class only takes effect in portable beam pipeline runners (including the Cloud Dataflow
* runner). It is not invoked in test pipelines.
*/
@AutoService(JvmInitializer.class)
public class RegistryPipelineWorkerInitializer implements JvmInitializer {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@Override
public void beforeProcessing(PipelineOptions options) {
RegistryEnvironment environment =
options.as(RegistryPipelineOptions.class).getRegistryEnvironment();
if (environment == null || environment.equals(RegistryEnvironment.UNITTEST)) {
return;
}
logger.atInfo().log("Setting up RegistryEnvironment: %s", environment);
environment.setup();
Lazy<JpaTransactionManager> transactionManagerLazy =
DaggerRegistryPipelineComponent.create().getJpaTransactionManager();
TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get);
}
}

View file

@ -124,6 +124,28 @@ public abstract class PersistenceModule {
@Config("beamIsolationOverride")
abstract TransactionIsolationLevel bindBeamIsolationOverride();
/**
* Optionally overrides the maximum connection pool size for JPA.
*
* <p>If present, this binding overrides the {@code HIKARI_MAXIMUM_POOL_SIZE} value set in {@link
* #provideDefaultDatabaseConfigs()}. The default value is tuned for the Registry server on
* AppEngine. Other applications such as the Nomulus tool and the BEAM pipeline, should override
* it.
*/
@BindsOptionalOf
@Config("jpaMaxPoolSizeOverride")
abstract Integer bindJpaMaxPoolSizeOverride();
/**
* Optionally overrides the Cloud SQL database instance's connection name.
*
* <p>This allows connections to alternative database instances, e.g., the read-only replica or a
* test database.
*/
@BindsOptionalOf
@Config("instanceConnectionNameOverride")
abstract String instanceConnectionNameOverride();
@Provides
@Singleton
@BeamPipelineCloudSqlConfigs
@ -172,6 +194,36 @@ public abstract class PersistenceModule {
return new JpaTransactionManagerImpl(create(overrides), clock);
}
@Provides
@Singleton
@BeamJpaTm
static JpaTransactionManager provideBeamJpaTm(
SqlCredentialStore credentialStore,
@Config("instanceConnectionNameOverride")
Optional<Provider<String>> instanceConnectionNameOverride,
@Config("jpaMaxPoolSizeOverride") Optional<Integer> jpaMaxConnectionPoolSizeOverride,
@Config("beamIsolationOverride")
Optional<Provider<TransactionIsolationLevel>> isolationOverride,
@PartialCloudSqlConfigs ImmutableMap<String, String> cloudSqlConfigs,
Clock clock) {
HashMap<String, String> overrides = Maps.newHashMap(cloudSqlConfigs);
// TODO(b/175700623): make sql username configurable from config file.
SqlCredential credential = credentialStore.getCredential(new RobotUser(RobotId.NOMULUS));
overrides.put(Environment.USER, credential.login());
overrides.put(Environment.PASS, credential.password());
instanceConnectionNameOverride
.map(Provider::get)
.ifPresent(
instanceConnectionName ->
overrides.put(HIKARI_DS_CLOUD_SQL_INSTANCE, instanceConnectionName));
jpaMaxConnectionPoolSizeOverride.ifPresent(
maxPoolSize -> overrides.put(HIKARI_MAXIMUM_POOL_SIZE, String.valueOf(maxPoolSize)));
isolationOverride
.map(Provider::get)
.ifPresent(isolation -> overrides.put(Environment.ISOLATION, isolation.name()));
return new JpaTransactionManagerImpl(create(overrides), clock);
}
@Provides
@Singleton
@NomulusToolJpaTm
@ -336,6 +388,12 @@ public abstract class PersistenceModule {
@Documented
@interface AppEngineJpaTm {}
/** Dagger qualifier for {@link JpaTransactionManager} used inside BEAM pipelines. */
// Note: @SocketFactoryJpaTm will be phased out in favor of this qualifier.
@Qualifier
@Documented
public @interface BeamJpaTm {}
/** Dagger qualifier for {@link JpaTransactionManager} used for Nomulus tool. */
@Qualifier
@Documented

View file

@ -101,6 +101,16 @@ public class TransactionManagerFactory {
jpaTm = Suppliers.memoize(jpaTmSupplier::get);
}
/**
* Makes {@link #jpaTm()} return the {@link JpaTransactionManager} instance provided by {@code
* jpaTmSupplier} from now on. This method should only be called by an implementor of {@link
* org.apache.beam.sdk.harness.JvmInitializer}.
*/
public static void setJpaTmOnBeamWorker(Supplier<JpaTransactionManager> jpaTmSupplier) {
checkNotNull(jpaTmSupplier, "jpaTmSupplier");
jpaTm = Suppliers.memoize(jpaTmSupplier::get);
}
/** Sets the return of {@link #tm()} to the given instance of {@link TransactionManager}. */
@VisibleForTesting
public static void setTm(TransactionManager newTm) {

View file

@ -0,0 +1,112 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.common;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.beam.common.RegistryPipelineOptions.validateRegistryPipelineOptions;
import static org.junit.jupiter.api.Assertions.assertThrows;
import google.registry.config.RegistryEnvironment;
import google.registry.testing.SystemPropertyExtension;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link RegistryPipelineOptions}. */
class RegistryPipelineOptionsTest {
/**
* Restores original RegistryEnvironment after calling {@link
* RegistryPipelineOptions#validateRegistryPipelineOptions}.
*/
@RegisterExtension
final SystemPropertyExtension systemPropertyExtension = new SystemPropertyExtension();
@BeforeEach
void beforeEach() {
// Saves the current RegistryEnvironment for restoration later.
RegistryEnvironment.get().setup(systemPropertyExtension);
}
@Test
void environment_fromArgs() {
assertThat(
PipelineOptionsFactory.fromArgs("--registryEnvironment=ALPHA")
.as(RegistryPipelineOptions.class)
.getRegistryEnvironment())
.isSameInstanceAs(RegistryEnvironment.ALPHA);
}
@Test
void environment_invalid() {
assertThrows(
IllegalArgumentException.class,
() ->
PipelineOptionsFactory.fromArgs("--registryEnvironment=alpha")
.as(RegistryPipelineOptions.class));
}
@Test
void environment_undefined() {
assertThat(
PipelineOptionsFactory.create()
.as(RegistryPipelineOptions.class)
.getRegistryEnvironment())
.isNull();
}
@Test
void validateProject_projectsMatch() {
RegistryPipelineOptions options =
PipelineOptionsFactory.fromArgs(
"--registryEnvironment=" + RegistryEnvironment.UNITTEST.name(),
"--project=registry-project-id")
.withValidation()
.as(RegistryPipelineOptions.class);
validateRegistryPipelineOptions(options);
}
@Test
void validateProject_projectsMismatch() {
RegistryPipelineOptions options =
PipelineOptionsFactory.fromArgs(
"--registryEnvironment=" + RegistryEnvironment.UNITTEST.name(), "--project=")
.withValidation()
.as(RegistryPipelineOptions.class);
assertThrows(IllegalArgumentException.class, () -> validateRegistryPipelineOptions(options));
}
@Test
void validateProject_missingProjectAdded() {
RegistryPipelineOptions options =
PipelineOptionsFactory.fromArgs(
"--registryEnvironment=" + RegistryEnvironment.UNITTEST.name())
.withValidation()
.as(RegistryPipelineOptions.class);
// If gcloud is installed and --project not set , project may be inferred.
options.setProject(null);
validateRegistryPipelineOptions(options);
assertThat(options.getProject()).isEqualTo("registry-project-id");
}
@Test
void validateProject_noEnvironment() {
RegistryPipelineOptions options =
PipelineOptionsFactory.fromArgs("--project=some-project").as(RegistryPipelineOptions.class);
validateRegistryPipelineOptions(options);
assertThat(options.getProject()).isEqualTo("some-project");
}
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.1 MiB

After

Width:  |  Height:  |  Size: 1.1 MiB

Before After
Before After