mirror of
https://github.com/google/nomulus.git
synced 2025-06-12 15:34:47 +02:00
Copy TestPipeline rule from Apache Beam project into our codebase
This is copied in here with the absolute minimum # of modifications required (just a rename to JUnit 5 format and some small fixes required to enable compilation to be successful). This is in preparation for the next commit where I'll convert this Rule into a JUnit 5 extension, which is the entire goal here. But I wanted to get the code from Apache Beam in with the maximum possible fidelity so that my changes will be in a separate commit and will thus be obvious. Note that we do unfortunately need to modify/rewrite the Rule itself; merely wrapping it in some manner isn't possible.
This commit is contained in:
parent
3c632b3fc0
commit
731a85b285
7 changed files with 591 additions and 20 deletions
|
@ -0,0 +1,569 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package google.registry.beam;
|
||||||
|
|
||||||
|
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.annotation.Annotation;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.UUID;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import org.apache.beam.sdk.Pipeline;
|
||||||
|
import org.apache.beam.sdk.PipelineResult;
|
||||||
|
import org.apache.beam.sdk.annotations.Internal;
|
||||||
|
import org.apache.beam.sdk.io.FileSystems;
|
||||||
|
import org.apache.beam.sdk.metrics.MetricNameFilter;
|
||||||
|
import org.apache.beam.sdk.metrics.MetricResult;
|
||||||
|
import org.apache.beam.sdk.metrics.MetricsEnvironment;
|
||||||
|
import org.apache.beam.sdk.metrics.MetricsFilter;
|
||||||
|
import org.apache.beam.sdk.options.ApplicationNameOptions;
|
||||||
|
import org.apache.beam.sdk.options.PipelineOptions;
|
||||||
|
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
|
||||||
|
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||||
|
import org.apache.beam.sdk.options.ValueProvider;
|
||||||
|
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
||||||
|
import org.apache.beam.sdk.runners.TransformHierarchy;
|
||||||
|
import org.apache.beam.sdk.testing.CrashingRunner;
|
||||||
|
import org.apache.beam.sdk.testing.NeedsRunner;
|
||||||
|
import org.apache.beam.sdk.testing.PAssert;
|
||||||
|
import org.apache.beam.sdk.testing.TestPipelineOptions;
|
||||||
|
import org.apache.beam.sdk.testing.ValidatesRunner;
|
||||||
|
import org.apache.beam.sdk.transforms.SerializableFunction;
|
||||||
|
import org.apache.beam.sdk.util.common.ReflectHelpers;
|
||||||
|
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
|
||||||
|
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicate;
|
||||||
|
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
|
||||||
|
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
|
||||||
|
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
|
||||||
|
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestRule;
|
||||||
|
import org.junit.runner.Description;
|
||||||
|
import org.junit.runners.model.Statement;
|
||||||
|
|
||||||
|
// NOTE: This file is copied from the Apache Beam distribution so that it can be locally modified to
|
||||||
|
// support JUnit 5.
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A creator of test pipelines that can be used inside of tests that can be configured to run
|
||||||
|
* locally or against a remote pipeline runner.
|
||||||
|
*
|
||||||
|
* <p>It is recommended to tag hand-selected tests for this purpose using the {@link
|
||||||
|
* ValidatesRunner} {@link Category} annotation, as each test run against a pipeline runner will
|
||||||
|
* utilize resources of that pipeline runner.
|
||||||
|
*
|
||||||
|
* <p>In order to run tests on a pipeline runner, the following conditions must be met:
|
||||||
|
*
|
||||||
|
* <ul>
|
||||||
|
* <li>System property "beamTestPipelineOptions" must contain a JSON delimited list of pipeline
|
||||||
|
* options. For example:
|
||||||
|
* <pre>{@code [
|
||||||
|
* "--runner=TestDataflowRunner",
|
||||||
|
* "--project=mygcpproject",
|
||||||
|
* "--stagingLocation=gs://mygcsbucket/path"
|
||||||
|
* ]}</pre>
|
||||||
|
* Note that the set of pipeline options required is pipeline runner specific.
|
||||||
|
* <li>Jars containing the SDK and test classes must be available on the classpath.
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
|
* <p>Use {@link PAssert} for tests, as it integrates with this test harness in both direct and
|
||||||
|
* remote execution modes. For example:
|
||||||
|
*
|
||||||
|
* <pre><code>
|
||||||
|
* {@literal @Rule}
|
||||||
|
* public final transient TestPipeline p = TestPipeline.create();
|
||||||
|
*
|
||||||
|
* {@literal @Test}
|
||||||
|
* {@literal @Category}(NeedsRunner.class)
|
||||||
|
* public void myPipelineTest() throws Exception {
|
||||||
|
* final PCollection<String> pCollection = pipeline.apply(...)
|
||||||
|
* PAssert.that(pCollection).containsInAnyOrder(...);
|
||||||
|
* pipeline.run();
|
||||||
|
* }
|
||||||
|
* </code></pre>
|
||||||
|
*
|
||||||
|
* <p>For pipeline runners, it is required that they must throw an {@link AssertionError} containing
|
||||||
|
* the message from the {@link PAssert} that failed.
|
||||||
|
*
|
||||||
|
* <p>See also the <a href="https://beam.apache.org/contribute/testing/">Testing</a> documentation
|
||||||
|
* section.
|
||||||
|
*/
|
||||||
|
public class TestPipelineExtension extends Pipeline implements TestRule {
|
||||||
|
|
||||||
|
private final PipelineOptions options;
|
||||||
|
|
||||||
|
private static class PipelineRunEnforcement {
|
||||||
|
|
||||||
|
@SuppressWarnings("WeakerAccess")
|
||||||
|
protected boolean enableAutoRunIfMissing;
|
||||||
|
|
||||||
|
protected final Pipeline pipeline;
|
||||||
|
|
||||||
|
protected boolean runAttempted;
|
||||||
|
|
||||||
|
private PipelineRunEnforcement(final Pipeline pipeline) {
|
||||||
|
this.pipeline = pipeline;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void enableAutoRunIfMissing(final boolean enable) {
|
||||||
|
enableAutoRunIfMissing = enable;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void beforePipelineExecution() {
|
||||||
|
runAttempted = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void afterPipelineExecution() {}
|
||||||
|
|
||||||
|
protected void afterUserCodeFinished() {
|
||||||
|
if (!runAttempted && enableAutoRunIfMissing) {
|
||||||
|
pipeline.run().waitUntilFinish();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class PipelineAbandonedNodeEnforcement extends PipelineRunEnforcement {
|
||||||
|
|
||||||
|
// Null until the pipeline has been run
|
||||||
|
@Nullable private List<TransformHierarchy.Node> runVisitedNodes;
|
||||||
|
|
||||||
|
private final Predicate<TransformHierarchy.Node> isPAssertNode =
|
||||||
|
node ->
|
||||||
|
node.getTransform() instanceof PAssert.GroupThenAssert
|
||||||
|
|| node.getTransform() instanceof PAssert.GroupThenAssertForSingleton
|
||||||
|
|| node.getTransform() instanceof PAssert.OneSideInputAssert;
|
||||||
|
|
||||||
|
private static class NodeRecorder extends PipelineVisitor.Defaults {
|
||||||
|
|
||||||
|
private final List<TransformHierarchy.Node> visited = new ArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void leaveCompositeTransform(final TransformHierarchy.Node node) {
|
||||||
|
visited.add(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void visitPrimitiveTransform(final TransformHierarchy.Node node) {
|
||||||
|
visited.add(node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private PipelineAbandonedNodeEnforcement(final TestPipelineExtension pipeline) {
|
||||||
|
super(pipeline);
|
||||||
|
runVisitedNodes = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<TransformHierarchy.Node> recordPipelineNodes(final Pipeline pipeline) {
|
||||||
|
final NodeRecorder nodeRecorder = new NodeRecorder();
|
||||||
|
pipeline.traverseTopologically(nodeRecorder);
|
||||||
|
return nodeRecorder.visited;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isEmptyPipeline(final Pipeline pipeline) {
|
||||||
|
final IsEmptyVisitor isEmptyVisitor = new IsEmptyVisitor();
|
||||||
|
pipeline.traverseTopologically(isEmptyVisitor);
|
||||||
|
return isEmptyVisitor.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyPipelineExecution() {
|
||||||
|
if (!isEmptyPipeline(pipeline)) {
|
||||||
|
if (!runAttempted && !enableAutoRunIfMissing) {
|
||||||
|
throw new PipelineRunMissingException("The pipeline has not been run.");
|
||||||
|
|
||||||
|
} else {
|
||||||
|
final List<TransformHierarchy.Node> pipelineNodes = recordPipelineNodes(pipeline);
|
||||||
|
if (pipelineRunSucceeded() && !visitedAll(pipelineNodes)) {
|
||||||
|
final boolean hasDanglingPAssert =
|
||||||
|
FluentIterable.from(pipelineNodes)
|
||||||
|
.filter(Predicates.not(Predicates.in(runVisitedNodes)))
|
||||||
|
.anyMatch(isPAssertNode);
|
||||||
|
if (hasDanglingPAssert) {
|
||||||
|
throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s).");
|
||||||
|
} else {
|
||||||
|
throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s).");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean visitedAll(final List<TransformHierarchy.Node> pipelineNodes) {
|
||||||
|
return runVisitedNodes.equals(pipelineNodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean pipelineRunSucceeded() {
|
||||||
|
return runVisitedNodes != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void afterPipelineExecution() {
|
||||||
|
runVisitedNodes = recordPipelineNodes(pipeline);
|
||||||
|
super.afterPipelineExecution();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void afterUserCodeFinished() {
|
||||||
|
super.afterUserCodeFinished();
|
||||||
|
verifyPipelineExecution();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An exception thrown in case an abandoned {@link org.apache.beam.sdk.transforms.PTransform} is
|
||||||
|
* detected, that is, a {@link org.apache.beam.sdk.transforms.PTransform} that has not been run.
|
||||||
|
*/
|
||||||
|
public static class AbandonedNodeException extends RuntimeException {
|
||||||
|
|
||||||
|
AbandonedNodeException(final String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** An exception thrown in case a test finishes without invoking {@link Pipeline#run()}. */
|
||||||
|
public static class PipelineRunMissingException extends RuntimeException {
|
||||||
|
|
||||||
|
PipelineRunMissingException(final String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** System property used to set {@link TestPipelineOptions}. */
|
||||||
|
public static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
|
||||||
|
|
||||||
|
static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner";
|
||||||
|
|
||||||
|
private static final ObjectMapper MAPPER =
|
||||||
|
new ObjectMapper()
|
||||||
|
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
|
||||||
|
|
||||||
|
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
|
||||||
|
private Optional<? extends PipelineRunEnforcement> enforcement = Optional.absent();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates and returns a new test pipeline.
|
||||||
|
*
|
||||||
|
* <p>Use {@link PAssert} to add tests, then call {@link Pipeline#run} to execute the pipeline and
|
||||||
|
* check the tests.
|
||||||
|
*/
|
||||||
|
public static TestPipelineExtension create() {
|
||||||
|
return fromOptions(testingPipelineOptions());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TestPipelineExtension fromOptions(PipelineOptions options) {
|
||||||
|
return new TestPipelineExtension(options);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TestPipelineExtension(final PipelineOptions options) {
|
||||||
|
super(options);
|
||||||
|
this.options = options;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PipelineOptions getOptions() {
|
||||||
|
return this.options;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Statement apply(final Statement statement, final Description description) {
|
||||||
|
return new Statement() {
|
||||||
|
|
||||||
|
private void setDeducedEnforcementLevel() {
|
||||||
|
// if the enforcement level has not been set by the user do auto-inference
|
||||||
|
if (!enforcement.isPresent()) {
|
||||||
|
|
||||||
|
final boolean annotatedWithNeedsRunner =
|
||||||
|
FluentIterable.from(description.getAnnotations())
|
||||||
|
.filter(Annotations.Predicates.isAnnotationOfType(Category.class))
|
||||||
|
.anyMatch(Annotations.Predicates.isCategoryOf(NeedsRunner.class, true));
|
||||||
|
|
||||||
|
final boolean crashingRunner = CrashingRunner.class.isAssignableFrom(options.getRunner());
|
||||||
|
|
||||||
|
checkState(
|
||||||
|
!(annotatedWithNeedsRunner && crashingRunner),
|
||||||
|
"The test was annotated with a [@%s] / [@%s] while the runner "
|
||||||
|
+ "was set to [%s]. Please re-check your configuration.",
|
||||||
|
NeedsRunner.class.getSimpleName(),
|
||||||
|
ValidatesRunner.class.getSimpleName(),
|
||||||
|
CrashingRunner.class.getSimpleName());
|
||||||
|
|
||||||
|
enableAbandonedNodeEnforcement(annotatedWithNeedsRunner || !crashingRunner);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void evaluate() throws Throwable {
|
||||||
|
options.as(ApplicationNameOptions.class).setAppName(getAppName(description));
|
||||||
|
|
||||||
|
setDeducedEnforcementLevel();
|
||||||
|
|
||||||
|
// statement.evaluate() essentially runs the user code contained in the unit test at hand.
|
||||||
|
// Exceptions thrown during the execution of the user's test code will propagate here,
|
||||||
|
// unless the user explicitly handles them with a "catch" clause in his code. If the
|
||||||
|
// exception is handled by a user's "catch" clause, is does not interrupt the flow and
|
||||||
|
// we move on to invoking the configured enforcements.
|
||||||
|
// If the user does not handle a thrown exception, it will propagate here and interrupt
|
||||||
|
// the flow, preventing the enforcement(s) from being activated.
|
||||||
|
// The motivation for this is avoiding enforcements over faulty pipelines.
|
||||||
|
statement.evaluate();
|
||||||
|
enforcement.get().afterUserCodeFinished();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs this {@link TestPipelineExtension}, unwrapping any {@code AssertionError} that is raised during
|
||||||
|
* testing.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public PipelineResult run() {
|
||||||
|
return run(getOptions());
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Like {@link #run} but with the given potentially modified options. */
|
||||||
|
@Override
|
||||||
|
public PipelineResult run(PipelineOptions options) {
|
||||||
|
checkState(
|
||||||
|
enforcement.isPresent(),
|
||||||
|
"Is your TestPipeline declaration missing a @Rule annotation? Usage: "
|
||||||
|
+ "@Rule public final transient TestPipeline pipeline = TestPipeline.create();");
|
||||||
|
|
||||||
|
final PipelineResult pipelineResult;
|
||||||
|
try {
|
||||||
|
enforcement.get().beforePipelineExecution();
|
||||||
|
PipelineOptions updatedOptions =
|
||||||
|
MAPPER.convertValue(MAPPER.valueToTree(options), PipelineOptions.class);
|
||||||
|
updatedOptions
|
||||||
|
.as(TestValueProviderOptions.class)
|
||||||
|
.setProviderRuntimeValues(StaticValueProvider.of(providerRuntimeValues));
|
||||||
|
pipelineResult = super.run(updatedOptions);
|
||||||
|
verifyPAssertsSucceeded(this, pipelineResult);
|
||||||
|
} catch (RuntimeException exc) {
|
||||||
|
Throwable cause = exc.getCause();
|
||||||
|
if (cause instanceof AssertionError) {
|
||||||
|
throw (AssertionError) cause;
|
||||||
|
} else {
|
||||||
|
throw exc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we reach this point, the pipeline has been run and no exceptions have been thrown during
|
||||||
|
// its execution.
|
||||||
|
enforcement.get().afterPipelineExecution();
|
||||||
|
return pipelineResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Implementation detail of {@link #newProvider}, do not use. */
|
||||||
|
@Internal
|
||||||
|
public interface TestValueProviderOptions extends PipelineOptions {
|
||||||
|
ValueProvider<Map<String, Object>> getProviderRuntimeValues();
|
||||||
|
|
||||||
|
void setProviderRuntimeValues(ValueProvider<Map<String, Object>> runtimeValues);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a new {@link ValueProvider} that is inaccessible before {@link #run}, but will be
|
||||||
|
* accessible while the pipeline runs.
|
||||||
|
*/
|
||||||
|
public <T> ValueProvider<T> newProvider(T runtimeValue) {
|
||||||
|
String uuid = UUID.randomUUID().toString();
|
||||||
|
providerRuntimeValues.put(uuid, runtimeValue);
|
||||||
|
return ValueProvider.NestedValueProvider.of(
|
||||||
|
options.as(TestValueProviderOptions.class).getProviderRuntimeValues(),
|
||||||
|
new GetFromRuntimeValues<T>(uuid));
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Map<String, Object> providerRuntimeValues = Maps.newHashMap();
|
||||||
|
|
||||||
|
private static class GetFromRuntimeValues<T>
|
||||||
|
implements SerializableFunction<Map<String, Object>, T> {
|
||||||
|
private final String key;
|
||||||
|
|
||||||
|
private GetFromRuntimeValues(String key) {
|
||||||
|
this.key = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T apply(Map<String, Object> input) {
|
||||||
|
return (T) input.get(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enables the abandoned node detection. Abandoned nodes are <code>PTransforms</code>, <code>
|
||||||
|
* PAsserts</code> included, that were not executed by the pipeline runner. Abandoned nodes are
|
||||||
|
* most likely to occur due to the one of the following scenarios:
|
||||||
|
*
|
||||||
|
* <ul>
|
||||||
|
* <li>Lack of a <code>pipeline.run()</code> statement at the end of a test.
|
||||||
|
* <li>Addition of PTransforms after the pipeline has already run.
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
|
* Abandoned node detection is automatically enabled when a real pipeline runner (i.e. not a
|
||||||
|
* {@link CrashingRunner}) and/or a {@link NeedsRunner} or a {@link ValidatesRunner} annotation
|
||||||
|
* are detected.
|
||||||
|
*/
|
||||||
|
public TestPipelineExtension enableAbandonedNodeEnforcement(final boolean enable) {
|
||||||
|
enforcement =
|
||||||
|
enable
|
||||||
|
? Optional.of(new PipelineAbandonedNodeEnforcement(this))
|
||||||
|
: Optional.of(new PipelineRunEnforcement(this));
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If enabled, a <code>pipeline.run()</code> statement will be added automatically in case it is
|
||||||
|
* missing in the test.
|
||||||
|
*/
|
||||||
|
public TestPipelineExtension enableAutoRunIfMissing(final boolean enable) {
|
||||||
|
enforcement.get().enableAutoRunIfMissing(enable);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "TestPipeline#" + options.as(ApplicationNameOptions.class).getAppName();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Creates {@link PipelineOptions} for testing. */
|
||||||
|
public static PipelineOptions testingPipelineOptions() {
|
||||||
|
try {
|
||||||
|
@Nullable
|
||||||
|
String beamTestPipelineOptions = System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
|
||||||
|
|
||||||
|
PipelineOptions options =
|
||||||
|
Strings.isNullOrEmpty(beamTestPipelineOptions)
|
||||||
|
? PipelineOptionsFactory.create()
|
||||||
|
: PipelineOptionsFactory.fromArgs(
|
||||||
|
MAPPER.readValue(beamTestPipelineOptions, String[].class))
|
||||||
|
.as(TestPipelineOptions.class);
|
||||||
|
|
||||||
|
// If no options were specified, set some reasonable defaults
|
||||||
|
if (Strings.isNullOrEmpty(beamTestPipelineOptions)) {
|
||||||
|
// If there are no provided options, check to see if a dummy runner should be used.
|
||||||
|
String useDefaultDummy = System.getProperty(PROPERTY_USE_DEFAULT_DUMMY_RUNNER);
|
||||||
|
if (!Strings.isNullOrEmpty(useDefaultDummy) && Boolean.valueOf(useDefaultDummy)) {
|
||||||
|
options.setRunner(CrashingRunner.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
options.setStableUniqueNames(CheckEnabled.ERROR);
|
||||||
|
|
||||||
|
FileSystems.setDefaultPipelineOptions(options);
|
||||||
|
return options;
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(
|
||||||
|
"Unable to instantiate test options from system property "
|
||||||
|
+ PROPERTY_BEAM_TEST_PIPELINE_OPTIONS
|
||||||
|
+ ":"
|
||||||
|
+ System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS),
|
||||||
|
e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Returns the class + method name of the test. */
|
||||||
|
private String getAppName(Description description) {
|
||||||
|
String methodName = description.getMethodName();
|
||||||
|
Class<?> testClass = description.getTestClass();
|
||||||
|
if (testClass.isMemberClass()) {
|
||||||
|
return String.format(
|
||||||
|
"%s$%s-%s",
|
||||||
|
testClass.getEnclosingClass().getSimpleName(), testClass.getSimpleName(), methodName);
|
||||||
|
} else {
|
||||||
|
return String.format("%s-%s", testClass.getSimpleName(), methodName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies all {{@link PAssert PAsserts}} in the pipeline have been executed and were successful.
|
||||||
|
*
|
||||||
|
* <p>Note this only runs for runners which support Metrics. Runners which do not should verify
|
||||||
|
* this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001
|
||||||
|
*/
|
||||||
|
public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult) {
|
||||||
|
if (MetricsEnvironment.isMetricsSupported()) {
|
||||||
|
long expectedNumberOfAssertions = (long) PAssert.countAsserts(pipeline);
|
||||||
|
|
||||||
|
long successfulAssertions = 0;
|
||||||
|
Iterable<MetricResult<Long>> successCounterResults =
|
||||||
|
pipelineResult
|
||||||
|
.metrics()
|
||||||
|
.queryMetrics(
|
||||||
|
MetricsFilter.builder()
|
||||||
|
.addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER))
|
||||||
|
.build())
|
||||||
|
.getCounters();
|
||||||
|
for (MetricResult<Long> counter : successCounterResults) {
|
||||||
|
if (counter.getAttempted() > 0) {
|
||||||
|
successfulAssertions++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(
|
||||||
|
String.format(
|
||||||
|
"Expected %d successful assertions, but found %d.",
|
||||||
|
expectedNumberOfAssertions, successfulAssertions),
|
||||||
|
successfulAssertions,
|
||||||
|
is(expectedNumberOfAssertions));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class IsEmptyVisitor extends PipelineVisitor.Defaults {
|
||||||
|
private boolean empty = true;
|
||||||
|
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return empty;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
|
||||||
|
empty = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A utility class for querying annotations.
|
||||||
|
*
|
||||||
|
* <p>NOTE: This was copied from the Apache Beam project from a separate file only for visibility
|
||||||
|
* reasons (it's package-private there).
|
||||||
|
*/
|
||||||
|
static class Annotations {
|
||||||
|
|
||||||
|
/** Annotation predicates. */
|
||||||
|
static class Predicates {
|
||||||
|
|
||||||
|
static Predicate<Annotation> isAnnotationOfType(final Class<? extends Annotation> clazz) {
|
||||||
|
return annotation ->
|
||||||
|
annotation.annotationType() != null && annotation.annotationType().equals(clazz);
|
||||||
|
}
|
||||||
|
|
||||||
|
static Predicate<Annotation> isCategoryOf(final Class<?> value, final boolean allowDerived) {
|
||||||
|
return category ->
|
||||||
|
FluentIterable.from(Arrays.asList(((Category) category).value()))
|
||||||
|
.anyMatch(
|
||||||
|
aClass -> allowDerived ? value.isAssignableFrom(aClass) : value.equals(aClass));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,6 +21,7 @@ import static google.registry.testing.DatastoreHelper.newRegistry;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import google.registry.backup.VersionedEntity;
|
import google.registry.backup.VersionedEntity;
|
||||||
|
import google.registry.beam.TestPipelineExtension;
|
||||||
import google.registry.model.contact.ContactResource;
|
import google.registry.model.contact.ContactResource;
|
||||||
import google.registry.model.domain.DomainBase;
|
import google.registry.model.domain.DomainBase;
|
||||||
import google.registry.model.ofy.Ofy;
|
import google.registry.model.ofy.Ofy;
|
||||||
|
@ -34,7 +35,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
|
||||||
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
|
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
|
||||||
import org.apache.beam.sdk.testing.NeedsRunner;
|
import org.apache.beam.sdk.testing.NeedsRunner;
|
||||||
import org.apache.beam.sdk.testing.PAssert;
|
import org.apache.beam.sdk.testing.PAssert;
|
||||||
import org.apache.beam.sdk.testing.TestPipeline;
|
|
||||||
import org.apache.beam.sdk.transforms.Create;
|
import org.apache.beam.sdk.transforms.Create;
|
||||||
import org.apache.beam.sdk.transforms.DoFn;
|
import org.apache.beam.sdk.transforms.DoFn;
|
||||||
import org.apache.beam.sdk.transforms.ParDo;
|
import org.apache.beam.sdk.transforms.ParDo;
|
||||||
|
@ -51,8 +51,6 @@ import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.JUnit4;
|
import org.junit.runners.JUnit4;
|
||||||
|
|
||||||
/** Unit tests for {@link Transforms} related to loading CommitLogs. */
|
/** Unit tests for {@link Transforms} related to loading CommitLogs. */
|
||||||
// TODO(weiminyu): Upgrade to JUnit5 when TestPipeline is upgraded. It is also easy to adapt with
|
|
||||||
// a wrapper.
|
|
||||||
@RunWith(JUnit4.class)
|
@RunWith(JUnit4.class)
|
||||||
public class CommitLogTransformsTest implements Serializable {
|
public class CommitLogTransformsTest implements Serializable {
|
||||||
|
|
||||||
|
@ -63,8 +61,8 @@ public class CommitLogTransformsTest implements Serializable {
|
||||||
@Rule public final transient InjectRule injectRule = new InjectRule();
|
@Rule public final transient InjectRule injectRule = new InjectRule();
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final transient TestPipeline pipeline =
|
public final transient TestPipelineExtension pipeline =
|
||||||
TestPipeline.create().enableAbandonedNodeEnforcement(true);
|
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
|
||||||
|
|
||||||
private FakeClock fakeClock;
|
private FakeClock fakeClock;
|
||||||
private transient BackupTestStore store;
|
private transient BackupTestStore store;
|
||||||
|
|
|
@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.googlecode.objectify.Key;
|
import com.googlecode.objectify.Key;
|
||||||
import google.registry.backup.VersionedEntity;
|
import google.registry.backup.VersionedEntity;
|
||||||
|
import google.registry.beam.TestPipelineExtension;
|
||||||
import google.registry.model.contact.ContactResource;
|
import google.registry.model.contact.ContactResource;
|
||||||
import google.registry.model.domain.DomainBase;
|
import google.registry.model.domain.DomainBase;
|
||||||
import google.registry.model.ofy.Ofy;
|
import google.registry.model.ofy.Ofy;
|
||||||
|
@ -35,7 +36,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
|
||||||
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
|
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
|
||||||
import org.apache.beam.sdk.testing.NeedsRunner;
|
import org.apache.beam.sdk.testing.NeedsRunner;
|
||||||
import org.apache.beam.sdk.testing.PAssert;
|
import org.apache.beam.sdk.testing.PAssert;
|
||||||
import org.apache.beam.sdk.testing.TestPipeline;
|
|
||||||
import org.apache.beam.sdk.transforms.Create;
|
import org.apache.beam.sdk.transforms.Create;
|
||||||
import org.apache.beam.sdk.transforms.DoFn;
|
import org.apache.beam.sdk.transforms.DoFn;
|
||||||
import org.apache.beam.sdk.transforms.ParDo;
|
import org.apache.beam.sdk.transforms.ParDo;
|
||||||
|
@ -56,8 +56,6 @@ import org.junit.runners.JUnit4;
|
||||||
*
|
*
|
||||||
* <p>This class implements {@link Serializable} so that test {@link DoFn} classes may be inlined.
|
* <p>This class implements {@link Serializable} so that test {@link DoFn} classes may be inlined.
|
||||||
*/
|
*/
|
||||||
// TODO(weiminyu): Upgrade to JUnit5 when TestPipeline is upgraded. It is also easy to adapt with
|
|
||||||
// a wrapper.
|
|
||||||
@RunWith(JUnit4.class)
|
@RunWith(JUnit4.class)
|
||||||
public class ExportloadingTransformsTest implements Serializable {
|
public class ExportloadingTransformsTest implements Serializable {
|
||||||
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
|
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
|
||||||
|
@ -72,8 +70,8 @@ public class ExportloadingTransformsTest implements Serializable {
|
||||||
@Rule public final transient InjectRule injectRule = new InjectRule();
|
@Rule public final transient InjectRule injectRule = new InjectRule();
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final transient TestPipeline pipeline =
|
public final transient TestPipelineExtension pipeline =
|
||||||
TestPipeline.create().enableAbandonedNodeEnforcement(true);
|
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
|
||||||
|
|
||||||
private FakeClock fakeClock;
|
private FakeClock fakeClock;
|
||||||
private transient BackupTestStore store;
|
private transient BackupTestStore store;
|
||||||
|
|
|
@ -22,6 +22,7 @@ import com.google.appengine.api.datastore.Entity;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.googlecode.objectify.Key;
|
import com.googlecode.objectify.Key;
|
||||||
|
import google.registry.beam.TestPipelineExtension;
|
||||||
import google.registry.model.contact.ContactResource;
|
import google.registry.model.contact.ContactResource;
|
||||||
import google.registry.model.domain.DomainAuthInfo;
|
import google.registry.model.domain.DomainAuthInfo;
|
||||||
import google.registry.model.domain.DomainBase;
|
import google.registry.model.domain.DomainBase;
|
||||||
|
@ -32,7 +33,6 @@ import google.registry.testing.FakeClock;
|
||||||
import google.registry.testing.InjectRule;
|
import google.registry.testing.InjectRule;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import org.apache.beam.sdk.testing.NeedsRunner;
|
import org.apache.beam.sdk.testing.NeedsRunner;
|
||||||
import org.apache.beam.sdk.testing.TestPipeline;
|
|
||||||
import org.apache.beam.sdk.values.KV;
|
import org.apache.beam.sdk.values.KV;
|
||||||
import org.apache.beam.sdk.values.PCollectionTuple;
|
import org.apache.beam.sdk.values.PCollectionTuple;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
|
@ -85,8 +85,8 @@ public class LoadDatastoreSnapshotTest {
|
||||||
@Rule public final transient InjectRule injectRule = new InjectRule();
|
@Rule public final transient InjectRule injectRule = new InjectRule();
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final transient TestPipeline pipeline =
|
public final transient TestPipelineExtension pipeline =
|
||||||
TestPipeline.create().enableAbandonedNodeEnforcement(true);
|
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
|
||||||
|
|
||||||
private FakeClock fakeClock;
|
private FakeClock fakeClock;
|
||||||
private File exportRootDir;
|
private File exportRootDir;
|
||||||
|
|
|
@ -20,6 +20,7 @@ import static google.registry.persistence.transaction.TransactionManagerFactory.
|
||||||
import com.google.appengine.api.datastore.Entity;
|
import com.google.appengine.api.datastore.Entity;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import google.registry.backup.VersionedEntity;
|
import google.registry.backup.VersionedEntity;
|
||||||
|
import google.registry.beam.TestPipelineExtension;
|
||||||
import google.registry.model.contact.ContactResource;
|
import google.registry.model.contact.ContactResource;
|
||||||
import google.registry.model.ofy.Ofy;
|
import google.registry.model.ofy.Ofy;
|
||||||
import google.registry.model.registrar.Registrar;
|
import google.registry.model.registrar.Registrar;
|
||||||
|
@ -35,7 +36,6 @@ import java.io.PrintStream;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import org.apache.beam.sdk.testing.NeedsRunner;
|
import org.apache.beam.sdk.testing.NeedsRunner;
|
||||||
import org.apache.beam.sdk.testing.TestPipeline;
|
|
||||||
import org.apache.beam.sdk.transforms.Create;
|
import org.apache.beam.sdk.transforms.Create;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -65,8 +65,8 @@ public class WriteToSqlTest implements Serializable {
|
||||||
@Rule public transient TemporaryFolder temporaryFolder = new TemporaryFolder();
|
@Rule public transient TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final transient TestPipeline pipeline =
|
public final transient TestPipelineExtension pipeline =
|
||||||
TestPipeline.create().enableAbandonedNodeEnforcement(true);
|
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
|
||||||
|
|
||||||
private ImmutableList<Entity> contacts;
|
private ImmutableList<Entity> contacts;
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ import static com.google.common.truth.Truth.assertThat;
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import google.registry.beam.TestPipelineExtension;
|
||||||
import google.registry.util.GoogleCredentialsBundle;
|
import google.registry.util.GoogleCredentialsBundle;
|
||||||
import google.registry.util.ResourceUtils;
|
import google.registry.util.ResourceUtils;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -30,7 +31,6 @@ import org.apache.beam.runners.direct.DirectRunner;
|
||||||
import org.apache.beam.sdk.options.PipelineOptions;
|
import org.apache.beam.sdk.options.PipelineOptions;
|
||||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||||
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
||||||
import org.apache.beam.sdk.testing.TestPipeline;
|
|
||||||
import org.apache.beam.sdk.transforms.Create;
|
import org.apache.beam.sdk.transforms.Create;
|
||||||
import org.apache.beam.sdk.values.PCollection;
|
import org.apache.beam.sdk.values.PCollection;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -53,7 +53,10 @@ public class InvoicingPipelineTest {
|
||||||
pipelineOptions.setRunner(DirectRunner.class);
|
pipelineOptions.setRunner(DirectRunner.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
|
@Rule
|
||||||
|
public final transient TestPipelineExtension p =
|
||||||
|
TestPipelineExtension.fromOptions(pipelineOptions);
|
||||||
|
|
||||||
@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
|
@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
|
||||||
|
|
||||||
private InvoicingPipeline invoicingPipeline;
|
private InvoicingPipeline invoicingPipeline;
|
||||||
|
|
|
@ -24,6 +24,7 @@ import static org.mockito.Mockito.withSettings;
|
||||||
import com.google.auth.oauth2.GoogleCredentials;
|
import com.google.auth.oauth2.GoogleCredentials;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.io.CharStreams;
|
import com.google.common.io.CharStreams;
|
||||||
|
import google.registry.beam.TestPipelineExtension;
|
||||||
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
|
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
|
||||||
import google.registry.testing.FakeClock;
|
import google.registry.testing.FakeClock;
|
||||||
import google.registry.testing.FakeSleeper;
|
import google.registry.testing.FakeSleeper;
|
||||||
|
@ -43,7 +44,6 @@ import org.apache.beam.runners.direct.DirectRunner;
|
||||||
import org.apache.beam.sdk.options.PipelineOptions;
|
import org.apache.beam.sdk.options.PipelineOptions;
|
||||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||||
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
||||||
import org.apache.beam.sdk.testing.TestPipeline;
|
|
||||||
import org.apache.beam.sdk.transforms.Create;
|
import org.apache.beam.sdk.transforms.Create;
|
||||||
import org.apache.beam.sdk.values.PCollection;
|
import org.apache.beam.sdk.values.PCollection;
|
||||||
import org.apache.http.ProtocolVersion;
|
import org.apache.http.ProtocolVersion;
|
||||||
|
@ -78,7 +78,10 @@ public class Spec11PipelineTest {
|
||||||
pipelineOptions.setRunner(DirectRunner.class);
|
pipelineOptions.setRunner(DirectRunner.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
|
@Rule
|
||||||
|
public final transient TestPipelineExtension p =
|
||||||
|
TestPipelineExtension.fromOptions(pipelineOptions);
|
||||||
|
|
||||||
@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
|
@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
|
||||||
|
|
||||||
private final Retrier retrier =
|
private final Retrier retrier =
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue