diff --git a/core/src/test/java/google/registry/beam/TestPipelineExtension.java b/core/src/test/java/google/registry/beam/TestPipelineExtension.java new file mode 100644 index 000000000..bef8158e6 --- /dev/null +++ b/core/src/test/java/google/registry/beam/TestPipelineExtension.java @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package google.registry.beam; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.options.ApplicationNameOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.testing.CrashingRunner; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicate; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +// NOTE: This file is copied from the Apache Beam distribution so that it can be locally modified to +// support JUnit 5. + +/** + * A creator of test pipelines that can be used inside of tests that can be configured to run + * locally or against a remote pipeline runner. + * + *

It is recommended to tag hand-selected tests for this purpose using the {@link + * ValidatesRunner} {@link Category} annotation, as each test run against a pipeline runner will + * utilize resources of that pipeline runner. + * + *

In order to run tests on a pipeline runner, the following conditions must be met: + * + *

+ * + *

Use {@link PAssert} for tests, as it integrates with this test harness in both direct and + * remote execution modes. For example: + * + *


+ * {@literal @Rule}
+ *  public final transient TestPipeline p = TestPipeline.create();
+ *
+ * {@literal @Test}
+ * {@literal @Category}(NeedsRunner.class)
+ *  public void myPipelineTest() throws Exception {
+ *    final PCollection<String> pCollection = pipeline.apply(...)
+ *    PAssert.that(pCollection).containsInAnyOrder(...);
+ *    pipeline.run();
+ *  }
+ * 
+ * + *

For pipeline runners, it is required that they must throw an {@link AssertionError} containing + * the message from the {@link PAssert} that failed. + * + *

See also the Testing documentation + * section. + */ +public class TestPipelineExtension extends Pipeline implements TestRule { + + private final PipelineOptions options; + + private static class PipelineRunEnforcement { + + @SuppressWarnings("WeakerAccess") + protected boolean enableAutoRunIfMissing; + + protected final Pipeline pipeline; + + protected boolean runAttempted; + + private PipelineRunEnforcement(final Pipeline pipeline) { + this.pipeline = pipeline; + } + + protected void enableAutoRunIfMissing(final boolean enable) { + enableAutoRunIfMissing = enable; + } + + protected void beforePipelineExecution() { + runAttempted = true; + } + + protected void afterPipelineExecution() {} + + protected void afterUserCodeFinished() { + if (!runAttempted && enableAutoRunIfMissing) { + pipeline.run().waitUntilFinish(); + } + } + } + + private static class PipelineAbandonedNodeEnforcement extends PipelineRunEnforcement { + + // Null until the pipeline has been run + @Nullable private List runVisitedNodes; + + private final Predicate isPAssertNode = + node -> + node.getTransform() instanceof PAssert.GroupThenAssert + || node.getTransform() instanceof PAssert.GroupThenAssertForSingleton + || node.getTransform() instanceof PAssert.OneSideInputAssert; + + private static class NodeRecorder extends PipelineVisitor.Defaults { + + private final List visited = new ArrayList<>(); + + @Override + public void leaveCompositeTransform(final TransformHierarchy.Node node) { + visited.add(node); + } + + @Override + public void visitPrimitiveTransform(final TransformHierarchy.Node node) { + visited.add(node); + } + } + + private PipelineAbandonedNodeEnforcement(final TestPipelineExtension pipeline) { + super(pipeline); + runVisitedNodes = null; + } + + private List recordPipelineNodes(final Pipeline pipeline) { + final NodeRecorder nodeRecorder = new NodeRecorder(); + pipeline.traverseTopologically(nodeRecorder); + return nodeRecorder.visited; + } + + private boolean isEmptyPipeline(final Pipeline pipeline) { + final IsEmptyVisitor isEmptyVisitor = new IsEmptyVisitor(); + pipeline.traverseTopologically(isEmptyVisitor); + return isEmptyVisitor.isEmpty(); + } + + private void verifyPipelineExecution() { + if (!isEmptyPipeline(pipeline)) { + if (!runAttempted && !enableAutoRunIfMissing) { + throw new PipelineRunMissingException("The pipeline has not been run."); + + } else { + final List pipelineNodes = recordPipelineNodes(pipeline); + if (pipelineRunSucceeded() && !visitedAll(pipelineNodes)) { + final boolean hasDanglingPAssert = + FluentIterable.from(pipelineNodes) + .filter(Predicates.not(Predicates.in(runVisitedNodes))) + .anyMatch(isPAssertNode); + if (hasDanglingPAssert) { + throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s)."); + } else { + throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s)."); + } + } + } + } + } + + private boolean visitedAll(final List pipelineNodes) { + return runVisitedNodes.equals(pipelineNodes); + } + + private boolean pipelineRunSucceeded() { + return runVisitedNodes != null; + } + + @Override + protected void afterPipelineExecution() { + runVisitedNodes = recordPipelineNodes(pipeline); + super.afterPipelineExecution(); + } + + @Override + protected void afterUserCodeFinished() { + super.afterUserCodeFinished(); + verifyPipelineExecution(); + } + } + + /** + * An exception thrown in case an abandoned {@link org.apache.beam.sdk.transforms.PTransform} is + * detected, that is, a {@link org.apache.beam.sdk.transforms.PTransform} that has not been run. + */ + public static class AbandonedNodeException extends RuntimeException { + + AbandonedNodeException(final String msg) { + super(msg); + } + } + + /** An exception thrown in case a test finishes without invoking {@link Pipeline#run()}. */ + public static class PipelineRunMissingException extends RuntimeException { + + PipelineRunMissingException(final String msg) { + super(msg); + } + } + + /** System property used to set {@link TestPipelineOptions}. */ + public static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions"; + + static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner"; + + private static final ObjectMapper MAPPER = + new ObjectMapper() + .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader())); + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + private Optional enforcement = Optional.absent(); + + /** + * Creates and returns a new test pipeline. + * + *

Use {@link PAssert} to add tests, then call {@link Pipeline#run} to execute the pipeline and + * check the tests. + */ + public static TestPipelineExtension create() { + return fromOptions(testingPipelineOptions()); + } + + public static TestPipelineExtension fromOptions(PipelineOptions options) { + return new TestPipelineExtension(options); + } + + private TestPipelineExtension(final PipelineOptions options) { + super(options); + this.options = options; + } + + @Override + public PipelineOptions getOptions() { + return this.options; + } + + @Override + public Statement apply(final Statement statement, final Description description) { + return new Statement() { + + private void setDeducedEnforcementLevel() { + // if the enforcement level has not been set by the user do auto-inference + if (!enforcement.isPresent()) { + + final boolean annotatedWithNeedsRunner = + FluentIterable.from(description.getAnnotations()) + .filter(Annotations.Predicates.isAnnotationOfType(Category.class)) + .anyMatch(Annotations.Predicates.isCategoryOf(NeedsRunner.class, true)); + + final boolean crashingRunner = CrashingRunner.class.isAssignableFrom(options.getRunner()); + + checkState( + !(annotatedWithNeedsRunner && crashingRunner), + "The test was annotated with a [@%s] / [@%s] while the runner " + + "was set to [%s]. Please re-check your configuration.", + NeedsRunner.class.getSimpleName(), + ValidatesRunner.class.getSimpleName(), + CrashingRunner.class.getSimpleName()); + + enableAbandonedNodeEnforcement(annotatedWithNeedsRunner || !crashingRunner); + } + } + + @Override + public void evaluate() throws Throwable { + options.as(ApplicationNameOptions.class).setAppName(getAppName(description)); + + setDeducedEnforcementLevel(); + + // statement.evaluate() essentially runs the user code contained in the unit test at hand. + // Exceptions thrown during the execution of the user's test code will propagate here, + // unless the user explicitly handles them with a "catch" clause in his code. If the + // exception is handled by a user's "catch" clause, is does not interrupt the flow and + // we move on to invoking the configured enforcements. + // If the user does not handle a thrown exception, it will propagate here and interrupt + // the flow, preventing the enforcement(s) from being activated. + // The motivation for this is avoiding enforcements over faulty pipelines. + statement.evaluate(); + enforcement.get().afterUserCodeFinished(); + } + }; + } + + /** + * Runs this {@link TestPipelineExtension}, unwrapping any {@code AssertionError} that is raised during + * testing. + */ + @Override + public PipelineResult run() { + return run(getOptions()); + } + + /** Like {@link #run} but with the given potentially modified options. */ + @Override + public PipelineResult run(PipelineOptions options) { + checkState( + enforcement.isPresent(), + "Is your TestPipeline declaration missing a @Rule annotation? Usage: " + + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();"); + + final PipelineResult pipelineResult; + try { + enforcement.get().beforePipelineExecution(); + PipelineOptions updatedOptions = + MAPPER.convertValue(MAPPER.valueToTree(options), PipelineOptions.class); + updatedOptions + .as(TestValueProviderOptions.class) + .setProviderRuntimeValues(StaticValueProvider.of(providerRuntimeValues)); + pipelineResult = super.run(updatedOptions); + verifyPAssertsSucceeded(this, pipelineResult); + } catch (RuntimeException exc) { + Throwable cause = exc.getCause(); + if (cause instanceof AssertionError) { + throw (AssertionError) cause; + } else { + throw exc; + } + } + + // If we reach this point, the pipeline has been run and no exceptions have been thrown during + // its execution. + enforcement.get().afterPipelineExecution(); + return pipelineResult; + } + + /** Implementation detail of {@link #newProvider}, do not use. */ + @Internal + public interface TestValueProviderOptions extends PipelineOptions { + ValueProvider> getProviderRuntimeValues(); + + void setProviderRuntimeValues(ValueProvider> runtimeValues); + } + + /** + * Returns a new {@link ValueProvider} that is inaccessible before {@link #run}, but will be + * accessible while the pipeline runs. + */ + public ValueProvider newProvider(T runtimeValue) { + String uuid = UUID.randomUUID().toString(); + providerRuntimeValues.put(uuid, runtimeValue); + return ValueProvider.NestedValueProvider.of( + options.as(TestValueProviderOptions.class).getProviderRuntimeValues(), + new GetFromRuntimeValues(uuid)); + } + + private final Map providerRuntimeValues = Maps.newHashMap(); + + private static class GetFromRuntimeValues + implements SerializableFunction, T> { + private final String key; + + private GetFromRuntimeValues(String key) { + this.key = key; + } + + @Override + public T apply(Map input) { + return (T) input.get(key); + } + } + + /** + * Enables the abandoned node detection. Abandoned nodes are PTransforms, + * PAsserts included, that were not executed by the pipeline runner. Abandoned nodes are + * most likely to occur due to the one of the following scenarios: + * + *

+ * + * Abandoned node detection is automatically enabled when a real pipeline runner (i.e. not a + * {@link CrashingRunner}) and/or a {@link NeedsRunner} or a {@link ValidatesRunner} annotation + * are detected. + */ + public TestPipelineExtension enableAbandonedNodeEnforcement(final boolean enable) { + enforcement = + enable + ? Optional.of(new PipelineAbandonedNodeEnforcement(this)) + : Optional.of(new PipelineRunEnforcement(this)); + + return this; + } + + /** + * If enabled, a pipeline.run() statement will be added automatically in case it is + * missing in the test. + */ + public TestPipelineExtension enableAutoRunIfMissing(final boolean enable) { + enforcement.get().enableAutoRunIfMissing(enable); + return this; + } + + @Override + public String toString() { + return "TestPipeline#" + options.as(ApplicationNameOptions.class).getAppName(); + } + + /** Creates {@link PipelineOptions} for testing. */ + public static PipelineOptions testingPipelineOptions() { + try { + @Nullable + String beamTestPipelineOptions = System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS); + + PipelineOptions options = + Strings.isNullOrEmpty(beamTestPipelineOptions) + ? PipelineOptionsFactory.create() + : PipelineOptionsFactory.fromArgs( + MAPPER.readValue(beamTestPipelineOptions, String[].class)) + .as(TestPipelineOptions.class); + + // If no options were specified, set some reasonable defaults + if (Strings.isNullOrEmpty(beamTestPipelineOptions)) { + // If there are no provided options, check to see if a dummy runner should be used. + String useDefaultDummy = System.getProperty(PROPERTY_USE_DEFAULT_DUMMY_RUNNER); + if (!Strings.isNullOrEmpty(useDefaultDummy) && Boolean.valueOf(useDefaultDummy)) { + options.setRunner(CrashingRunner.class); + } + } + options.setStableUniqueNames(CheckEnabled.ERROR); + + FileSystems.setDefaultPipelineOptions(options); + return options; + } catch (IOException e) { + throw new RuntimeException( + "Unable to instantiate test options from system property " + + PROPERTY_BEAM_TEST_PIPELINE_OPTIONS + + ":" + + System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS), + e); + } + } + + /** Returns the class + method name of the test. */ + private String getAppName(Description description) { + String methodName = description.getMethodName(); + Class testClass = description.getTestClass(); + if (testClass.isMemberClass()) { + return String.format( + "%s$%s-%s", + testClass.getEnclosingClass().getSimpleName(), testClass.getSimpleName(), methodName); + } else { + return String.format("%s-%s", testClass.getSimpleName(), methodName); + } + } + + /** + * Verifies all {{@link PAssert PAsserts}} in the pipeline have been executed and were successful. + * + *

Note this only runs for runners which support Metrics. Runners which do not should verify + * this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001 + */ + public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult) { + if (MetricsEnvironment.isMetricsSupported()) { + long expectedNumberOfAssertions = (long) PAssert.countAsserts(pipeline); + + long successfulAssertions = 0; + Iterable> successCounterResults = + pipelineResult + .metrics() + .queryMetrics( + MetricsFilter.builder() + .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER)) + .build()) + .getCounters(); + for (MetricResult counter : successCounterResults) { + if (counter.getAttempted() > 0) { + successfulAssertions++; + } + } + + assertThat( + String.format( + "Expected %d successful assertions, but found %d.", + expectedNumberOfAssertions, successfulAssertions), + successfulAssertions, + is(expectedNumberOfAssertions)); + } + } + + private static class IsEmptyVisitor extends PipelineVisitor.Defaults { + private boolean empty = true; + + public boolean isEmpty() { + return empty; + } + + @Override + public void visitPrimitiveTransform(TransformHierarchy.Node node) { + empty = false; + } + } + + /** + * A utility class for querying annotations. + * + *

NOTE: This was copied from the Apache Beam project from a separate file only for visibility + * reasons (it's package-private there). + */ + static class Annotations { + + /** Annotation predicates. */ + static class Predicates { + + static Predicate isAnnotationOfType(final Class clazz) { + return annotation -> + annotation.annotationType() != null && annotation.annotationType().equals(clazz); + } + + static Predicate isCategoryOf(final Class value, final boolean allowDerived) { + return category -> + FluentIterable.from(Arrays.asList(((Category) category).value())) + .anyMatch( + aClass -> allowDerived ? value.isAssignableFrom(aClass) : value.equals(aClass)); + } + } + } +} diff --git a/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java b/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java index 941a1df99..e1b75d6b8 100644 --- a/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java +++ b/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java @@ -21,6 +21,7 @@ import static google.registry.testing.DatastoreHelper.newRegistry; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import google.registry.backup.VersionedEntity; +import google.registry.beam.TestPipelineExtension; import google.registry.model.contact.ContactResource; import google.registry.model.domain.DomainBase; import google.registry.model.ofy.Ofy; @@ -34,7 +35,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -51,8 +51,6 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** Unit tests for {@link Transforms} related to loading CommitLogs. */ -// TODO(weiminyu): Upgrade to JUnit5 when TestPipeline is upgraded. It is also easy to adapt with -// a wrapper. @RunWith(JUnit4.class) public class CommitLogTransformsTest implements Serializable { @@ -63,8 +61,8 @@ public class CommitLogTransformsTest implements Serializable { @Rule public final transient InjectRule injectRule = new InjectRule(); @Rule - public final transient TestPipeline pipeline = - TestPipeline.create().enableAbandonedNodeEnforcement(true); + public final transient TestPipelineExtension pipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); private FakeClock fakeClock; private transient BackupTestStore store; diff --git a/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java b/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java index b83a6cdb5..50404686d 100644 --- a/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java +++ b/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.googlecode.objectify.Key; import google.registry.backup.VersionedEntity; +import google.registry.beam.TestPipelineExtension; import google.registry.model.contact.ContactResource; import google.registry.model.domain.DomainBase; import google.registry.model.ofy.Ofy; @@ -35,7 +36,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -56,8 +56,6 @@ import org.junit.runners.JUnit4; * *

