diff --git a/common/src/testing/java/google/registry/testing/truth/TextDiffSubject.java b/common/src/testing/java/google/registry/testing/truth/TextDiffSubject.java index f5a91e7a7..71071e2c5 100644 --- a/common/src/testing/java/google/registry/testing/truth/TextDiffSubject.java +++ b/common/src/testing/java/google/registry/testing/truth/TextDiffSubject.java @@ -16,6 +16,7 @@ package google.registry.testing.truth; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.truth.Truth.assertAbout; +import static com.google.common.truth.Truth.assertWithMessage; import static java.nio.charset.StandardCharsets.UTF_8; import com.github.difflib.DiffUtils; @@ -31,6 +32,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.io.Resources; import com.google.common.truth.Fact; import com.google.common.truth.FailureMetadata; +import com.google.common.truth.SimpleSubjectBuilder; import com.google.common.truth.Subject; import java.io.IOException; import java.net.URL; @@ -68,6 +70,15 @@ public class TextDiffSubject extends Subject { this.actual = ImmutableList.copyOf(actual); } + protected TextDiffSubject(FailureMetadata metadata, URL actual) { + super(metadata, actual); + try { + this.actual = ImmutableList.copyOf(Resources.asCharSource(actual, UTF_8).readLines()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public TextDiffSubject withDiffFormat(DiffFormat format) { this.diffFormat = format; return this; @@ -100,6 +111,11 @@ public class TextDiffSubject extends Subject { return assertThat(Resources.asCharSource(resourceUrl, UTF_8).readLines()); } + public static SimpleSubjectBuilder assertWithMessageAboutUrlSource( + String format, Object... params) { + return assertWithMessage(format, params).about(urlFactory()); + } + private static final Subject.Factory> TEXT_DIFF_SUBJECT_TEXT_FACTORY = TextDiffSubject::new; @@ -107,6 +123,13 @@ public class TextDiffSubject extends Subject { return TEXT_DIFF_SUBJECT_TEXT_FACTORY; } + private static final Subject.Factory TEXT_DIFF_SUBJECT_URL_FACTORY = + TextDiffSubject::new; + + public static Subject.Factory urlFactory() { + return TEXT_DIFF_SUBJECT_URL_FACTORY; + } + static String generateUnifiedDiff( ImmutableList expectedContent, ImmutableList actualContent) { Patch diff; diff --git a/config/nom_build.py b/config/nom_build.py index 2ce896ddb..8bb0584ab 100644 --- a/config/nom_build.py +++ b/config/nom_build.py @@ -49,10 +49,10 @@ PROPERTIES_HEADER = """\ # This file defines properties used by the gradle build. It must be kept in # sync with config/nom_build.py. # -# To regenerate, run config/nom_build.py --generate-gradle-properties +# To regenerate, run ./nom_build --generate-gradle-properties # # To view property descriptions (which are command line flags for -# nom_build), run config/nom_build.py --help. +# nom_build), run ./nom_build --help. # # DO NOT EDIT THIS FILE BY HAND org.gradle.jvmargs=-Xmx1024m @@ -114,6 +114,11 @@ PROPERTIES = [ Property('nomulus_version', 'The version of nomulus to test against in a database ' 'integration test.'), + Property('dot_path', + 'The path to "dot", part of the graphviz package that converts ' + 'a BEAM pipeline to image. Setting this property to empty string ' + 'will disable image generation.', + '/usr/bin/dot'), ] GRADLE_FLAGS = [ diff --git a/core/build.gradle b/core/build.gradle index 58680ef0f..a089a2038 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -238,6 +238,7 @@ dependencies { compile deps['jline:jline'] compile deps['joda-time:joda-time'] compile deps['org.apache.avro:avro'] + testCompile deps['org.apache.beam:beam-runners-core-construction-java'] testCompile deps['org.apache.beam:beam-runners-direct-java'] compile deps['org.apache.beam:beam-runners-google-cloud-dataflow-java'] compile deps['org.apache.beam:beam-sdks-java-core'] @@ -968,6 +969,49 @@ task buildToolImage(dependsOn: nomulus, type: Exec) { commandLine 'docker', 'build', '-t', 'nomulus-tool', '.' } +task generateInitSqlPipelineGraph(type: Test) { + include "**/InitSqlPipelineGraphTest.*" + testNameIncludePatterns = ["**createPipeline_compareGraph"] + ignoreFailures = true +} + +task updateInitSqlPipelineGraph(type: Copy) { + def graphRelativePath = 'google/registry/beam/initsql/' + from ("${projectDir}/build/resources/test/${graphRelativePath}") { + include 'pipeline_curr.dot' + rename 'curr', 'golden' + } + into "src/test/resources/${graphRelativePath}" + + dependsOn generateInitSqlPipelineGraph + + doLast { + if (com.google.common.base.Strings.isNullOrEmpty(project.dot_path)) { + getLogger().info('Property dot_path is null. Not creating image for pipeline graph.') + } + def dotPath = project.dot_path + if (!new File(dotPath).exists()) { + throw new RuntimeException( + """\ + ${dotPath} not found. Make sure graphviz is installed + and the dot_path property is set correctly.""" + .stripIndent()) + } + def goldenGraph = "src/test/resources/${graphRelativePath}/pipeline_golden.dot" + def goldenImage = "src/test/resources/${graphRelativePath}/pipeline_golden.png" + def cmd = "${dotPath} -Tpng -o \"${goldenImage}\" \"${goldenGraph}\"" + try { + rootProject.ext.execInBash(cmd, projectDir) + } catch (Throwable throwable) { + throw new RuntimeException( + """\ + Failed to generate golden image with command ${cmd} + Error: ${throwable.getMessage()} + """) + } + } +} + // Build the devtool jar. createUberJar( 'devtool', diff --git a/core/src/main/java/google/registry/backup/AppEngineEnvironment.java b/core/src/main/java/google/registry/backup/AppEngineEnvironment.java index c4ca47466..43d05920b 100644 --- a/core/src/main/java/google/registry/backup/AppEngineEnvironment.java +++ b/core/src/main/java/google/registry/backup/AppEngineEnvironment.java @@ -22,21 +22,35 @@ import java.lang.reflect.Method; import java.lang.reflect.Proxy; /** - * Sets up a placeholder {@link Environment} on a non-AppEngine platform so that Datastore Entities - * can be converted from/to Objectify entities. See {@code DatastoreEntityExtension} in test source - * for more information. + * Sets up a fake {@link Environment} so that the following operations can be performed without the + * Datastore service: + * + *
    + *
  • Create Objectify {@code Keys}. + *
  • Instantiate Objectify objects. + *
  • Convert Datastore {@code Entities} to their corresponding Objectify objects. + *
+ * + *

User has the option to specify their desired {@code appId} string, which forms part of an + * Objectify {@code Key} and is included in the equality check. This feature makes it easy to + * compare a migrated object in SQL with the original in Objectify. + * + *

Note that conversion from Objectify objects to Datastore {@code Entities} still requires the + * Datastore service. */ public class AppEngineEnvironment implements Closeable { - private static final Environment PLACEHOLDER_ENV = createAppEngineEnvironment(); - private boolean isPlaceHolderNeeded; public AppEngineEnvironment() { + this("PlaceholderAppId"); + } + + public AppEngineEnvironment(String appId) { isPlaceHolderNeeded = ApiProxy.getCurrentEnvironment() == null; // isPlaceHolderNeeded may be true when we are invoked in a test with AppEngineRule. if (isPlaceHolderNeeded) { - ApiProxy.setEnvironmentForCurrentThread(PLACEHOLDER_ENV); + ApiProxy.setEnvironmentForCurrentThread(createAppEngineEnvironment(appId)); } } @@ -48,7 +62,7 @@ public class AppEngineEnvironment implements Closeable { } /** Returns a placeholder {@link Environment} that can return hardcoded AppId and Attributes. */ - private static Environment createAppEngineEnvironment() { + private static Environment createAppEngineEnvironment(String appId) { return (Environment) Proxy.newProxyInstance( Environment.class.getClassLoader(), @@ -56,7 +70,7 @@ public class AppEngineEnvironment implements Closeable { (Object proxy, Method method, Object[] args) -> { switch (method.getName()) { case "getAppId": - return "PlaceholderAppId"; + return appId; case "getAttributes": return ImmutableMap.of(); default: diff --git a/core/src/main/java/google/registry/beam/initsql/DomainBaseUtil.java b/core/src/main/java/google/registry/beam/initsql/DomainBaseUtil.java new file mode 100644 index 000000000..c81f01197 --- /dev/null +++ b/core/src/main/java/google/registry/beam/initsql/DomainBaseUtil.java @@ -0,0 +1,75 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.appengine.api.datastore.Entity; +import java.util.Objects; + +/** Helper for manipulating {@code DomainBase} when migrating from Datastore to SQL database */ +final class DomainBaseUtil { + + private DomainBaseUtil() {} + + /** + * Removes {@link google.registry.model.billing.BillingEvent.Recurring}, {@link + * google.registry.model.poll.PollMessage PollMessages} and {@link + * google.registry.model.host.HostResource name servers} from a Datastore {@link Entity} that + * represents an Ofy {@link google.registry.model.domain.DomainBase}. This breaks the cycle of + * foreign key constraints between these entity kinds, allowing {@code DomainBases} to be inserted + * into the SQL database. See {@link InitSqlPipeline} for a use case, where the full {@code + * DomainBases} are written again during the last stage of the pipeline. + * + *

The returned object may be in bad state. Specifically, {@link + * google.registry.model.eppcommon.StatusValue#INACTIVE} is not added after name servers are + * removed. This only impacts tests. + * + *

This operation is performed on an Datastore {@link Entity} instead of Ofy Java object + * because Objectify requires access to a Datastore service when converting an Ofy object to a + * Datastore {@code Entity}. If we insist on working with Objectify objects, we face a few + * unsatisfactory options: + * + *

    + *
  • Connect to our production Datastore, which incurs unnecessary security and code health + * risk. + *
  • Connect to a separate real Datastore instance, which is a waster and overkill. + *
  • Use an in-memory test Datastore, which is a project health risk in that the test + * Datastore would be added to Nomulus' production binary unless we create a separate + * project for this pipeline. + *
+ * + *

Given our use case, operating on Datastore entities is the best option. + * + * @throws IllegalArgumentException if input does not represent a DomainBase + */ + static Entity removeBillingAndPollAndHosts(Entity domainBase) { + checkNotNull(domainBase, "domainBase"); + checkArgument( + Objects.equals(domainBase.getKind(), "DomainBase"), + "Expecting DomainBase, got %s", + domainBase.getKind()); + Entity clone = domainBase.clone(); + clone.removeProperty("autorenewBillingEvent"); + clone.removeProperty("autorenewPollMessage"); + clone.removeProperty("deletePollMessage"); + clone.removeProperty("nsHosts"); + domainBase.getProperties().keySet().stream() + .filter(s -> s.startsWith("transferData.")) + .forEach(s -> clone.removeProperty(s)); + return clone; + } +} diff --git a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java new file mode 100644 index 000000000..d7e84a92a --- /dev/null +++ b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java @@ -0,0 +1,260 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.googlecode.objectify.Key; +import google.registry.backup.AppEngineEnvironment; +import google.registry.backup.VersionedEntity; +import google.registry.beam.initsql.BeamJpaModule.JpaTransactionManagerComponent; +import google.registry.beam.initsql.Transforms.RemoveDomainBaseForeignKeys; +import google.registry.beam.initsql.Transforms.SerializableSupplier; +import google.registry.model.billing.BillingEvent; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainBase; +import google.registry.model.domain.token.AllocationToken; +import google.registry.model.host.HostResource; +import google.registry.model.poll.PollMessage; +import google.registry.model.registrar.Registrar; +import google.registry.model.registrar.RegistrarContact; +import google.registry.model.registry.Registry; +import google.registry.model.reporting.HistoryEntry; +import google.registry.persistence.transaction.JpaTransactionManager; +import java.io.Serializable; +import java.util.Collection; +import java.util.Optional; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Wait; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.DateTime; + +/** + * A BEAM pipeline that populates a SQL database with data from a Datastore backup. + * + *

This pipeline migrates EPP resources and related entities that cross-reference each other. To + * avoid violating foreign key constraints, writes to SQL are ordered by entity kinds. In addition, + * the {@link DomainBase} kind is written twice (see details below). The write order is presented + * below. Although some kinds can be written concurrently, e.g. {@code ContactResource} and {@code + * RegistrarContact}, we do not expect any performance benefit since the limiting resource is the + * number of JDBC connections. Google internal users may refer to the design doc for more information. + * + *

    + *
  1. {@link Registry}: Assumes that {@code PremiumList} and {@code ReservedList} have been set + * up in the SQL database. + *
  2. {@link Registrar}: Logically depends on {@code Registry}, Foreign key not modeled yet. + *
  3. {@link ContactResource}: references {@code Registrar} + *
  4. {@link RegistrarContact}: references {@code Registrar}. + *
  5. Cleansed {@link DomainBase}: with references to {@code BillingEvent}, {@code Recurring}, + * {@code Cancellation} and {@code HostResource} removed, still references {@code Registrar} + * and {@code ContactResource}. The removal breaks circular Foreign Key references. + *
  6. {@link HostResource}: references {@code DomainBase}. + *
  7. {@link HistoryEntry}: maps to one of three SQL entity types and may reference {@code + * Registrar}, {@code ContactResource}, {@code HostResource}, and {@code DomainBase}. + *
  8. {@link AllocationToken}: references {@code HistoryEntry}. + *
  9. {@link BillingEvent.Recurring}: references {@code Registrar}, {@code DomainBase} and {@code + * HistoryEntry}. + *
  10. {@link BillingEvent.OneTime}: references {@code Registrar}, {@code DomainBase}, {@code + * BillingEvent.Recurring}, {@code HistoryEntry} and {@code AllocationToken}. + *
  11. {@link BillingEvent.Modification}: SQL model TBD. Will reference {@code Registrar}, {@code + * DomainBase} and {@code BillingEvent.OneTime}. + *
  12. {@link BillingEvent.Cancellation}: references {@code Registrar}, {@code DomainBase}, {@code + * BillingEvent.Recurring}, {@code BillingEvent.OneTime}, and {@code HistoryEntry}. + *
  13. {@link PollMessage}: references {@code Registrar}, {@code DomainBase}, {@code + * ContactResource}, {@code HostResource}, and {@code HistoryEntry}. + *
  14. {@link DomainBase}, original copy from Datastore. + *
+ */ +public class InitSqlPipeline implements Serializable { + + /** + * Datastore kinds to be written to the SQL database before the cleansed version of {@link + * DomainBase}. + */ + // TODO(weiminyu): include Registry.class when it is modeled in JPA. + private static final ImmutableList> PHASE_ONE_ORDERED = + ImmutableList.of(Registrar.class, ContactResource.class); + + /** + * Datastore kinds to be written to the SQL database after the cleansed version of {@link + * DomainBase}. + * + *

The following entities are missing from the list: + * + *

    + *
  • Those not modeled in JPA yet, e.g., {@code BillingEvent.Modification}. + *
  • Those waiting for sanitation, e.g., {@code HistoryEntry}, which would have duplicate keys + * after converting to SQL model. + *
  • Those that have foreign key constraints on the above. + *
+ */ + // TODO(weiminyu): add more entities when available. + private static final ImmutableList> PHASE_TWO_ORDERED = + ImmutableList.of(HostResource.class); + + private final InitSqlPipelineOptions options; + + private final Pipeline pipeline; + + private final SerializableFunction + jpaGetter; + + InitSqlPipeline(InitSqlPipelineOptions options) { + this.options = options; + pipeline = Pipeline.create(options); + jpaGetter = JpaTransactionManagerComponent::cloudSqlJpaTransactionManager; + } + + @VisibleForTesting + InitSqlPipeline(InitSqlPipelineOptions options, Pipeline pipeline) { + this.options = options; + this.pipeline = pipeline; + jpaGetter = JpaTransactionManagerComponent::localDbJpaTransactionManager; + } + + public PipelineResult run() { + setupPipeline(); + return pipeline.run(); + } + + @VisibleForTesting + void setupPipeline() { + PCollectionTuple datastoreSnapshot = + pipeline.apply( + "Load Datastore snapshot", + Transforms.loadDatastoreSnapshot( + options.getDatastoreExportDir(), + options.getCommitLogDir(), + DateTime.parse(options.getCommitLogStartTimestamp()), + DateTime.parse(options.getCommitLogEndTimestamp()), + ImmutableSet.builder() + .add("DomainBase") + .addAll(toKindStrings(PHASE_ONE_ORDERED)) + .addAll(toKindStrings(PHASE_TWO_ORDERED)) + .build())); + + // Set up the pipeline to write entity kinds from PHASE_ONE_ORDERED to SQL. Return a object + // that signals the completion of the phase. + PCollection blocker = + scheduleOnePhaseWrites(datastoreSnapshot, PHASE_ONE_ORDERED, Optional.empty(), null); + blocker = + writeToSql( + "DomainBase without circular foreign keys", + removeDomainBaseForeignKeys(datastoreSnapshot) + .apply("Wait on phase one", Wait.on(blocker))); + // Set up the pipeline to write entity kinds from PHASE_TWO_ORDERED to SQL. This phase won't + // start until all cleansed DomainBases have been written (started by line above). + scheduleOnePhaseWrites( + datastoreSnapshot, PHASE_TWO_ORDERED, Optional.of(blocker), "DomainBaseNoFkeys"); + } + + private PCollection removeDomainBaseForeignKeys( + PCollectionTuple datastoreSnapshot) { + PCollection domainBases = + datastoreSnapshot.get(Transforms.createTagForKind("DomainBase")); + return domainBases.apply( + "Remove circular foreign keys from DomainBase", + ParDo.of(new RemoveDomainBaseForeignKeys())); + } + + /** + * Sets up the pipeline to write entities in {@code entityClasses} to SQL. Entities are written + * one kind at a time based on each kind's position in {@code entityClasses}. Concurrency exists + * within each kind. + * + * @param datastoreSnapshot the Datastore snapshot of all data to be migrated to SQL + * @param entityClasses the entity types in write order + * @param blockingPCollection the pipeline stage that blocks this phase + * @param blockingTag description of the stage (if exists) that blocks this phase. Needed for + * generating unique transform ids + * @return the output {@code PCollection} from the writing of the last entity kind. Other parts of + * the pipeline can {@link Wait} on this object + */ + private PCollection scheduleOnePhaseWrites( + PCollectionTuple datastoreSnapshot, + Collection> entityClasses, + Optional> blockingPCollection, + String blockingTag) { + checkArgument(!entityClasses.isEmpty(), "Each phase must have at least one kind."); + ImmutableList> tags = + toKindStrings(entityClasses).stream() + .map(Transforms::createTagForKind) + .collect(ImmutableList.toImmutableList()); + + PCollection prev = blockingPCollection.orElse(null); + String prevTag = blockingTag; + for (TupleTag tag : tags) { + PCollection curr = datastoreSnapshot.get(tag); + if (prev != null) { + curr = curr.apply("Wait on " + prevTag, Wait.on(prev)); + } + prev = writeToSql(tag.getId(), curr); + prevTag = tag.getId(); + } + return prev; + } + + private PCollection writeToSql(String transformId, PCollection data) { + String credentialFileUrl = + options.getSqlCredentialUrlOverride() != null + ? options.getSqlCredentialUrlOverride() + : BackupPaths.getCloudSQLCredentialFilePatterns(options.getEnvironment()).get(0); + + return data.apply( + "Write to sql: " + transformId, + Transforms.writeToSql( + transformId, + options.getMaxConcurrentSqlWriters(), + options.getSqlWriteBatchSize(), + new JpaSupplierFactory(credentialFileUrl, jpaGetter))); + } + + private static ImmutableList toKindStrings(Collection> entityClasses) { + try (AppEngineEnvironment env = new AppEngineEnvironment()) { + return entityClasses.stream().map(Key::getKind).collect(ImmutableList.toImmutableList()); + } + } + + static class JpaSupplierFactory implements SerializableSupplier { + private static final long serialVersionUID = 1L; + + private String credentialFileUrl; + private SerializableFunction jpaGetter; + + JpaSupplierFactory( + String credentialFileUrl, + SerializableFunction jpaGetter) { + this.credentialFileUrl = credentialFileUrl; + this.jpaGetter = jpaGetter; + } + + @Override + public JpaTransactionManager get() { + return jpaGetter.apply( + DaggerBeamJpaModule_JpaTransactionManagerComponent.builder() + .beamJpaModule(new BeamJpaModule(credentialFileUrl)) + .build()); + } + } +} diff --git a/core/src/main/java/google/registry/beam/initsql/InitSqlPipelineOptions.java b/core/src/main/java/google/registry/beam/initsql/InitSqlPipelineOptions.java new file mode 100644 index 000000000..9a540d5e5 --- /dev/null +++ b/core/src/main/java/google/registry/beam/initsql/InitSqlPipelineOptions.java @@ -0,0 +1,84 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.Validation; + +/** Pipeline options for {@link InitSqlPipeline} */ +public interface InitSqlPipelineOptions extends GcpOptions { + + @Description( + "Overrides the URL to the SQL credential file. " + "Required if environment is not provided.") + @Nullable + String getSqlCredentialUrlOverride(); + + void setSqlCredentialUrlOverride(String credentialUrlOverride); + + @Description("The root directory of the export to load.") + String getDatastoreExportDir(); + + void setDatastoreExportDir(String datastoreExportDir); + + @Description("The directory that contains all CommitLog files.") + String getCommitLogDir(); + + void setCommitLogDir(String commitLogDir); + + @Description("The earliest CommitLogs to load, in ISO8601 format.") + @Validation.Required + String getCommitLogStartTimestamp(); + + void setCommitLogStartTimestamp(String commitLogStartTimestamp); + + @Description("The latest CommitLogs to load, in ISO8601 format.") + @Validation.Required + String getCommitLogEndTimestamp(); + + void setCommitLogEndTimestamp(String commitLogEndTimestamp); + + @Description( + "The deployed environment, alpha, crash, sandbox, or production. " + + "Not required only if sqlCredentialUrlOverride is provided.") + @Nullable + String getEnvironment(); + + void setEnvironment(String environment); + + @Description( + "The maximum JDBC connection pool size on a VM. " + + "This value should be equal to or greater than the number of cores on the VM.") + @Default.Integer(4) + int getJdbcMaxPoolSize(); + + void setJdbcMaxPoolSize(int jdbcMaxPoolSize); + + @Description( + "A hint to the pipeline runner of the maximum number of concurrent SQL writers to create. " + + "Note that multiple writers may run on the same VM and share the connection pool.") + @Default.Integer(4) + int getMaxConcurrentSqlWriters(); + + void setMaxConcurrentSqlWriters(int maxConcurrentSqlWriters); + + @Description("The number of entities to be written to the SQL database in one transaction.") + @Default.Integer(20) + int getSqlWriteBatchSize(); + + void setSqlWriteBatchSize(int sqlWriteBatchSize); +} diff --git a/core/src/main/java/google/registry/beam/initsql/README.md b/core/src/main/java/google/registry/beam/initsql/README.md index 810fa4e50..19c54200e 100644 --- a/core/src/main/java/google/registry/beam/initsql/README.md +++ b/core/src/main/java/google/registry/beam/initsql/README.md @@ -1,3 +1,17 @@ ## Summary -This package contains a BEAM pipeline that populates a Cloud SQL database from a Datastore backup. +This package contains a BEAM pipeline that populates a Cloud SQL database from a +Datastore backup. The pipeline uses an unsynchronized Datastore export and +overlapping CommitLogs generated by the Nomulus server to recreate a consistent +Datastore snapshot, and writes the data to a Cloud SQL instance. + +## Pipeline Visualization + +The golden flow graph of the InitSqlPipeline is saved both as a text-base +[DOT file](../../../../../../test/resources/google/registry/beam/initsql/pipeline_golden.dot) +and a +[.png file](../../../../../../test/resources/google/registry/beam/initsql/pipeline_golden.png). +A test compares the flow graph of the current pipeline with the golden graph, +and will fail if changes are detected. When this happens, run the Gradle task +':core:updateInitSqlPipelineGraph' to update the golden files and review the +changes. diff --git a/core/src/main/java/google/registry/beam/initsql/Transforms.java b/core/src/main/java/google/registry/beam/initsql/Transforms.java index 7f302cfb3..3011cc4d4 100644 --- a/core/src/main/java/google/registry/beam/initsql/Transforms.java +++ b/core/src/main/java/google/registry/beam/initsql/Transforms.java @@ -17,8 +17,10 @@ package google.registry.beam.initsql; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Throwables.throwIfUnchecked; import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp; import static google.registry.beam.initsql.BackupPaths.getExportFilePatterns; +import static google.registry.persistence.JpaRetries.isFailedTxnRetriable; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.setJpaTm; import static google.registry.util.DateTimeUtils.START_OF_TIME; @@ -29,14 +31,16 @@ import static org.apache.beam.sdk.values.TypeDescriptors.kvs; import static org.apache.beam.sdk.values.TypeDescriptors.strings; import avro.shaded.com.google.common.collect.Iterators; +import com.google.appengine.api.datastore.Entity; +import com.google.appengine.api.datastore.EntityTranslator; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; import google.registry.backup.AppEngineEnvironment; import google.registry.backup.CommitLogImports; import google.registry.backup.VersionedEntity; +import google.registry.model.domain.DomainBase; import google.registry.model.ofy.ObjectifyService; import google.registry.model.ofy.Ofy; import google.registry.persistence.transaction.JpaTransactionManager; @@ -49,7 +53,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; -import javax.persistence.OptimisticLockException; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.FileIO; @@ -70,7 +73,6 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; @@ -225,7 +227,7 @@ public final class Transforms { return new PTransform, PCollection>() { @Override public PCollection expand(PCollection input) { - return input.apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); + return input.apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)); } }; } @@ -263,6 +265,11 @@ public final class Transforms { /** * Returns a {@link PTransform} that writes a {@link PCollection} of entities to a SQL database. + * and outputs an empty {@code PCollection}. This allows other operations to {@link + * org.apache.beam.sdk.transforms.Wait wait} for the completion of this transform. + * + *

Errors are handled according to the pipeline runner's default policy. As part of a one-time + * job, we will not add features unless proven necessary. * * @param transformId a unique ID for an instance of the returned transform * @param maxWriters the max number of concurrent writes to SQL, which also determines the max @@ -270,22 +277,21 @@ public final class Transforms { * @param batchSize the number of entities to write in each operation * @param jpaSupplier supplier of a {@link JpaTransactionManager} */ - public static PTransform, PDone> writeToSql( + public static PTransform, PCollection> writeToSql( String transformId, int maxWriters, int batchSize, SerializableSupplier jpaSupplier) { - return new PTransform, PDone>() { + return new PTransform, PCollection>() { @Override - public PDone expand(PCollection input) { - input + public PCollection expand(PCollection input) { + return input .apply( "Shard data for " + transformId, MapElements.into(kvs(integers(), TypeDescriptor.of(VersionedEntity.class))) .via(ve -> KV.of(ThreadLocalRandom.current().nextInt(maxWriters), ve))) .apply("Batch output by shard " + transformId, GroupIntoBatches.ofSize(batchSize)) .apply("Write in batch for " + transformId, ParDo.of(new SqlBatchWriter(jpaSupplier))); - return PDone.in(input.getPipeline()); } }; } @@ -397,8 +403,10 @@ public final class Transforms { public void setup() { sleeper = new SystemSleeper(); - ObjectifyService.initOfy(); - ofy = ObjectifyService.ofy(); + try (AppEngineEnvironment env = new AppEngineEnvironment()) { + ObjectifyService.initOfy(); + ofy = ObjectifyService.ofy(); + } synchronized (SqlBatchWriter.class) { if (instanceCount == 0) { @@ -444,7 +452,10 @@ public final class Transforms { runnable.run(); return; } catch (Throwable throwable) { - throwIfNotCausedBy(throwable, OptimisticLockException.class); + if (!isFailedTxnRetriable(throwable)) { + throwIfUnchecked(throwable); + throw new RuntimeException(throwable); + } int sleepMillis = (1 << attempt) * initialDelayMillis; int jitter = ThreadLocalRandom.current().nextInt((int) (sleepMillis * jitterRatio)) @@ -453,21 +464,28 @@ public final class Transforms { } } } + } - /** - * Rethrows {@code throwable} if it is not (and does not have a cause of) {@code causeType}; - * otherwise returns with no side effects. - */ - private void throwIfNotCausedBy(Throwable throwable, Class causeType) { - Throwable t = throwable; - while (t != null) { - if (causeType.isInstance(t)) { - return; - } - t = t.getCause(); - } - Throwables.throwIfUnchecked(t); - throw new RuntimeException(t); + /** + * Removes BillingEvents, {@link google.registry.model.poll.PollMessage PollMessages} and {@link + * google.registry.model.host.HostResource} from a {@link DomainBase}. These are circular foreign + * key constraints that prevent migration of {@code DomainBase} to SQL databases. + * + *

See {@link InitSqlPipeline} for more information. + */ + static class RemoveDomainBaseForeignKeys extends DoFn { + + @ProcessElement + public void processElement( + @Element VersionedEntity domainBase, OutputReceiver out) { + checkArgument( + domainBase.getEntity().isPresent(), "Unexpected delete entity %s", domainBase.key()); + Entity outputEntity = + DomainBaseUtil.removeBillingAndPollAndHosts(domainBase.getEntity().get()); + out.output( + VersionedEntity.from( + domainBase.commitTimeMills(), + EntityTranslator.convertToPb(outputEntity).toByteArray())); } } } diff --git a/core/src/main/java/google/registry/persistence/JpaRetries.java b/core/src/main/java/google/registry/persistence/JpaRetries.java new file mode 100644 index 000000000..e9e210548 --- /dev/null +++ b/core/src/main/java/google/registry/persistence/JpaRetries.java @@ -0,0 +1,58 @@ +// Copyright 2019 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.persistence; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableSet; +import java.sql.SQLException; +import java.util.function.Predicate; +import javax.persistence.OptimisticLockException; + +/** Helpers for identifying retriable database operations. */ +public final class JpaRetries { + + private JpaRetries() {} + + private static final ImmutableSet RETRIABLE_TXN_SQL_STATE = + ImmutableSet.of( + "40001", // serialization_failure + "40P01", // deadlock_detected, PSQL-specific + "55006", // object_in_use, PSQL and DB2 + "55P03" // lock_not_available, PSQL-specific + ); + + private static final Predicate RETRIABLE_TXN_PREDICATE = + Predicates.or( + OptimisticLockException.class::isInstance, + e -> + e instanceof SQLException + && RETRIABLE_TXN_SQL_STATE.contains(((SQLException) e).getSQLState())); + + public static boolean isFailedTxnRetriable(Throwable throwable) { + Throwable t = throwable; + while (t != null) { + if (RETRIABLE_TXN_PREDICATE.test(t)) { + return true; + } + t = t.getCause(); + } + return false; + } + + public static boolean isFailedQueryRetriable(Throwable throwable) { + // TODO(weiminyu): check for more error codes. + return isFailedTxnRetriable(throwable); + } +} diff --git a/core/src/test/java/google/registry/beam/initsql/BeamJpaExtension.java b/core/src/test/java/google/registry/beam/initsql/BeamJpaExtension.java new file mode 100644 index 000000000..bec44f37a --- /dev/null +++ b/core/src/test/java/google/registry/beam/initsql/BeamJpaExtension.java @@ -0,0 +1,72 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.function.Supplier; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.testcontainers.containers.JdbcDatabaseContainer; + +/** + * Helpers for setting up {@link BeamJpaModule} in tests. + * + *

This extension is often used with a Database container and/or temporary file folder. User must + * make sure that all dependent extensions are set up before this extension, e.g., by assigning + * {@link org.junit.jupiter.api.Order orders}. + */ +public final class BeamJpaExtension implements BeforeEachCallback, AfterEachCallback, Serializable { + + private final transient JdbcDatabaseContainer database; + private final transient Supplier credentialPathSupplier; + private transient BeamJpaModule beamJpaModule; + + private File credentialFile; + + public BeamJpaExtension(Supplier credentialPathSupplier, JdbcDatabaseContainer database) { + this.database = database; + this.credentialPathSupplier = credentialPathSupplier; + } + + public File getCredentialFile() { + return credentialFile; + } + + public BeamJpaModule getBeamJpaModule() { + if (beamJpaModule != null) { + return beamJpaModule; + } + return beamJpaModule = new BeamJpaModule(credentialFile.getAbsolutePath()); + } + + @Override + public void beforeEach(ExtensionContext context) throws IOException { + credentialFile = Files.createFile(credentialPathSupplier.get()).toFile(); + new PrintStream(credentialFile) + .printf("%s %s %s", database.getJdbcUrl(), database.getUsername(), database.getPassword()) + .close(); + } + + @Override + public void afterEach(ExtensionContext context) { + credentialFile.delete(); + } +} diff --git a/core/src/test/java/google/registry/beam/initsql/BeamJpaModuleTest.java b/core/src/test/java/google/registry/beam/initsql/BeamJpaModuleTest.java index d7bebc3ee..1b4ad8a5e 100644 --- a/core/src/test/java/google/registry/beam/initsql/BeamJpaModuleTest.java +++ b/core/src/test/java/google/registry/beam/initsql/BeamJpaModuleTest.java @@ -19,12 +19,10 @@ import static com.google.common.truth.Truth.assertThat; import google.registry.persistence.NomulusPostgreSql; import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.testing.DatastoreEntityExtension; -import java.io.File; -import java.io.IOException; -import java.io.PrintStream; +import java.nio.file.Path; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.junit.jupiter.api.extension.RegisterExtension; @@ -35,31 +33,28 @@ import org.testcontainers.junit.jupiter.Testcontainers; /** Unit tests for {@link BeamJpaModule}. */ @Testcontainers -public class BeamJpaModuleTest { - - @Container - public PostgreSQLContainer database = new PostgreSQLContainer(NomulusPostgreSql.getDockerTag()); +class BeamJpaModuleTest { @RegisterExtension - public DatastoreEntityExtension datastoreEntityExtension = new DatastoreEntityExtension(); + final DatastoreEntityExtension datastoreEntityExtension = new DatastoreEntityExtension(); - @TempDir File tempFolder; + @Container + final PostgreSQLContainer database = new PostgreSQLContainer(NomulusPostgreSql.getDockerTag()); - private File credentialFile; + @SuppressWarnings("WeakerAccess") + @TempDir + Path tmpDir; - @BeforeEach - public void beforeEach() throws IOException { - credentialFile = new File(tempFolder, "credential"); - new PrintStream(credentialFile) - .printf("%s %s %s", database.getJdbcUrl(), database.getUsername(), database.getPassword()) - .close(); - } + @RegisterExtension + @Order(Order.DEFAULT + 1) + final BeamJpaExtension beamJpaExtension = + new BeamJpaExtension(() -> tmpDir.resolve("credential.dat"), database); @Test void getJpaTransactionManager_local() { JpaTransactionManager jpa = DaggerBeamJpaModule_JpaTransactionManagerComponent.builder() - .beamJpaModule(new BeamJpaModule(credentialFile.getAbsolutePath())) + .beamJpaModule(beamJpaExtension.getBeamJpaModule()) .build() .localDbJpaTransactionManager(); assertThat( @@ -80,7 +75,7 @@ public class BeamJpaModuleTest { */ @Test @EnabledIfSystemProperty(named = "test.gcp_integration.env", matches = "\\S+") - public void getJpaTransactionManager_cloudSql_authRequired() { + void getJpaTransactionManager_cloudSql_authRequired() { String environmentName = System.getProperty("test.gcp_integration.env"); FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create()); JpaTransactionManager jpa = diff --git a/core/src/test/java/google/registry/beam/initsql/DomainBaseUtilTest.java b/core/src/test/java/google/registry/beam/initsql/DomainBaseUtilTest.java new file mode 100644 index 000000000..d4217f7e4 --- /dev/null +++ b/core/src/test/java/google/registry/beam/initsql/DomainBaseUtilTest.java @@ -0,0 +1,223 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects; +import static google.registry.model.ofy.ObjectifyService.ofy; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.testing.DatastoreHelper.cloneAndSetAutoTimestamps; +import static google.registry.testing.DatastoreHelper.createTld; +import static google.registry.testing.DatastoreHelper.persistResource; +import static google.registry.util.DateTimeUtils.START_OF_TIME; +import static org.junit.Assert.assertThrows; + +import com.google.appengine.api.datastore.Entity; +import com.google.common.collect.ImmutableSet; +import com.googlecode.objectify.Key; +import google.registry.model.billing.BillingEvent; +import google.registry.model.billing.BillingEvent.OneTime; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DesignatedContact; +import google.registry.model.domain.DomainAuthInfo; +import google.registry.model.domain.DomainBase; +import google.registry.model.domain.GracePeriod; +import google.registry.model.domain.launch.LaunchNotice; +import google.registry.model.domain.rgp.GracePeriodStatus; +import google.registry.model.domain.secdns.DelegationSignerData; +import google.registry.model.eppcommon.AuthInfo.PasswordAuth; +import google.registry.model.eppcommon.StatusValue; +import google.registry.model.eppcommon.Trid; +import google.registry.model.host.HostResource; +import google.registry.model.ofy.Ofy; +import google.registry.model.poll.PollMessage; +import google.registry.model.reporting.HistoryEntry; +import google.registry.model.transfer.DomainTransferData; +import google.registry.model.transfer.TransferStatus; +import google.registry.persistence.VKey; +import google.registry.testing.AppEngineRule; +import google.registry.testing.DatastoreHelper; +import google.registry.testing.FakeClock; +import google.registry.testing.InjectRule; +import org.joda.time.Instant; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Unit tests for {@link DomainBaseUtil}. */ +public class DomainBaseUtilTest { + + private final FakeClock fakeClock = new FakeClock(Instant.now()); + + private DomainBase domain; + private Entity domainEntity; + private Key oneTimeBillKey; + private Key recurringBillKey; + private Key domainKey; + + @RegisterExtension + AppEngineRule appEngineRule = + AppEngineRule.builder().withDatastore().withClock(fakeClock).build(); + + @RegisterExtension InjectRule injectRule = new InjectRule(); + + @BeforeEach + void beforeEach() { + injectRule.setStaticField(Ofy.class, "clock", fakeClock); + createTld("com"); + domainKey = Key.create(null, DomainBase.class, "4-COM"); + VKey hostKey = + persistResource( + new HostResource.Builder() + .setHostName("ns1.example.com") + .setSuperordinateDomain(VKey.from(domainKey)) + .setRepoId("1-COM") + .build()) + .createVKey(); + VKey contact1Key = + persistResource( + new ContactResource.Builder() + .setContactId("contact_id1") + .setRepoId("2-COM") + .build()) + .createVKey(); + VKey contact2Key = + persistResource( + new ContactResource.Builder() + .setContactId("contact_id2") + .setRepoId("3-COM") + .build()) + .createVKey(); + Key historyEntryKey = + Key.create(persistResource(new HistoryEntry.Builder().setParent(domainKey).build())); + oneTimeBillKey = Key.create(historyEntryKey, BillingEvent.OneTime.class, 1); + recurringBillKey = Key.create(historyEntryKey, BillingEvent.Recurring.class, 2); + Key autorenewPollKey = + Key.create(historyEntryKey, PollMessage.Autorenew.class, 3); + Key onetimePollKey = + Key.create(historyEntryKey, PollMessage.OneTime.class, 1); + // Set up a new persisted domain entity. + domain = + persistResource( + cloneAndSetAutoTimestamps( + new DomainBase.Builder() + .setDomainName("example.com") + .setRepoId("4-COM") + .setCreationClientId("a registrar") + .setLastEppUpdateTime(fakeClock.nowUtc()) + .setLastEppUpdateClientId("AnotherRegistrar") + .setLastTransferTime(fakeClock.nowUtc()) + .setStatusValues( + ImmutableSet.of( + StatusValue.CLIENT_DELETE_PROHIBITED, + StatusValue.SERVER_DELETE_PROHIBITED, + StatusValue.SERVER_TRANSFER_PROHIBITED, + StatusValue.SERVER_UPDATE_PROHIBITED, + StatusValue.SERVER_RENEW_PROHIBITED, + StatusValue.SERVER_HOLD)) + .setRegistrant(contact1Key) + .setContacts( + ImmutableSet.of( + DesignatedContact.create(DesignatedContact.Type.ADMIN, contact2Key))) + .setNameservers(ImmutableSet.of(hostKey)) + .setSubordinateHosts(ImmutableSet.of("ns1.example.com")) + .setPersistedCurrentSponsorClientId("losing") + .setRegistrationExpirationTime(fakeClock.nowUtc().plusYears(1)) + .setAuthInfo(DomainAuthInfo.create(PasswordAuth.create("password"))) + .setDsData( + ImmutableSet.of(DelegationSignerData.create(1, 2, 3, new byte[] {0, 1, 2}))) + .setLaunchNotice( + LaunchNotice.create("tcnid", "validatorId", START_OF_TIME, START_OF_TIME)) + .setTransferData( + new DomainTransferData.Builder() + .setGainingClientId("gaining") + .setLosingClientId("losing") + .setPendingTransferExpirationTime(fakeClock.nowUtc()) + .setServerApproveEntities( + ImmutableSet.of( + VKey.from(oneTimeBillKey), + VKey.from(recurringBillKey), + VKey.from(autorenewPollKey))) + .setServerApproveBillingEvent(VKey.from(oneTimeBillKey)) + .setServerApproveAutorenewEvent(VKey.from(recurringBillKey)) + .setServerApproveAutorenewPollMessage(VKey.from(autorenewPollKey)) + .setTransferRequestTime(fakeClock.nowUtc().plusDays(1)) + .setTransferStatus(TransferStatus.SERVER_APPROVED) + .setTransferRequestTrid(Trid.create("client-trid", "server-trid")) + .build()) + .setDeletePollMessage(onetimePollKey) + .setAutorenewBillingEvent(recurringBillKey) + .setAutorenewPollMessage(autorenewPollKey) + .setSmdId("smdid") + .addGracePeriod( + GracePeriod.create( + GracePeriodStatus.ADD, + fakeClock.nowUtc().plusDays(1), + "registrar", + null)) + .build())); + domainEntity = tm().transact(() -> ofy().toEntity(domain)); + } + + @Test + void removeBillingAndPollAndHosts_allFkeysPresent() { + DomainBase domainTransformedByOfy = + domain + .asBuilder() + .setAutorenewBillingEvent(null) + .setAutorenewPollMessage(null) + .setNameservers(ImmutableSet.of()) + .setDeletePollMessage(null) + .setTransferData(null) + .build(); + DomainBase domainTransformedByUtil = + (DomainBase) ofy().toPojo(DomainBaseUtil.removeBillingAndPollAndHosts(domainEntity)); + // Compensates for the missing INACTIVE status. + domainTransformedByUtil = domainTransformedByUtil.asBuilder().build(); + assertAboutImmutableObjects() + .that(domainTransformedByUtil) + .isEqualExceptFields(domainTransformedByOfy, "revisions"); + } + + @Test + void removeBillingAndPollAndHosts_noFkeysPresent() { + DomainBase domainWithoutFKeys = + domain + .asBuilder() + .setAutorenewBillingEvent(null) + .setAutorenewPollMessage(null) + .setNameservers(ImmutableSet.of()) + .setDeletePollMessage(null) + .setTransferData(null) + .build(); + Entity entityWithoutFkeys = tm().transact(() -> ofy().toEntity(domainWithoutFKeys)); + DomainBase domainTransformedByUtil = + (DomainBase) ofy().toPojo(DomainBaseUtil.removeBillingAndPollAndHosts(entityWithoutFkeys)); + // Compensates for the missing INACTIVE status. + domainTransformedByUtil = domainTransformedByUtil.asBuilder().build(); + assertAboutImmutableObjects() + .that(domainTransformedByUtil) + .isEqualExceptFields(domainWithoutFKeys, "revisions"); + } + + @Test + void removeBillingAndPollAndHosts_notDomainBase() { + Entity contactEntity = + tm().transact(() -> ofy().toEntity(DatastoreHelper.newContactResource("contact"))); + + assertThrows( + IllegalArgumentException.class, + () -> DomainBaseUtil.removeBillingAndPollAndHosts(contactEntity)); + } +} diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java new file mode 100644 index 000000000..a62a01124 --- /dev/null +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineGraphTest.java @@ -0,0 +1,67 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import static google.registry.testing.truth.TextDiffSubject.assertWithMessageAboutUrlSource; + +import com.google.common.io.Resources; +import google.registry.beam.TestPipelineExtension; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URL; +import org.apache.beam.runners.core.construction.renderer.PipelineDotRenderer; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Manages visualization of {@link InitSqlPipeline}. */ +class InitSqlPipelineGraphTest { + + private static final String GOLDEN_DOT_FILE = "pipeline_golden.dot"; + + private static final String[] OPTIONS_ARGS = + new String[] { + "--commitLogStartTimestamp=2000-01-01TZ", + "--commitLogEndTimestamp=2000-01-02TZ", + "--datastoreExportDir=/somedir", + "--commitLogDir=/someotherdir", + "--environment=alpha" + }; + + private static final transient InitSqlPipelineOptions options = + PipelineOptionsFactory.fromArgs(OPTIONS_ARGS) + .withValidation() + .as(InitSqlPipelineOptions.class); + + @RegisterExtension + final transient TestPipelineExtension testPipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(false); + + @Test + public void createPipeline_compareGraph() throws IOException { + new InitSqlPipeline(options, testPipeline).setupPipeline(); + String dotString = PipelineDotRenderer.toDotString(testPipeline); + URL goldenDotUrl = Resources.getResource(InitSqlPipelineGraphTest.class, GOLDEN_DOT_FILE); + File outputFile = new File(new File(goldenDotUrl.getFile()).getParent(), "pipeline_curr.dot"); + try (PrintStream ps = new PrintStream(outputFile)) { + ps.print(dotString); + } + assertWithMessageAboutUrlSource( + "InitSqlPipeline graph changed. Run :core:updateInitSqlPipelineGraph to update.") + .that(outputFile.toURI().toURL()) + .hasSameContentAs(goldenDotUrl); + } +} diff --git a/core/src/test/java/google/registry/model/EppResourceTestUtils.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineOptionsTest.java similarity index 52% rename from core/src/test/java/google/registry/model/EppResourceTestUtils.java rename to core/src/test/java/google/registry/beam/initsql/InitSqlPipelineOptionsTest.java index 86ff2ee05..299dabdc9 100644 --- a/core/src/test/java/google/registry/model/EppResourceTestUtils.java +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineOptionsTest.java @@ -12,24 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -package google.registry.model; +package google.registry.beam.initsql; -import static com.google.common.truth.Truth.assertThat; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.jupiter.api.Test; -import java.util.Objects; +/** Unit tests for {@link google.registry.beam.initsql.InitSqlPipelineOptions}. * */ +public class InitSqlPipelineOptionsTest { -/** Test helpers for {@link EppResource}. */ -public final class EppResourceTestUtils { - - private EppResourceTestUtils() {} - - public static void assertEqualsIgnoreLastUpdateTime( - E actual, E expected) { - if (Objects.equals(actual, expected)) { - return; - } - actual = (E) actual.asBuilder().build(); - actual.updateTimestamp = expected.getUpdateTimestamp(); - assertThat(actual).isEqualTo(expected); + @Test + void registerToValidate() { + PipelineOptionsFactory.register(InitSqlPipelineOptions.class); } } diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java new file mode 100644 index 000000000..f883f291d --- /dev/null +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java @@ -0,0 +1,283 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import static com.google.common.truth.Truth.assertThat; +import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects; +import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static google.registry.testing.DatastoreHelper.newRegistry; +import static google.registry.testing.DatastoreHelper.persistResource; +import static google.registry.util.DateTimeUtils.START_OF_TIME; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.googlecode.objectify.Key; +import google.registry.backup.AppEngineEnvironment; +import google.registry.beam.TestPipelineExtension; +import google.registry.model.billing.BillingEvent; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DesignatedContact; +import google.registry.model.domain.DomainAuthInfo; +import google.registry.model.domain.DomainBase; +import google.registry.model.domain.GracePeriod; +import google.registry.model.domain.launch.LaunchNotice; +import google.registry.model.domain.rgp.GracePeriodStatus; +import google.registry.model.domain.secdns.DelegationSignerData; +import google.registry.model.eppcommon.AuthInfo.PasswordAuth; +import google.registry.model.eppcommon.StatusValue; +import google.registry.model.eppcommon.Trid; +import google.registry.model.host.HostResource; +import google.registry.model.ofy.Ofy; +import google.registry.model.poll.PollMessage; +import google.registry.model.registrar.Registrar; +import google.registry.model.registry.Registry; +import google.registry.model.reporting.HistoryEntry; +import google.registry.model.transfer.DomainTransferData; +import google.registry.model.transfer.TransferStatus; +import google.registry.persistence.VKey; +import google.registry.persistence.transaction.JpaTestRules; +import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestRule; +import google.registry.testing.AppEngineRule; +import google.registry.testing.DatastoreEntityExtension; +import google.registry.testing.FakeClock; +import google.registry.testing.InjectRule; +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.joda.time.DateTime; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +/** Unit tests for {@link InitSqlPipeline}. */ +class InitSqlPipelineTest { + private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); + + private static final ImmutableList> ALL_KINDS = + ImmutableList.of( + Registry.class, + Registrar.class, + ContactResource.class, + HostResource.class, + DomainBase.class, + HistoryEntry.class); + + private transient FakeClock fakeClock = new FakeClock(START_TIME); + + @RegisterExtension + @Order(Order.DEFAULT - 1) + final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension(); + + @RegisterExtension final transient InjectRule injectRule = new InjectRule(); + + @SuppressWarnings("WeakerAccess") + @TempDir + transient Path tmpDir; + + @RegisterExtension + final transient TestPipelineExtension testPipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); + + @RegisterExtension + final transient JpaIntegrationTestRule database = + new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule(); + + // Must not be transient! + @RegisterExtension + @Order(Order.DEFAULT + 1) + final BeamJpaExtension beamJpaExtension = + new BeamJpaExtension(() -> tmpDir.resolve("credential.dat"), database.getDatabase()); + + private File exportRootDir; + private File exportDir; + private File commitLogDir; + + private transient Registrar registrar1; + private transient Registrar registrar2; + private transient DomainBase domain; + private transient ContactResource contact1; + private transient ContactResource contact2; + private transient HostResource hostResource; + + private transient HistoryEntry historyEntry; + + @BeforeEach + public void beforeEach() throws Exception { + try (BackupTestStore store = new BackupTestStore(fakeClock)) { + injectRule.setStaticField(Ofy.class, "clock", fakeClock); + exportRootDir = Files.createDirectory(tmpDir.resolve("exports")).toFile(); + + persistResource(newRegistry("com", "COM")); + registrar1 = persistResource(AppEngineRule.makeRegistrar1()); + registrar2 = persistResource(AppEngineRule.makeRegistrar2()); + Key domainKey = Key.create(null, DomainBase.class, "4-COM"); + hostResource = + persistResource( + new HostResource.Builder() + .setHostName("ns1.example.com") + .setSuperordinateDomain(VKey.from(domainKey)) + .setRepoId("1-COM") + .setCreationClientId(registrar1.getClientId()) + .setPersistedCurrentSponsorClientId(registrar2.getClientId()) + .build()); + contact1 = + persistResource( + new ContactResource.Builder() + .setContactId("contact_id1") + .setRepoId("2-COM") + .setCreationClientId(registrar1.getClientId()) + .setPersistedCurrentSponsorClientId(registrar2.getClientId()) + .build()); + contact2 = + persistResource( + new ContactResource.Builder() + .setContactId("contact_id2") + .setRepoId("3-COM") + .setCreationClientId(registrar1.getClientId()) + .setPersistedCurrentSponsorClientId(registrar1.getClientId()) + .build()); + historyEntry = persistResource(new HistoryEntry.Builder().setParent(domainKey).build()); + Key historyEntryKey = Key.create(historyEntry); + Key oneTimeBillKey = + Key.create(historyEntryKey, BillingEvent.OneTime.class, 1); + Key recurringBillKey = + Key.create(historyEntryKey, BillingEvent.Recurring.class, 2); + Key autorenewPollKey = + Key.create(historyEntryKey, PollMessage.Autorenew.class, 3); + Key onetimePollKey = + Key.create(historyEntryKey, PollMessage.OneTime.class, 1); + domain = + persistResource( + new DomainBase.Builder() + .setDomainName("example.com") + .setRepoId("4-COM") + .setCreationClientId(registrar1.getClientId()) + .setLastEppUpdateTime(fakeClock.nowUtc()) + .setLastEppUpdateClientId(registrar2.getClientId()) + .setLastTransferTime(fakeClock.nowUtc()) + .setStatusValues( + ImmutableSet.of( + StatusValue.CLIENT_DELETE_PROHIBITED, + StatusValue.SERVER_DELETE_PROHIBITED, + StatusValue.SERVER_TRANSFER_PROHIBITED, + StatusValue.SERVER_UPDATE_PROHIBITED, + StatusValue.SERVER_RENEW_PROHIBITED, + StatusValue.SERVER_HOLD)) + .setRegistrant(contact1.createVKey()) + .setContacts( + ImmutableSet.of( + DesignatedContact.create( + DesignatedContact.Type.ADMIN, contact2.createVKey()))) + .setNameservers(ImmutableSet.of(hostResource.createVKey())) + .setSubordinateHosts(ImmutableSet.of("ns1.example.com")) + .setPersistedCurrentSponsorClientId(registrar2.getClientId()) + .setRegistrationExpirationTime(fakeClock.nowUtc().plusYears(1)) + .setAuthInfo(DomainAuthInfo.create(PasswordAuth.create("password"))) + .setDsData( + ImmutableSet.of(DelegationSignerData.create(1, 2, 3, new byte[] {0, 1, 2}))) + .setLaunchNotice( + LaunchNotice.create("tcnid", "validatorId", START_OF_TIME, START_OF_TIME)) + .setTransferData( + new DomainTransferData.Builder() + .setGainingClientId(registrar1.getClientId()) + .setLosingClientId(registrar2.getClientId()) + .setPendingTransferExpirationTime(fakeClock.nowUtc()) + .setServerApproveEntities( + ImmutableSet.of( + VKey.from(oneTimeBillKey), + VKey.from(recurringBillKey), + VKey.from(autorenewPollKey))) + .setServerApproveBillingEvent(VKey.from(oneTimeBillKey)) + .setServerApproveAutorenewEvent(VKey.from(recurringBillKey)) + .setServerApproveAutorenewPollMessage(VKey.from(autorenewPollKey)) + .setTransferRequestTime(fakeClock.nowUtc().plusDays(1)) + .setTransferStatus(TransferStatus.SERVER_APPROVED) + .setTransferRequestTrid(Trid.create("client-trid", "server-trid")) + .build()) + .setDeletePollMessage(onetimePollKey) + .setAutorenewBillingEvent(recurringBillKey) + .setAutorenewPollMessage(autorenewPollKey) + .setSmdId("smdid") + .addGracePeriod( + GracePeriod.create( + GracePeriodStatus.ADD, fakeClock.nowUtc().plusDays(1), "registrar", null)) + .build()); + exportDir = store.export(exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of()); + commitLogDir = Files.createDirectory(tmpDir.resolve("commits")).toFile(); + } + } + + @Test + public void runPipeline() { + InitSqlPipelineOptions options = + PipelineOptionsFactory.fromArgs( + "--sqlCredentialUrlOverride=" + + beamJpaExtension.getCredentialFile().getAbsolutePath(), + "--commitLogStartTimestamp=" + START_TIME, + "--commitLogEndTimestamp=" + fakeClock.nowUtc().plusMillis(1), + "--datastoreExportDir=" + exportDir.getAbsolutePath(), + "--commitLogDir=" + commitLogDir.getAbsolutePath()) + .withValidation() + .as(InitSqlPipelineOptions.class); + InitSqlPipeline initSqlPipeline = new InitSqlPipeline(options, testPipeline); + initSqlPipeline.run().waitUntilFinish(); + try (AppEngineEnvironment env = new AppEngineEnvironment("test")) { + assertHostResourceEquals( + jpaTm().transact(() -> jpaTm().load(hostResource.createVKey())), hostResource); + assertThat(jpaTm().transact(() -> jpaTm().loadAll(Registrar.class))) + .comparingElementsUsing(immutableObjectCorrespondence("lastUpdateTime")) + .containsExactly(registrar1, registrar2); + assertThat(jpaTm().transact(() -> jpaTm().loadAll(ContactResource.class))) + .comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp")) + .containsExactly(contact1, contact2); + assertCleansedDomainEquals(jpaTm().transact(() -> jpaTm().load(domain.createVKey())), domain); + } + } + + private static void assertHostResourceEquals(HostResource actual, HostResource expected) { + assertAboutImmutableObjects() + .that(actual) + .isEqualExceptFields(expected, "superordinateDomain", "revisions", "updateTimestamp"); + assertThat(actual.getSuperordinateDomain().getSqlKey()) + .isEqualTo(expected.getSuperordinateDomain().getSqlKey()); + } + + private static void assertCleansedDomainEquals(DomainBase actual, DomainBase expected) { + assertAboutImmutableObjects() + .that(actual) + .isEqualExceptFields( + expected, + "adminContact", + "registrantContact", + "gracePeriods", + "dsData", + "allContacts", + "revisions", + "updateTimestamp", + "autorenewBillingEvent", + "autorenewPollMessage", + "deletePollMessage", + "nsHosts", + "transferData"); + assertThat(actual.getAdminContact().getSqlKey()) + .isEqualTo(expected.getAdminContact().getSqlKey()); + assertThat(actual.getRegistrant().getSqlKey()).isEqualTo(expected.getRegistrant().getSqlKey()); + // TODO(weiminyu): compare gracePeriods, allContacts and dsData, when SQL model supports them. + } +} diff --git a/core/src/test/java/google/registry/beam/initsql/WriteToSqlTest.java b/core/src/test/java/google/registry/beam/initsql/WriteToSqlTest.java index 8a98165e2..038d340f2 100644 --- a/core/src/test/java/google/registry/beam/initsql/WriteToSqlTest.java +++ b/core/src/test/java/google/registry/beam/initsql/WriteToSqlTest.java @@ -15,12 +15,14 @@ package google.registry.beam.initsql; import static com.google.common.truth.Truth.assertThat; +import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import com.google.appengine.api.datastore.Entity; import com.google.common.collect.ImmutableList; import google.registry.backup.VersionedEntity; import google.registry.beam.TestPipelineExtension; +import google.registry.model.ImmutableObject; import google.registry.model.contact.ContactResource; import google.registry.model.ofy.Ofy; import google.registry.model.registrar.Registrar; @@ -31,10 +33,7 @@ import google.registry.testing.DatastoreEntityExtension; import google.registry.testing.DatastoreHelper; import google.registry.testing.FakeClock; import google.registry.testing.InjectRule; -import java.io.File; -import java.io.PrintStream; import java.io.Serializable; -import java.nio.file.Files; import java.nio.file.Path; import java.util.stream.Collectors; import org.apache.beam.sdk.transforms.Create; @@ -52,16 +51,16 @@ class WriteToSqlTest implements Serializable { private final FakeClock fakeClock = new FakeClock(START_TIME); + @RegisterExtension + @Order(Order.DEFAULT - 1) + final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension(); + @RegisterExtension final transient InjectRule injectRule = new InjectRule(); @RegisterExtension final transient JpaIntegrationTestRule database = new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule(); - @RegisterExtension - @Order(value = 1) - final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension(); - @SuppressWarnings("WeakerAccess") @TempDir transient Path tmpDir; @@ -70,9 +69,13 @@ class WriteToSqlTest implements Serializable { final transient TestPipelineExtension testPipeline = TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); - private ImmutableList contacts; + // Must not be transient! + @RegisterExtension + @Order(Order.DEFAULT + 1) + public final BeamJpaExtension beamJpaExtension = + new BeamJpaExtension(() -> tmpDir.resolve("credential.dat"), database.getDatabase()); - private File credentialFile; + private ImmutableList contacts; @BeforeEach void beforeEach() throws Exception { @@ -93,14 +96,6 @@ class WriteToSqlTest implements Serializable { } contacts = builder.build(); } - credentialFile = Files.createFile(tmpDir.resolve("credential.dat")).toFile(); - new PrintStream(credentialFile) - .printf( - "%s %s %s", - database.getDatabaseUrl(), - database.getDatabaseUsername(), - database.getDatabasePassword()) - .close(); } @Test @@ -119,14 +114,18 @@ class WriteToSqlTest implements Serializable { 4, () -> DaggerBeamJpaModule_JpaTransactionManagerComponent.builder() - .beamJpaModule(new BeamJpaModule(credentialFile.getAbsolutePath())) + .beamJpaModule(beamJpaExtension.getBeamJpaModule()) .build() .localDbJpaTransactionManager())); testPipeline.run().waitUntilFinish(); ImmutableList sqlContacts = jpaTm().transact(() -> jpaTm().loadAll(ContactResource.class)); - // TODO(weiminyu): compare load entities with originals. Note: lastUpdateTimes won't match by - // design. Need an elegant way to deal with this.bbq - assertThat(sqlContacts).hasSize(3); + assertThat(sqlContacts) + .comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp")) + .containsExactlyElementsIn( + contacts.stream() + .map(InitSqlTestUtils::datastoreToOfyEntity) + .map(ImmutableObject.class::cast) + .collect(ImmutableList.toImmutableList())); } } diff --git a/core/src/test/java/google/registry/model/contact/ContactResourceTest.java b/core/src/test/java/google/registry/model/contact/ContactResourceTest.java index 2d50eed40..804b4d04e 100644 --- a/core/src/test/java/google/registry/model/contact/ContactResourceTest.java +++ b/core/src/test/java/google/registry/model/contact/ContactResourceTest.java @@ -16,7 +16,6 @@ package google.registry.model.contact; import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth8.assertThat; -import static google.registry.model.EppResourceTestUtils.assertEqualsIgnoreLastUpdateTime; import static google.registry.model.EppResourceUtils.loadByForeignKey; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.testing.ContactResourceSubject.assertAboutContacts; @@ -31,6 +30,7 @@ import static org.junit.Assert.assertThrows; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import google.registry.model.EntityTestCase; +import google.registry.model.ImmutableObjectSubject; import google.registry.model.billing.BillingEvent; import google.registry.model.contact.Disclose.PostalInfoChoice; import google.registry.model.contact.PostalInfo.Type; @@ -155,7 +155,9 @@ public class ContactResourceTest extends EntityTestCase { .setServerApproveEntities(null) .build()) .build(); - assertEqualsIgnoreLastUpdateTime(persisted, fixed); + ImmutableObjectSubject.assertAboutImmutableObjects() + .that(persisted) + .isEqualExceptFields(fixed, "updateTimestamp"); } @Test diff --git a/core/src/test/java/google/registry/model/domain/DomainBaseSqlTest.java b/core/src/test/java/google/registry/model/domain/DomainBaseSqlTest.java index 2481c2f19..bf9ce1eb2 100644 --- a/core/src/test/java/google/registry/model/domain/DomainBaseSqlTest.java +++ b/core/src/test/java/google/registry/model/domain/DomainBaseSqlTest.java @@ -14,7 +14,7 @@ package google.registry.model.domain; -import static google.registry.model.EppResourceTestUtils.assertEqualsIgnoreLastUpdateTime; +import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.testing.SqlHelper.assertThrowForeignKeyViolation; import static google.registry.testing.SqlHelper.saveRegistrar; @@ -154,7 +154,9 @@ public class DomainBaseSqlTest { DomainBase org = domain.asBuilder().setCreationTime(result.getCreationTime()).build(); // Note that the equality comparison forces a lazy load of all fields. - assertEqualsIgnoreLastUpdateTime(result, org); + assertAboutImmutableObjects() + .that(result) + .isEqualExceptFields(org, "updateTimestamp"); }); } diff --git a/core/src/test/java/google/registry/persistence/transaction/JpaTestRules.java b/core/src/test/java/google/registry/persistence/transaction/JpaTestRules.java index b7282248a..c02fc3489 100644 --- a/core/src/test/java/google/registry/persistence/transaction/JpaTestRules.java +++ b/core/src/test/java/google/registry/persistence/transaction/JpaTestRules.java @@ -135,14 +135,14 @@ public class JpaTestRules { @Override public void beforeEach(ExtensionContext context) throws Exception { this.currentTestClassName = context.getRequiredTestClass().getName(); - integrationTestRule.before(); + integrationTestRule.beforeEach(null); jpaEntityCoverage.before(); } @Override public void afterEach(ExtensionContext context) throws Exception { jpaEntityCoverage.after(); - integrationTestRule.after(); + integrationTestRule.afterEach(null); this.currentTestClassName = null; } } diff --git a/core/src/test/java/google/registry/persistence/transaction/JpaTransactionManagerRule.java b/core/src/test/java/google/registry/persistence/transaction/JpaTransactionManagerRule.java index dfd48e327..3e47499c6 100644 --- a/core/src/test/java/google/registry/persistence/transaction/JpaTransactionManagerRule.java +++ b/core/src/test/java/google/registry/persistence/transaction/JpaTransactionManagerRule.java @@ -216,25 +216,17 @@ abstract class JpaTransactionManagerRule extends ExternalResource } @Override - public void before() throws Exception { + protected void before() throws Exception { beforeEach(null); } @Override - public void after() { + protected void after() { afterEach(null); } - public String getDatabaseUrl() { - return database.getJdbcUrl(); - } - - public String getDatabaseUsername() { - return database.getUsername(); - } - - public String getDatabasePassword() { - return database.getPassword(); + public JdbcDatabaseContainer getDatabase() { + return database; } private void resetTablesAndSequences() { diff --git a/core/src/test/java/google/registry/testing/AppEngineRule.java b/core/src/test/java/google/registry/testing/AppEngineRule.java index 69762f74c..c72d031b4 100644 --- a/core/src/test/java/google/registry/testing/AppEngineRule.java +++ b/core/src/test/java/google/registry/testing/AppEngineRule.java @@ -30,6 +30,7 @@ import com.google.appengine.tools.development.testing.LocalServiceTestHelper; import com.google.appengine.tools.development.testing.LocalTaskQueueTestConfig; import com.google.appengine.tools.development.testing.LocalURLFetchServiceTestConfig; import com.google.appengine.tools.development.testing.LocalUserServiceTestConfig; +import com.google.apphosting.api.ApiProxy; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -521,6 +522,8 @@ public final class AppEngineRule extends ExternalResource } finally { temporaryFolder.delete(); } + // Clean up environment setting left behind by AppEngine test instance. + ApiProxy.setEnvironmentForCurrentThread(null); } /** diff --git a/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.dot b/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.dot new file mode 100644 index 000000000..af3fce7b4 --- /dev/null +++ b/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.dot @@ -0,0 +1,584 @@ +digraph { + rankdir=LR + subgraph cluster_0 { + label = "" + subgraph cluster_1 { + label = "Load Datastore snapshot" + subgraph cluster_2 { + label = "Load Datastore snapshot/Get export file patterns" + 3 [label="Read(CreateSource)"] + } + subgraph cluster_4 { + label = "Load Datastore snapshot/Find export files" + subgraph cluster_5 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll" + subgraph cluster_6 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Match filepatterns" + 7 [label="ParMultiDo(Match)"] + 3 -> 7 [style=solid label=""] + } + subgraph cluster_8 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey" + subgraph cluster_9 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Pair with random key" + 10 [label="ParMultiDo(AssignShard)"] + 7 -> 10 [style=solid label=""] + } + subgraph cluster_11 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle" + subgraph cluster_12 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/Window.Into()" + 13 [label="Window.Assign"] + 10 -> 13 [style=solid label=""] + } + subgraph cluster_14 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/ReifyOriginalTimestamps" + subgraph cluster_15 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)" + 16 [label="ParMultiDo(Anonymous)"] + 13 -> 16 [style=solid label=""] + } + } + 17 [label="GroupByKey"] + 16 -> 17 [style=solid label=""] + subgraph cluster_18 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable" + 19 [label="ParMultiDo(Anonymous)"] + 17 -> 19 [style=solid label=""] + } + subgraph cluster_20 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps" + subgraph cluster_21 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard" + subgraph cluster_22 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)" + 23 [label="ParMultiDo(Anonymous)"] + 19 -> 23 [style=solid label=""] + } + } + subgraph cluster_24 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues" + subgraph cluster_25 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)" + 26 [label="ParMultiDo(Anonymous)"] + 23 -> 26 [style=solid label=""] + } + } + } + } + subgraph cluster_27 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Values" + subgraph cluster_28 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Values/Values" + subgraph cluster_29 { + label = "Load Datastore snapshot/Find export files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Values/Values/Map" + 30 [label="ParMultiDo(Anonymous)"] + 26 -> 30 [style=solid label=""] + } + } + } + } + } + } + subgraph cluster_31 { + label = "Load Datastore snapshot/Load export data" + subgraph cluster_32 { + label = "Load Datastore snapshot/Load export data/FileIO.ReadMatches" + subgraph cluster_33 { + label = "Load Datastore snapshot/Load export data/FileIO.ReadMatches/ParDo(ToReadableFile)" + 34 [label="ParMultiDo(ToReadableFile)"] + 30 -> 34 [style=solid label=""] + } + } + subgraph cluster_35 { + label = "Load Datastore snapshot/Load export data/BackupFileReader" + 36 [label="ParMultiDo(BackupFileReader)"] + 34 -> 36 [style=solid label=""] + } + } + subgraph cluster_37 { + label = "Load Datastore snapshot/Get commitlog file patterns" + 38 [label="Read(CreateSource)"] + } + subgraph cluster_39 { + label = "Load Datastore snapshot/Find commitlog files" + subgraph cluster_40 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll" + subgraph cluster_41 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Match filepatterns" + 42 [label="ParMultiDo(Match)"] + 38 -> 42 [style=solid label=""] + } + subgraph cluster_43 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey" + subgraph cluster_44 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Pair with random key" + 45 [label="ParMultiDo(AssignShard)"] + 42 -> 45 [style=solid label=""] + } + subgraph cluster_46 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle" + subgraph cluster_47 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/Window.Into()" + 48 [label="Window.Assign"] + 45 -> 48 [style=solid label=""] + } + subgraph cluster_49 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/ReifyOriginalTimestamps" + subgraph cluster_50 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)" + 51 [label="ParMultiDo(Anonymous)"] + 48 -> 51 [style=solid label=""] + } + } + 52 [label="GroupByKey"] + 51 -> 52 [style=solid label=""] + subgraph cluster_53 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable" + 54 [label="ParMultiDo(Anonymous)"] + 52 -> 54 [style=solid label=""] + } + subgraph cluster_55 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps" + subgraph cluster_56 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard" + subgraph cluster_57 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)" + 58 [label="ParMultiDo(Anonymous)"] + 54 -> 58 [style=solid label=""] + } + } + subgraph cluster_59 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues" + subgraph cluster_60 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)" + 61 [label="ParMultiDo(Anonymous)"] + 58 -> 61 [style=solid label=""] + } + } + } + } + subgraph cluster_62 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Values" + subgraph cluster_63 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Values/Values" + subgraph cluster_64 { + label = "Load Datastore snapshot/Find commitlog files/FileIO.MatchAll/Reshuffle.ViaRandomKey/Values/Values/Map" + 65 [label="ParMultiDo(Anonymous)"] + 61 -> 65 [style=solid label=""] + } + } + } + } + } + } + subgraph cluster_66 { + label = "Load Datastore snapshot/Filter commitLog by time" + 67 [label="ParMultiDo(FilterCommitLogFileByTime)"] + 65 -> 67 [style=solid label=""] + } + subgraph cluster_68 { + label = "Load Datastore snapshot/Load commitlog data" + subgraph cluster_69 { + label = "Load Datastore snapshot/Load commitlog data/FileIO.ReadMatches" + subgraph cluster_70 { + label = "Load Datastore snapshot/Load commitlog data/FileIO.ReadMatches/ParDo(ToReadableFile)" + 71 [label="ParMultiDo(ToReadableFile)"] + 67 -> 71 [style=solid label=""] + } + } + subgraph cluster_72 { + label = "Load Datastore snapshot/Load commitlog data/BackupFileReader" + 73 [label="ParMultiDo(BackupFileReader)"] + 71 -> 73 [style=solid label=""] + } + } + 74 [label="Flatten.PCollections"] + 36 -> 74 [style=solid label=""] + 73 -> 74 [style=solid label=""] + subgraph cluster_75 { + label = "Load Datastore snapshot/Key entities by Datastore Keys" + subgraph cluster_76 { + label = "Load Datastore snapshot/Key entities by Datastore Keys/Map" + 77 [label="ParMultiDo(Anonymous)"] + 74 -> 77 [style=solid label=""] + } + } + 78 [label="GroupByKey"] + 77 -> 78 [style=solid label=""] + 79 [label="ParMultiDo(Anonymous)"] + 78 -> 79 [style=solid label=""] + } + subgraph cluster_80 { + label = "Write to sql: Transforms:Registrar" + subgraph cluster_81 { + label = "Write to sql: Transforms:Registrar/Shard data for Transforms:Registrar" + subgraph cluster_82 { + label = "Write to sql: Transforms:Registrar/Shard data for Transforms:Registrar/Map" + 83 [label="ParMultiDo(Anonymous)"] + 79 -> 83 [style=solid label=""] + } + } + subgraph cluster_84 { + label = "Write to sql: Transforms:Registrar/Batch output by shard Transforms:Registrar" + subgraph cluster_85 { + label = "Write to sql: Transforms:Registrar/Batch output by shard Transforms:Registrar/ParDo(GroupIntoBatches)" + 86 [label="ParMultiDo(GroupIntoBatches)"] + 83 -> 86 [style=solid label=""] + } + } + subgraph cluster_87 { + label = "Write to sql: Transforms:Registrar/Write in batch for Transforms:Registrar" + 88 [label="ParMultiDo(SqlBatchWriter)"] + 86 -> 88 [style=solid label=""] + } + } + subgraph cluster_89 { + label = "Wait on Transforms:Registrar" + subgraph cluster_90 { + label = "Wait on Transforms:Registrar/To wait view 0" + subgraph cluster_91 { + label = "Wait on Transforms:Registrar/To wait view 0/Window.Into()" + 92 [label="Flatten.PCollections"] + 88 -> 92 [style=solid label=""] + } + subgraph cluster_93 { + label = "Wait on Transforms:Registrar/To wait view 0/ParDo(CollectWindows)" + 94 [label="ParMultiDo(CollectWindows)"] + 92 -> 94 [style=solid label=""] + } + subgraph cluster_95 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any" + subgraph cluster_96 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_97 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_98 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_99 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 100 [label="ParMultiDo(Anonymous)"] + 94 -> 100 [style=solid label=""] + } + } + } + subgraph cluster_101 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 102 [label="GroupByKey"] + 100 -> 102 [style=solid label=""] + subgraph cluster_103 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_104 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 105 [label="ParMultiDo(Anonymous)"] + 102 -> 105 [style=solid label=""] + } + } + } + subgraph cluster_106 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_107 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_108 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 109 [label="ParMultiDo(Anonymous)"] + 105 -> 109 [style=solid label=""] + } + } + } + } + subgraph cluster_110 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_111 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_112 { + label = "Wait on Transforms:Registrar/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 113 [label="ParMultiDo(Anonymous)"] + 109 -> 113 [style=solid label=""] + } + } + } + } + subgraph cluster_114 { + label = "Wait on Transforms:Registrar/To wait view 0/View.AsList" + subgraph cluster_115 { + label = "Wait on Transforms:Registrar/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_116 { + label = "Wait on Transforms:Registrar/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 117 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 113 -> 117 [style=solid label=""] + } + } + 118 [label="View.CreatePCollectionView"] + 117 -> 118 [style=solid label=""] + } + } + subgraph cluster_119 { + label = "Wait on Transforms:Registrar/Wait" + subgraph cluster_120 { + label = "Wait on Transforms:Registrar/Wait/Map" + 121 [label="ParMultiDo(Anonymous)"] + 79 -> 121 [style=solid label=""] + 117 -> 121 [style=dashed label=""] + } + } + } + subgraph cluster_122 { + label = "Write to sql: Transforms:ContactResource" + subgraph cluster_123 { + label = "Write to sql: Transforms:ContactResource/Shard data for Transforms:ContactResource" + subgraph cluster_124 { + label = "Write to sql: Transforms:ContactResource/Shard data for Transforms:ContactResource/Map" + 125 [label="ParMultiDo(Anonymous)"] + 121 -> 125 [style=solid label=""] + } + } + subgraph cluster_126 { + label = "Write to sql: Transforms:ContactResource/Batch output by shard Transforms:ContactResource" + subgraph cluster_127 { + label = "Write to sql: Transforms:ContactResource/Batch output by shard Transforms:ContactResource/ParDo(GroupIntoBatches)" + 128 [label="ParMultiDo(GroupIntoBatches)"] + 125 -> 128 [style=solid label=""] + } + } + subgraph cluster_129 { + label = "Write to sql: Transforms:ContactResource/Write in batch for Transforms:ContactResource" + 130 [label="ParMultiDo(SqlBatchWriter)"] + 128 -> 130 [style=solid label=""] + } + } + subgraph cluster_131 { + label = "Remove circular foreign keys from DomainBase" + 132 [label="ParMultiDo(RemoveDomainBaseForeignKeys)"] + 79 -> 132 [style=solid label=""] + } + subgraph cluster_133 { + label = "Wait on phase one" + subgraph cluster_134 { + label = "Wait on phase one/To wait view 0" + subgraph cluster_135 { + label = "Wait on phase one/To wait view 0/Window.Into()" + 136 [label="Flatten.PCollections"] + 130 -> 136 [style=solid label=""] + } + subgraph cluster_137 { + label = "Wait on phase one/To wait view 0/ParDo(CollectWindows)" + 138 [label="ParMultiDo(CollectWindows)"] + 136 -> 138 [style=solid label=""] + } + subgraph cluster_139 { + label = "Wait on phase one/To wait view 0/Sample.Any" + subgraph cluster_140 { + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_141 { + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_142 { + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_143 { + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 144 [label="ParMultiDo(Anonymous)"] + 138 -> 144 [style=solid label=""] + } + } + } + subgraph cluster_145 { + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 146 [label="GroupByKey"] + 144 -> 146 [style=solid label=""] + subgraph cluster_147 { + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_148 { + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 149 [label="ParMultiDo(Anonymous)"] + 146 -> 149 [style=solid label=""] + } + } + } + subgraph cluster_150 { + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_151 { + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_152 { + label = "Wait on phase one/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 153 [label="ParMultiDo(Anonymous)"] + 149 -> 153 [style=solid label=""] + } + } + } + } + subgraph cluster_154 { + label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_155 { + label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_156 { + label = "Wait on phase one/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 157 [label="ParMultiDo(Anonymous)"] + 153 -> 157 [style=solid label=""] + } + } + } + } + subgraph cluster_158 { + label = "Wait on phase one/To wait view 0/View.AsList" + subgraph cluster_159 { + label = "Wait on phase one/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_160 { + label = "Wait on phase one/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 161 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 157 -> 161 [style=solid label=""] + } + } + 162 [label="View.CreatePCollectionView"] + 161 -> 162 [style=solid label=""] + } + } + subgraph cluster_163 { + label = "Wait on phase one/Wait" + subgraph cluster_164 { + label = "Wait on phase one/Wait/Map" + 165 [label="ParMultiDo(Anonymous)"] + 132 -> 165 [style=solid label=""] + 161 -> 165 [style=dashed label=""] + } + } + } + subgraph cluster_166 { + label = "Write to sql: DomainBase without circular foreign keys" + subgraph cluster_167 { + label = "Write to sql: DomainBase without circular foreign keys/Shard data for DomainBase without circular foreign keys" + subgraph cluster_168 { + label = "Write to sql: DomainBase without circular foreign keys/Shard data for DomainBase without circular foreign keys/Map" + 169 [label="ParMultiDo(Anonymous)"] + 165 -> 169 [style=solid label=""] + } + } + subgraph cluster_170 { + label = "Write to sql: DomainBase without circular foreign keys/Batch output by shard DomainBase without circular foreign keys" + subgraph cluster_171 { + label = "Write to sql: DomainBase without circular foreign keys/Batch output by shard DomainBase without circular foreign keys/ParDo(GroupIntoBatches)" + 172 [label="ParMultiDo(GroupIntoBatches)"] + 169 -> 172 [style=solid label=""] + } + } + subgraph cluster_173 { + label = "Write to sql: DomainBase without circular foreign keys/Write in batch for DomainBase without circular foreign keys" + 174 [label="ParMultiDo(SqlBatchWriter)"] + 172 -> 174 [style=solid label=""] + } + } + subgraph cluster_175 { + label = "Wait on DomainBaseNoFkeys" + subgraph cluster_176 { + label = "Wait on DomainBaseNoFkeys/To wait view 0" + subgraph cluster_177 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Window.Into()" + 178 [label="Flatten.PCollections"] + 174 -> 178 [style=solid label=""] + } + subgraph cluster_179 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/ParDo(CollectWindows)" + 180 [label="ParMultiDo(CollectWindows)"] + 178 -> 180 [style=solid label=""] + } + subgraph cluster_181 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any" + subgraph cluster_182 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)" + subgraph cluster_183 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys" + subgraph cluster_184 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys" + subgraph cluster_185 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/WithKeys/AddKeys/Map" + 186 [label="ParMultiDo(Anonymous)"] + 180 -> 186 [style=solid label=""] + } + } + } + subgraph cluster_187 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)" + 188 [label="GroupByKey"] + 186 -> 188 [style=solid label=""] + subgraph cluster_189 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues" + subgraph cluster_190 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Combine.perKey(SampleAny)/Combine.GroupedValues/ParDo(Anonymous)" + 191 [label="ParMultiDo(Anonymous)"] + 188 -> 191 [style=solid label=""] + } + } + } + subgraph cluster_192 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values" + subgraph cluster_193 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values" + subgraph cluster_194 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Combine.globally(SampleAny)/Values/Values/Map" + 195 [label="ParMultiDo(Anonymous)"] + 191 -> 195 [style=solid label=""] + } + } + } + } + subgraph cluster_196 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables" + subgraph cluster_197 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables" + subgraph cluster_198 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/Sample.Any/Flatten.Iterables/FlattenIterables/FlatMap" + 199 [label="ParMultiDo(Anonymous)"] + 195 -> 199 [style=solid label=""] + } + } + } + } + subgraph cluster_200 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList" + subgraph cluster_201 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization" + subgraph cluster_202 { + label = "Wait on DomainBaseNoFkeys/To wait view 0/View.AsList/View.VoidKeyToMultimapMaterialization/ParDo(VoidKeyToMultimapMaterialization)" + 203 [label="ParMultiDo(VoidKeyToMultimapMaterialization)"] + 199 -> 203 [style=solid label=""] + } + } + 204 [label="View.CreatePCollectionView"] + 203 -> 204 [style=solid label=""] + } + } + subgraph cluster_205 { + label = "Wait on DomainBaseNoFkeys/Wait" + subgraph cluster_206 { + label = "Wait on DomainBaseNoFkeys/Wait/Map" + 207 [label="ParMultiDo(Anonymous)"] + 79 -> 207 [style=solid label=""] + 203 -> 207 [style=dashed label=""] + } + } + } + subgraph cluster_208 { + label = "Write to sql: Transforms:HostResource" + subgraph cluster_209 { + label = "Write to sql: Transforms:HostResource/Shard data for Transforms:HostResource" + subgraph cluster_210 { + label = "Write to sql: Transforms:HostResource/Shard data for Transforms:HostResource/Map" + 211 [label="ParMultiDo(Anonymous)"] + 207 -> 211 [style=solid label=""] + } + } + subgraph cluster_212 { + label = "Write to sql: Transforms:HostResource/Batch output by shard Transforms:HostResource" + subgraph cluster_213 { + label = "Write to sql: Transforms:HostResource/Batch output by shard Transforms:HostResource/ParDo(GroupIntoBatches)" + 214 [label="ParMultiDo(GroupIntoBatches)"] + 211 -> 214 [style=solid label=""] + } + } + subgraph cluster_215 { + label = "Write to sql: Transforms:HostResource/Write in batch for Transforms:HostResource" + 216 [label="ParMultiDo(SqlBatchWriter)"] + 214 -> 216 [style=solid label=""] + } + } + } +} diff --git a/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.png b/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.png new file mode 100644 index 000000000..40ebad90d Binary files /dev/null and b/core/src/test/resources/google/registry/beam/initsql/pipeline_golden.png differ diff --git a/dependencies.gradle b/dependencies.gradle index 63aed960d..68a96aa8b 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -112,6 +112,7 @@ ext { 'org.junit.platform:junit-platform-suite-api:1.6.2', 'org.junit.vintage:junit-vintage-engine:5.6.2', 'org.apache.avro:avro:1.8.2', + 'org.apache.beam:beam-runners-core-construction-java:2.16.0', 'org.apache.beam:beam-runners-direct-java:2.16.0', 'org.apache.beam:beam-runners-google-cloud-dataflow-java:2.16.0', 'org.apache.beam:beam-sdks-java-core:2.16.0', diff --git a/gradle.properties b/gradle.properties index 6400bcb31..7873f1e50 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,10 +1,10 @@ # This file defines properties used by the gradle build. It must be kept in # sync with config/nom_build.py. # -# To regenerate, run config/nom_build.py --generate-gradle-properties +# To regenerate, run ./nom_build --generate-gradle-properties # # To view property descriptions (which are command line flags for -# nom_build), run config/nom_build.py --help. +# nom_build), run ./nom_build --help. # # DO NOT EDIT THIS FILE BY HAND org.gradle.jvmargs=-Xmx1024m @@ -26,3 +26,4 @@ dbPassword= publish_repo= schema_version= nomulus_version= +dot_path=/usr/bin/dot