This class implements {@link Serializable} so that test {@link DoFn} classes may be inlined. */ -// TODO(weiminyu): Upgrade to JUnit5 when TestPipeline is upgraded. It is also easy to adapt with -// a wrapper. @RunWith(JUnit4.class) public class ExportloadingTransformsTest implements Serializable { private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); @@ -72,8 +70,8 @@ public class ExportloadingTransformsTest implements Serializable { @Rule public final transient InjectRule injectRule = new InjectRule(); @Rule - public final transient TestPipeline pipeline = - TestPipeline.create().enableAbandonedNodeEnforcement(true); + public final transient TestPipelineExtension pipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); private FakeClock fakeClock; private transient BackupTestStore store; diff --git a/core/src/test/java/google/registry/beam/initsql/LoadDatastoreSnapshotTest.java b/core/src/test/java/google/registry/beam/initsql/LoadDatastoreSnapshotTest.java index 8399d8dcd..477160adf 100644 --- a/core/src/test/java/google/registry/beam/initsql/LoadDatastoreSnapshotTest.java +++ b/core/src/test/java/google/registry/beam/initsql/LoadDatastoreSnapshotTest.java @@ -22,6 +22,7 @@ import com.google.appengine.api.datastore.Entity; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.googlecode.objectify.Key; +import google.registry.beam.TestPipelineExtension; import google.registry.model.contact.ContactResource; import google.registry.model.domain.DomainAuthInfo; import google.registry.model.domain.DomainBase; @@ -32,7 +33,6 @@ import google.registry.testing.FakeClock; import google.registry.testing.InjectRule; import java.io.File; import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionTuple; import org.joda.time.DateTime; @@ -85,8 +85,8 @@ public class LoadDatastoreSnapshotTest { @Rule public final transient InjectRule injectRule = new InjectRule(); @Rule - public final transient TestPipeline pipeline = - TestPipeline.create().enableAbandonedNodeEnforcement(true); + public final transient TestPipelineExtension pipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); private FakeClock fakeClock; private File exportRootDir; diff --git a/core/src/test/java/google/registry/beam/initsql/WriteToSqlTest.java b/core/src/test/java/google/registry/beam/initsql/WriteToSqlTest.java index 95ce904a8..a1821dd7e 100644 --- a/core/src/test/java/google/registry/beam/initsql/WriteToSqlTest.java +++ b/core/src/test/java/google/registry/beam/initsql/WriteToSqlTest.java @@ -20,6 +20,7 @@ import static google.registry.persistence.transaction.TransactionManagerFactory. import com.google.appengine.api.datastore.Entity; import com.google.common.collect.ImmutableList; import google.registry.backup.VersionedEntity; +import google.registry.beam.TestPipelineExtension; import google.registry.model.contact.ContactResource; import google.registry.model.ofy.Ofy; import google.registry.model.registrar.Registrar; @@ -35,7 +36,6 @@ import java.io.PrintStream; import java.io.Serializable; import java.util.stream.Collectors; import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.joda.time.DateTime; import org.junit.Before; @@ -65,8 +65,8 @@ public class WriteToSqlTest implements Serializable { @Rule public transient TemporaryFolder temporaryFolder = new TemporaryFolder(); @Rule - public final transient TestPipeline pipeline = - TestPipeline.create().enableAbandonedNodeEnforcement(true); + public final transient TestPipelineExtension pipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); private ImmutableList contacts; diff --git a/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java b/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java index d85907df7..19ee22ff5 100644 --- a/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java +++ b/core/src/test/java/google/registry/beam/invoicing/InvoicingPipelineTest.java @@ -19,6 +19,7 @@ import static com.google.common.truth.Truth.assertThat; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import google.registry.beam.TestPipelineExtension; import google.registry.util.GoogleCredentialsBundle; import google.registry.util.ResourceUtils; import java.io.File; @@ -30,7 +31,6 @@ import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.junit.Before; @@ -53,7 +53,10 @@ public class InvoicingPipelineTest { pipelineOptions.setRunner(DirectRunner.class); } - @Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions); + @Rule + public final transient TestPipelineExtension p = + TestPipelineExtension.fromOptions(pipelineOptions); + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); private InvoicingPipeline invoicingPipeline; diff --git a/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java b/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java index bf9c41ab0..6479a8bcf 100644 --- a/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java +++ b/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.withSettings; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.collect.ImmutableList; import com.google.common.io.CharStreams; +import google.registry.beam.TestPipelineExtension; import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn; import google.registry.testing.FakeClock; import google.registry.testing.FakeSleeper; @@ -43,7 +44,6 @@ import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.apache.http.ProtocolVersion; @@ -78,7 +78,10 @@ public class Spec11PipelineTest { pipelineOptions.setRunner(DirectRunner.class); } - @Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions); + @Rule + public final transient TestPipelineExtension p = + TestPipelineExtension.fromOptions(pipelineOptions); + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); private final Retrier retrier =