mirror of
https://github.com/google/nomulus.git
synced 2025-05-13 07:57:13 +02:00
Fix open source build
It broke because I forgot to add the new spec11 packages to gtld. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=206021827
This commit is contained in:
parent
d199b383e5
commit
c87fde605c
10 changed files with 976 additions and 0 deletions
26
java/google/registry/beam/spec11/BUILD
Normal file
26
java/google/registry/beam/spec11/BUILD
Normal file
|
@ -0,0 +1,26 @@
|
|||
package(
|
||||
default_visibility = ["//visibility:public"],
|
||||
)
|
||||
|
||||
licenses(["notice"]) # Apache 2.0
|
||||
|
||||
java_library(
|
||||
name = "spec11",
|
||||
srcs = glob(["*.java"]),
|
||||
resources = glob(["sql/*"]),
|
||||
deps = [
|
||||
"//java/google/registry/beam",
|
||||
"//java/google/registry/config",
|
||||
"@com_google_auto_value",
|
||||
"@com_google_dagger",
|
||||
"@com_google_flogger",
|
||||
"@com_google_flogger_system_backend",
|
||||
"@com_google_guava",
|
||||
"@javax_inject",
|
||||
"@org_apache_avro",
|
||||
"@org_apache_beam_runners_direct_java",
|
||||
"@org_apache_beam_runners_google_cloud_dataflow_java",
|
||||
"@org_apache_beam_sdks_java_core",
|
||||
"@org_apache_beam_sdks_java_io_google_cloud_platform",
|
||||
],
|
||||
)
|
118
java/google/registry/beam/spec11/Spec11Pipeline.java
Normal file
118
java/google/registry/beam/spec11/Spec11Pipeline.java
Normal file
|
@ -0,0 +1,118 @@
|
|||
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.spec11;
|
||||
|
||||
import google.registry.config.RegistryConfig.Config;
|
||||
import java.io.Serializable;
|
||||
import javax.inject.Inject;
|
||||
import org.apache.beam.runners.dataflow.DataflowRunner;
|
||||
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
|
||||
import org.apache.beam.sdk.Pipeline;
|
||||
import org.apache.beam.sdk.coders.SerializableCoder;
|
||||
import org.apache.beam.sdk.io.TextIO;
|
||||
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
|
||||
import org.apache.beam.sdk.options.Description;
|
||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||
import org.apache.beam.sdk.options.ValueProvider;
|
||||
import org.apache.beam.sdk.transforms.Count;
|
||||
import org.apache.beam.sdk.transforms.ToString;
|
||||
import org.apache.beam.sdk.values.PCollection;
|
||||
|
||||
/**
|
||||
* Definition of a Dataflow pipeline template, which generates a given month's spec11 report.
|
||||
*
|
||||
* <p>To stage this template on GCS, run the {@link
|
||||
* google.registry.tools.DeploySpec11PipelineCommand} Nomulus command.
|
||||
*
|
||||
* <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
|
||||
*
|
||||
* @see <a href="https://cloud.google.com/dataflow/docs/templates/overview">Dataflow Templates</a>
|
||||
*/
|
||||
public class Spec11Pipeline implements Serializable {
|
||||
|
||||
@Inject
|
||||
@Config("projectId")
|
||||
String projectId;
|
||||
|
||||
@Inject
|
||||
@Config("beamStagingUrl")
|
||||
String beamStagingUrl;
|
||||
|
||||
@Inject
|
||||
@Config("spec11TemplateUrl")
|
||||
String spec11TemplateUrl;
|
||||
|
||||
@Inject
|
||||
@Config("spec11BucketUrl")
|
||||
String spec11BucketUrl;
|
||||
|
||||
@Inject
|
||||
Spec11Pipeline() {}
|
||||
|
||||
/** Custom options for running the spec11 pipeline. */
|
||||
interface Spec11PipelineOptions extends DataflowPipelineOptions {
|
||||
/** Returns the yearMonth we're generating the report for, in yyyy-MM format. */
|
||||
@Description("The yearMonth we generate the report for, in yyyy-MM format.")
|
||||
ValueProvider<String> getYearMonth();
|
||||
|
||||
/**
|
||||
* Sets the yearMonth we generate invoices for.
|
||||
*
|
||||
* <p>This is implicitly set when executing the Dataflow template, by specifying the 'yearMonth
|
||||
* parameter.
|
||||
*/
|
||||
void setYearMonth(ValueProvider<String> value);
|
||||
}
|
||||
|
||||
/** Deploys the spec11 pipeline as a template on GCS, for a given projectID and GCS bucket. */
|
||||
public void deploy() {
|
||||
// We can't store options as a member variable due to serialization concerns.
|
||||
Spec11PipelineOptions options = PipelineOptionsFactory.as(Spec11PipelineOptions.class);
|
||||
options.setProject(projectId);
|
||||
options.setRunner(DataflowRunner.class);
|
||||
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
|
||||
options.setTemplateLocation(spec11TemplateUrl);
|
||||
options.setStagingLocation(beamStagingUrl);
|
||||
Pipeline p = Pipeline.create(options);
|
||||
PCollection<Subdomain> domains =
|
||||
p.apply(
|
||||
"Read active domains from BigQuery",
|
||||
BigQueryIO.read(Subdomain::parseFromRecord)
|
||||
.fromQuery(
|
||||
// This query must be customized for your own use.
|
||||
"SELECT * FROM YOUR_TABLE_HERE")
|
||||
.withCoder(SerializableCoder.of(Subdomain.class))
|
||||
.usingStandardSql()
|
||||
.withoutValidation()
|
||||
.withTemplateCompatibility());
|
||||
countDomainsAndOutputResults(domains);
|
||||
p.run();
|
||||
}
|
||||
|
||||
/** Globally count the number of elements and output the results to GCS. */
|
||||
void countDomainsAndOutputResults(PCollection<Subdomain> domains) {
|
||||
// TODO(b/111545355): Actually process each domain with the SafeBrowsing API
|
||||
domains
|
||||
.apply("Count number of subdomains", Count.globally())
|
||||
.apply("Convert global count to string", ToString.elements())
|
||||
.apply(
|
||||
"Output to text file",
|
||||
TextIO.write()
|
||||
// TODO(b/111545355): Replace this with a templated directory based on yearMonth
|
||||
.to(spec11BucketUrl)
|
||||
.withoutSharding()
|
||||
.withHeader("HELLO WORLD"));
|
||||
}
|
||||
}
|
84
java/google/registry/beam/spec11/Subdomain.java
Normal file
84
java/google/registry/beam/spec11/Subdomain.java
Normal file
|
@ -0,0 +1,84 @@
|
|||
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.spec11;
|
||||
|
||||
import static google.registry.beam.BeamUtils.checkFieldsNotNull;
|
||||
import static google.registry.beam.BeamUtils.extractField;
|
||||
|
||||
import com.google.auto.value.AutoValue;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.flogger.FluentLogger;
|
||||
import java.io.Serializable;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
|
||||
|
||||
/**
|
||||
* A POJO representing a single subdomain, parsed from a {@code SchemaAndRecord}.
|
||||
*
|
||||
* <p>This is a trivially serializable class that allows Beam to transform the results of a Bigquery
|
||||
* query into a standard Java representation, giving us the type guarantees and ease of manipulation
|
||||
* Bigquery lacks, while localizing any Bigquery-side failures to the {@link #parseFromRecord}
|
||||
* function.
|
||||
*/
|
||||
@AutoValue
|
||||
public abstract class Subdomain implements Serializable {
|
||||
|
||||
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||
|
||||
private static final ImmutableList<String> FIELD_NAMES =
|
||||
ImmutableList.of("fullyQualifiedDomainName", "statuses", "creationTime");
|
||||
|
||||
/** Returns the fully qualified domain name. */
|
||||
abstract String fullyQualifiedDomainName();
|
||||
/** Returns the UTC DateTime this domain was created. */
|
||||
abstract ZonedDateTime creationTime();
|
||||
/** Returns the space-delimited list of statuses on this domain. */
|
||||
abstract String statuses();
|
||||
|
||||
/**
|
||||
* Constructs a {@link Subdomain} from an Apache Avro {@code SchemaAndRecord}.
|
||||
*
|
||||
* @see <a
|
||||
* href=http://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/generic/GenericData.Record.html>
|
||||
* Apache AVRO GenericRecord</a>
|
||||
*/
|
||||
static Subdomain parseFromRecord(SchemaAndRecord schemaAndRecord) {
|
||||
checkFieldsNotNull(FIELD_NAMES, schemaAndRecord);
|
||||
GenericRecord record = schemaAndRecord.getRecord();
|
||||
return create(
|
||||
extractField(record, "fullyQualifiedDomainName"),
|
||||
// Bigquery provides UNIX timestamps with microsecond precision.
|
||||
Instant.ofEpochMilli(Long.parseLong(extractField(record, "creationTime")) / 1000)
|
||||
.atZone(ZoneId.of("UTC")),
|
||||
extractField(record, "statuses"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a concrete {@link Subdomain}.
|
||||
*
|
||||
* <p>This should only be used outside this class for testing- instances of {@link Subdomain}
|
||||
* should otherwise come from {@link #parseFromRecord}.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static Subdomain create(
|
||||
String fullyQualifiedDomainName, ZonedDateTime creationTime, String statuses) {
|
||||
return new AutoValue_Subdomain(fullyQualifiedDomainName, creationTime, statuses);
|
||||
}
|
||||
}
|
||||
|
39
javatests/google/registry/beam/invoicing/BUILD
Normal file
39
javatests/google/registry/beam/invoicing/BUILD
Normal file
|
@ -0,0 +1,39 @@
|
|||
package(
|
||||
default_testonly = 1,
|
||||
default_visibility = ["//java/google/registry:registry_project"],
|
||||
)
|
||||
|
||||
licenses(["notice"]) # Apache 2.0
|
||||
|
||||
load("//java/com/google/testing/builddefs:GenTestRules.bzl", "GenTestRules")
|
||||
|
||||
java_library(
|
||||
name = "invoicing",
|
||||
srcs = glob(["*.java"]),
|
||||
resources = glob(["testdata/*"]),
|
||||
deps = [
|
||||
"//java/google/registry/beam/invoicing",
|
||||
"//java/google/registry/util",
|
||||
"//javatests/google/registry/testing",
|
||||
"@com_google_apis_google_api_services_bigquery",
|
||||
"@com_google_dagger",
|
||||
"@com_google_guava",
|
||||
"@com_google_truth",
|
||||
"@com_google_truth_extensions_truth_java8_extension",
|
||||
"@junit",
|
||||
"@org_apache_avro",
|
||||
"@org_apache_beam_runners_direct_java",
|
||||
"@org_apache_beam_runners_google_cloud_dataflow_java",
|
||||
"@org_apache_beam_sdks_java_core",
|
||||
"@org_apache_beam_sdks_java_io_google_cloud_platform",
|
||||
"@org_mockito_all",
|
||||
],
|
||||
)
|
||||
|
||||
GenTestRules(
|
||||
name = "GeneratedTestRules",
|
||||
default_test_size = "small",
|
||||
medium_tests = ["InvoicingPipelineTest.java"],
|
||||
test_files = glob(["*Test.java"]),
|
||||
deps = [":invoicing"],
|
||||
)
|
188
javatests/google/registry/beam/invoicing/BillingEventTest.java
Normal file
188
javatests/google/registry/beam/invoicing/BillingEventTest.java
Normal file
|
@ -0,0 +1,188 @@
|
|||
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.invoicing;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static google.registry.testing.JUnitBackports.assertThrows;
|
||||
|
||||
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey;
|
||||
import google.registry.beam.invoicing.BillingEvent.InvoiceGroupingKey.InvoiceGroupingKeyCoder;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
/** Unit tests for {@link BillingEvent} */
|
||||
@RunWith(JUnit4.class)
|
||||
public class BillingEventTest {
|
||||
|
||||
private static final String BILLING_EVENT_SCHEMA =
|
||||
"{\"name\": \"BillingEvent\", "
|
||||
+ "\"type\": \"record\", "
|
||||
+ "\"fields\": ["
|
||||
+ "{\"name\": \"id\", \"type\": \"long\"},"
|
||||
+ "{\"name\": \"billingTime\", \"type\": \"string\"},"
|
||||
+ "{\"name\": \"eventTime\", \"type\": \"string\"},"
|
||||
+ "{\"name\": \"registrarId\", \"type\": \"string\"},"
|
||||
+ "{\"name\": \"billingId\", \"type\": \"long\"},"
|
||||
+ "{\"name\": \"tld\", \"type\": \"string\"},"
|
||||
+ "{\"name\": \"action\", \"type\": \"string\"},"
|
||||
+ "{\"name\": \"domain\", \"type\": \"string\"},"
|
||||
+ "{\"name\": \"repositoryId\", \"type\": \"string\"},"
|
||||
+ "{\"name\": \"years\", \"type\": \"int\"},"
|
||||
+ "{\"name\": \"currency\", \"type\": \"string\"},"
|
||||
+ "{\"name\": \"amount\", \"type\": \"float\"},"
|
||||
+ "{\"name\": \"flags\", \"type\": \"string\"}"
|
||||
+ "]}";
|
||||
|
||||
private SchemaAndRecord schemaAndRecord;
|
||||
|
||||
@Before
|
||||
public void initializeRecord() {
|
||||
// Create a record with a given JSON schema.
|
||||
GenericRecord record = new GenericData.Record(new Schema.Parser().parse(BILLING_EVENT_SCHEMA));
|
||||
record.put("id", "1");
|
||||
record.put("billingTime", 1508835963000000L);
|
||||
record.put("eventTime", 1484870383000000L);
|
||||
record.put("registrarId", "myRegistrar");
|
||||
record.put("billingId", "12345-CRRHELLO");
|
||||
record.put("tld", "test");
|
||||
record.put("action", "RENEW");
|
||||
record.put("domain", "example.test");
|
||||
record.put("repositoryId", "123456");
|
||||
record.put("years", 5);
|
||||
record.put("currency", "USD");
|
||||
record.put("amount", 20.5);
|
||||
record.put("flags", "AUTO_RENEW SYNTHETIC");
|
||||
schemaAndRecord = new SchemaAndRecord(record, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseBillingEventFromRecord_success() {
|
||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
||||
assertThat(event.id()).isEqualTo(1);
|
||||
assertThat(event.billingTime())
|
||||
.isEqualTo(ZonedDateTime.of(2017, 10, 24, 9, 6, 3, 0, ZoneId.of("UTC")));
|
||||
assertThat(event.eventTime())
|
||||
.isEqualTo(ZonedDateTime.of(2017, 1, 19, 23, 59, 43, 0, ZoneId.of("UTC")));
|
||||
assertThat(event.registrarId()).isEqualTo("myRegistrar");
|
||||
assertThat(event.billingId()).isEqualTo("12345-CRRHELLO");
|
||||
assertThat(event.tld()).isEqualTo("test");
|
||||
assertThat(event.action()).isEqualTo("RENEW");
|
||||
assertThat(event.domain()).isEqualTo("example.test");
|
||||
assertThat(event.repositoryId()).isEqualTo("123456");
|
||||
assertThat(event.years()).isEqualTo(5);
|
||||
assertThat(event.currency()).isEqualTo("USD");
|
||||
assertThat(event.amount()).isEqualTo(20.5);
|
||||
assertThat(event.flags()).isEqualTo("AUTO_RENEW SYNTHETIC");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseBillingEventFromRecord_sunriseCreate_reducedPrice_success() {
|
||||
schemaAndRecord.getRecord().put("flags", "SUNRISE");
|
||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
||||
assertThat(event.amount()).isEqualTo(17.43);
|
||||
assertThat(event.flags()).isEqualTo("SUNRISE");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseBillingEventFromRecord_anchorTenant_zeroPrice_success() {
|
||||
schemaAndRecord.getRecord().put("flags", "SUNRISE ANCHOR_TENANT");
|
||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
||||
assertThat(event.amount()).isZero();
|
||||
assertThat(event.flags()).isEqualTo("SUNRISE ANCHOR_TENANT");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseBillingEventFromRecord_nullValue_throwsException() {
|
||||
schemaAndRecord.getRecord().put("tld", null);
|
||||
assertThrows(IllegalStateException.class, () -> BillingEvent.parseFromRecord(schemaAndRecord));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertBillingEvent_toCsv() {
|
||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
||||
assertThat(event.toCsv())
|
||||
.isEqualTo("1,2017-10-24 09:06:03 UTC,2017-01-19 23:59:43 UTC,myRegistrar,"
|
||||
+ "12345-CRRHELLO,test,RENEW,example.test,123456,5,USD,20.50,AUTO_RENEW");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGenerateBillingEventFilename() {
|
||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
||||
assertThat(event.toFilename("2017-10")).isEqualTo("invoice_details_2017-10_myRegistrar_test");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetInvoiceGroupingKey_fromBillingEvent() {
|
||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
||||
InvoiceGroupingKey invoiceKey = event.getInvoiceGroupingKey();
|
||||
assertThat(invoiceKey.startDate()).isEqualTo("2017-10-01");
|
||||
assertThat(invoiceKey.endDate()).isEqualTo("2022-09-30");
|
||||
assertThat(invoiceKey.productAccountKey()).isEqualTo("12345-CRRHELLO");
|
||||
assertThat(invoiceKey.usageGroupingKey()).isEqualTo("myRegistrar - test");
|
||||
assertThat(invoiceKey.description()).isEqualTo("RENEW | TLD: test | TERM: 5-year");
|
||||
assertThat(invoiceKey.unitPrice()).isEqualTo(20.5);
|
||||
assertThat(invoiceKey.unitPriceCurrency()).isEqualTo("USD");
|
||||
assertThat(invoiceKey.poNumber()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertInvoiceGroupingKey_toCsv() {
|
||||
BillingEvent event = BillingEvent.parseFromRecord(schemaAndRecord);
|
||||
InvoiceGroupingKey invoiceKey = event.getInvoiceGroupingKey();
|
||||
assertThat(invoiceKey.toCsv(3L))
|
||||
.isEqualTo(
|
||||
"2017-10-01,2022-09-30,12345-CRRHELLO,61.50,USD,10125,1,PURCHASE,"
|
||||
+ "myRegistrar - test,3,RENEW | TLD: test | TERM: 5-year,20.50,USD,");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvoiceGroupingKeyCoder_deterministicSerialization() throws IOException {
|
||||
InvoiceGroupingKey invoiceKey =
|
||||
BillingEvent.parseFromRecord(schemaAndRecord).getInvoiceGroupingKey();
|
||||
InvoiceGroupingKeyCoder coder = new InvoiceGroupingKeyCoder();
|
||||
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
|
||||
coder.encode(invoiceKey, outStream);
|
||||
InputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
|
||||
assertThat(coder.decode(inStream)).isEqualTo(invoiceKey);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetDetailReportHeader() {
|
||||
assertThat(BillingEvent.getHeader())
|
||||
.isEqualTo(
|
||||
"id,billingTime,eventTime,registrarId,billingId,tld,action,"
|
||||
+ "domain,repositoryId,years,currency,amount,flags");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetOverallInvoiceHeader() {
|
||||
assertThat(InvoiceGroupingKey.invoiceHeader())
|
||||
.isEqualTo("StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode,"
|
||||
+ "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice,"
|
||||
+ "UnitPriceCurrency,PONumber");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,215 @@
|
|||
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.invoicing;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import google.registry.util.ResourceUtils;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Map.Entry;
|
||||
import org.apache.beam.runners.direct.DirectRunner;
|
||||
import org.apache.beam.sdk.options.PipelineOptions;
|
||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
||||
import org.apache.beam.sdk.testing.TestPipeline;
|
||||
import org.apache.beam.sdk.transforms.Create;
|
||||
import org.apache.beam.sdk.values.PCollection;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
/** Unit tests for {@link InvoicingPipeline}. */
|
||||
@RunWith(JUnit4.class)
|
||||
public class InvoicingPipelineTest {
|
||||
|
||||
private static PipelineOptions pipelineOptions;
|
||||
|
||||
@BeforeClass
|
||||
public static void initializePipelineOptions() {
|
||||
pipelineOptions = PipelineOptionsFactory.create();
|
||||
pipelineOptions.setRunner(DirectRunner.class);
|
||||
}
|
||||
|
||||
@Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
|
||||
@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
|
||||
|
||||
private InvoicingPipeline invoicingPipeline;
|
||||
|
||||
@Before
|
||||
public void initializePipeline() throws IOException {
|
||||
invoicingPipeline = new InvoicingPipeline();
|
||||
invoicingPipeline.projectId = "test-project";
|
||||
File beamTempFolder = tempFolder.newFolder();
|
||||
invoicingPipeline.beamBucketUrl = beamTempFolder.getAbsolutePath();
|
||||
invoicingPipeline.beamStagingUrl = beamTempFolder.getAbsolutePath() + "/staging";
|
||||
invoicingPipeline.invoiceTemplateUrl =
|
||||
beamTempFolder.getAbsolutePath() + "/templates/invoicing";
|
||||
invoicingPipeline.billingBucketUrl = tempFolder.getRoot().getAbsolutePath();
|
||||
}
|
||||
|
||||
private ImmutableList<BillingEvent> getInputEvents() {
|
||||
return ImmutableList.of(
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain.test",
|
||||
"REPO-ID",
|
||||
3,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain2.test",
|
||||
"REPO-ID",
|
||||
3,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 2, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"theRegistrar",
|
||||
"234",
|
||||
"hello",
|
||||
"CREATE",
|
||||
"mydomain3.hello",
|
||||
"REPO-ID",
|
||||
5,
|
||||
"JPY",
|
||||
70.75,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"googledomains",
|
||||
"456",
|
||||
"test",
|
||||
"RENEW",
|
||||
"mydomain4.test",
|
||||
"REPO-ID",
|
||||
1,
|
||||
"USD",
|
||||
20.5,
|
||||
""),
|
||||
BillingEvent.create(
|
||||
1,
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
ZonedDateTime.of(2017, 10, 4, 0, 0, 0, 0, ZoneId.of("UTC")),
|
||||
"anotherRegistrar",
|
||||
"789",
|
||||
"test",
|
||||
"CREATE",
|
||||
"mydomain5.test",
|
||||
"REPO-ID",
|
||||
1,
|
||||
"USD",
|
||||
0,
|
||||
"SUNRISE ANCHOR_TENANT"));
|
||||
}
|
||||
|
||||
/** Returns a map from filename to expected contents for detail reports. */
|
||||
private ImmutableMap<String, ImmutableList<String>> getExpectedDetailReportMap() {
|
||||
return ImmutableMap.of(
|
||||
"invoice_details_2017-10_theRegistrar_test.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,"
|
||||
+ "test,RENEW,mydomain2.test,REPO-ID,3,USD,20.50,",
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,theRegistrar,234,"
|
||||
+ "test,RENEW,mydomain.test,REPO-ID,3,USD,20.50,"),
|
||||
"invoice_details_2017-10_theRegistrar_hello.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-02 00:00:00 UTC,2017-09-29 00:00:00 UTC,theRegistrar,234,"
|
||||
+ "hello,CREATE,mydomain3.hello,REPO-ID,5,JPY,70.75,"),
|
||||
"invoice_details_2017-10_googledomains_test.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,googledomains,456,"
|
||||
+ "test,RENEW,mydomain4.test,REPO-ID,1,USD,20.50,"),
|
||||
"invoice_details_2017-10_anotherRegistrar_test.csv",
|
||||
ImmutableList.of(
|
||||
"1,2017-10-04 00:00:00 UTC,2017-10-04 00:00:00 UTC,anotherRegistrar,789,"
|
||||
+ "test,CREATE,mydomain5.test,REPO-ID,1,USD,0.00,SUNRISE ANCHOR_TENANT"));
|
||||
}
|
||||
|
||||
private ImmutableList<String> getExpectedInvoiceOutput() {
|
||||
return ImmutableList.of(
|
||||
"2017-10-01,2020-09-30,234,41.00,USD,10125,1,PURCHASE,theRegistrar - test,2,"
|
||||
+ "RENEW | TLD: test | TERM: 3-year,20.50,USD,",
|
||||
"2017-10-01,2022-09-30,234,70.75,JPY,10125,1,PURCHASE,theRegistrar - hello,1,"
|
||||
+ "CREATE | TLD: hello | TERM: 5-year,70.75,JPY,",
|
||||
"2017-10-01,2018-09-30,456,20.50,USD,10125,1,PURCHASE,googledomains - test,1,"
|
||||
+ "RENEW | TLD: test | TERM: 1-year,20.50,USD,",
|
||||
"2017-10-01,2018-09-30,789,0.00,USD,10125,1,PURCHASE,anotherRegistrar - test,1,"
|
||||
+ "CREATE | TLD: test | TERM: 1-year,0.00,USD,");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndToEndPipeline_generatesExpectedFiles() throws Exception {
|
||||
ImmutableList<BillingEvent> inputRows = getInputEvents();
|
||||
PCollection<BillingEvent> input = p.apply(Create.of(inputRows));
|
||||
invoicingPipeline.applyTerminalTransforms(input, StaticValueProvider.of("2017-10"));
|
||||
p.run();
|
||||
|
||||
for (Entry<String, ImmutableList<String>> entry : getExpectedDetailReportMap().entrySet()) {
|
||||
ImmutableList<String> detailReport = resultFileContents(entry.getKey());
|
||||
assertThat(detailReport.get(0))
|
||||
.isEqualTo("id,billingTime,eventTime,registrarId,billingId,tld,action,"
|
||||
+ "domain,repositoryId,years,currency,amount,flags");
|
||||
assertThat(detailReport.subList(1, detailReport.size()))
|
||||
.containsExactlyElementsIn(entry.getValue());
|
||||
}
|
||||
|
||||
ImmutableList<String> overallInvoice = resultFileContents("CRR-INV-2017-10.csv");
|
||||
assertThat(overallInvoice.get(0))
|
||||
.isEqualTo(
|
||||
"StartDate,EndDate,ProductAccountKey,Amount,AmountCurrency,BillingProductCode,"
|
||||
+ "SalesChannel,LineItemType,UsageGroupingKey,Quantity,Description,UnitPrice,"
|
||||
+ "UnitPriceCurrency,PONumber");
|
||||
assertThat(overallInvoice.subList(1, overallInvoice.size()))
|
||||
.containsExactlyElementsIn(getExpectedInvoiceOutput());
|
||||
}
|
||||
|
||||
/** Returns the text contents of a file under the beamBucket/results directory. */
|
||||
private ImmutableList<String> resultFileContents(String filename) throws Exception {
|
||||
File resultFile =
|
||||
new File(
|
||||
String.format(
|
||||
"%s/invoices/2017-10/%s", tempFolder.getRoot().getAbsolutePath(), filename));
|
||||
return ImmutableList.copyOf(
|
||||
ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n"));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.invoicing;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import google.registry.testing.TestDataHelper;
|
||||
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
|
||||
import org.apache.beam.sdk.io.FileBasedSink;
|
||||
import org.apache.beam.sdk.options.ValueProvider;
|
||||
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
||||
import org.apache.beam.sdk.transforms.SerializableFunction;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
/** Unit tests for {@link InvoicingUtils}. */
|
||||
@RunWith(JUnit4.class)
|
||||
public class InvoicingUtilsTest {
|
||||
|
||||
@Test
|
||||
public void testDestinationFunction_generatesProperFileParams() {
|
||||
SerializableFunction<BillingEvent, Params> destinationFunction =
|
||||
InvoicingUtils.makeDestinationFunction("my/directory", StaticValueProvider.of("2017-10"));
|
||||
|
||||
BillingEvent billingEvent = mock(BillingEvent.class);
|
||||
// We mock BillingEvent to make the test independent of the implementation of toFilename()
|
||||
when(billingEvent.toFilename(any())).thenReturn("invoice_details_2017-10_registrar_tld");
|
||||
|
||||
assertThat(destinationFunction.apply(billingEvent))
|
||||
.isEqualTo(
|
||||
new Params()
|
||||
.withShardTemplate("")
|
||||
.withSuffix(".csv")
|
||||
.withBaseFilename(
|
||||
FileBasedSink.convertToFileResourceIfPossible(
|
||||
"my/directory/2017-10/invoice_details_2017-10_registrar_tld")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyDestinationParams() {
|
||||
assertThat(InvoicingUtils.makeEmptyDestinationParams("my/directory"))
|
||||
.isEqualTo(
|
||||
new Params()
|
||||
.withBaseFilename(
|
||||
FileBasedSink.convertToFileResourceIfPossible("my/directory/FAILURES")));
|
||||
}
|
||||
|
||||
/** Asserts that the instantiated sql template matches a golden expected file. */
|
||||
@Test
|
||||
public void testMakeQueryProvider() {
|
||||
ValueProvider<String> queryProvider =
|
||||
InvoicingUtils.makeQueryProvider(StaticValueProvider.of("2017-10"), "my-project-id");
|
||||
assertThat(queryProvider.get()).isEqualTo(loadFile("billing_events_test.sql"));
|
||||
}
|
||||
|
||||
/** Returns a {@link String} from a file in the {@code billing/testdata/} directory. */
|
||||
private static String loadFile(String filename) {
|
||||
return TestDataHelper.loadFile(InvoicingUtilsTest.class, filename);
|
||||
}
|
||||
}
|
100
javatests/google/registry/beam/invoicing/testdata/billing_events_test.sql
vendored
Normal file
100
javatests/google/registry/beam/invoicing/testdata/billing_events_test.sql
vendored
Normal file
|
@ -0,0 +1,100 @@
|
|||
#standardSQL
|
||||
-- Copyright 2017 The Nomulus Authors. All Rights Reserved.
|
||||
--
|
||||
-- Licensed under the Apache License, Version 2.0 (the "License");
|
||||
-- you may not use this file except in compliance with the License.
|
||||
-- You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing, software
|
||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
-- See the License for the specific language governing permissions and
|
||||
-- limitations under the License.
|
||||
|
||||
-- This query gathers all non-canceled billing events for a given
|
||||
-- YEAR_MONTH in yyyy-MM format.
|
||||
|
||||
SELECT
|
||||
__key__.id AS id,
|
||||
billingTime,
|
||||
eventTime,
|
||||
BillingEvent.clientId AS registrarId,
|
||||
RegistrarData.accountId AS billingId,
|
||||
tld,
|
||||
reason as action,
|
||||
targetId as domain,
|
||||
BillingEvent.domainRepoId as repositoryId,
|
||||
periodYears as years,
|
||||
BillingEvent.currency AS currency,
|
||||
BillingEvent.amount as amount,
|
||||
-- We'll strip out non-useful flags downstream
|
||||
ARRAY_TO_STRING(flags, " ") AS flags
|
||||
FROM (
|
||||
SELECT
|
||||
*,
|
||||
-- We store cost as "CURRENCY AMOUNT" such as "JPY 800" or "USD 20.00"
|
||||
SPLIT(cost, ' ')[OFFSET(0)] AS currency,
|
||||
SPLIT(cost, ' ')[OFFSET(1)] AS amount,
|
||||
-- Extract everything after the first dot in the domain as the TLD
|
||||
REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld,
|
||||
-- __key__.path looks like '"DomainBase", "<repoId>", ...'
|
||||
REGEXP_REPLACE(SPLIT(__key__.path, ', ')[OFFSET(1)], '"', '')
|
||||
AS domainRepoId,
|
||||
COALESCE(cancellationMatchingBillingEvent.path,
|
||||
__key__.path) AS cancellationMatchingPath
|
||||
FROM
|
||||
`my-project-id.latest_datastore_export.OneTime`
|
||||
-- Only include real TLDs (filter prober data)
|
||||
WHERE
|
||||
REGEXP_EXTRACT(targetId, r'[.](.+)') IN (
|
||||
SELECT
|
||||
tldStr
|
||||
FROM
|
||||
`my-project-id.latest_datastore_export.Registry`
|
||||
WHERE
|
||||
-- TODO(b/18092292): Add a filter for tldState (not PDT/PREDELEGATION)
|
||||
tldType = 'REAL') ) AS BillingEvent
|
||||
-- Gather billing ID from registrar table
|
||||
-- This is a 'JOIN' as opposed to 'LEFT JOIN' to filter out
|
||||
-- non-billable registrars
|
||||
JOIN (
|
||||
SELECT
|
||||
__key__.name AS clientId,
|
||||
billingIdentifier,
|
||||
r.billingAccountMap.currency[SAFE_OFFSET(index)] AS currency,
|
||||
r.billingAccountMap.accountId[SAFE_OFFSET(index)] AS accountId
|
||||
FROM
|
||||
`my-project-id.latest_datastore_export.Registrar` AS r,
|
||||
UNNEST(GENERATE_ARRAY(0, ARRAY_LENGTH(r.billingAccountMap.currency) - 1))
|
||||
AS index
|
||||
WHERE billingAccountMap IS NOT NULL
|
||||
AND type = 'REAL') AS RegistrarData
|
||||
ON
|
||||
BillingEvent.clientId = RegistrarData.clientId
|
||||
AND BillingEvent.currency = RegistrarData.currency
|
||||
-- Gather cancellations
|
||||
LEFT JOIN (
|
||||
SELECT __key__.id AS cancellationId,
|
||||
COALESCE(refOneTime.path, refRecurring.path) AS cancelledEventPath,
|
||||
eventTime as cancellationTime,
|
||||
billingTime as cancellationBillingTime
|
||||
FROM
|
||||
(SELECT
|
||||
*,
|
||||
-- Count everything after first dot as TLD (to support multi-part TLDs).
|
||||
REGEXP_EXTRACT(targetId, r'[.](.+)') AS tld
|
||||
FROM
|
||||
`my-project-id.latest_datastore_export.Cancellation`)
|
||||
) AS Cancellation
|
||||
ON BillingEvent.cancellationMatchingPath = Cancellation.cancelledEventPath
|
||||
AND BillingEvent.billingTime = Cancellation.cancellationBillingTime
|
||||
WHERE billingTime BETWEEN TIMESTAMP('2017-10-01 00:00:00.000000')
|
||||
AND TIMESTAMP('2017-10-31 23:59:59.999999')
|
||||
-- Filter out canceled events
|
||||
AND Cancellation.cancellationId IS NULL
|
||||
ORDER BY
|
||||
billingTime DESC,
|
||||
id,
|
||||
tld
|
36
javatests/google/registry/beam/spec11/BUILD
Normal file
36
javatests/google/registry/beam/spec11/BUILD
Normal file
|
@ -0,0 +1,36 @@
|
|||
package(
|
||||
default_testonly = 1,
|
||||
default_visibility = ["//java/google/registry:registry_project"],
|
||||
)
|
||||
|
||||
licenses(["notice"]) # Apache 2.0
|
||||
|
||||
load("//java/com/google/testing/builddefs:GenTestRules.bzl", "GenTestRules")
|
||||
|
||||
java_library(
|
||||
name = "spec11",
|
||||
srcs = glob(["*.java"]),
|
||||
deps = [
|
||||
"//java/google/registry/beam/spec11",
|
||||
"//java/google/registry/util",
|
||||
"//javatests/google/registry/testing",
|
||||
"@com_google_dagger",
|
||||
"@com_google_guava",
|
||||
"@com_google_truth",
|
||||
"@com_google_truth_extensions_truth_java8_extension",
|
||||
"@junit",
|
||||
"@org_apache_avro",
|
||||
"@org_apache_beam_runners_direct_java",
|
||||
"@org_apache_beam_runners_google_cloud_dataflow_java",
|
||||
"@org_apache_beam_sdks_java_core",
|
||||
"@org_apache_beam_sdks_java_io_google_cloud_platform",
|
||||
"@org_mockito_all",
|
||||
],
|
||||
)
|
||||
|
||||
GenTestRules(
|
||||
name = "GeneratedTestRules",
|
||||
default_test_size = "small",
|
||||
test_files = glob(["*Test.java"]),
|
||||
deps = [":spec11"],
|
||||
)
|
|
@ -0,0 +1,94 @@
|
|||
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.spec11;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import google.registry.util.ResourceUtils;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import org.apache.beam.runners.direct.DirectRunner;
|
||||
import org.apache.beam.sdk.options.PipelineOptions;
|
||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||
import org.apache.beam.sdk.testing.TestPipeline;
|
||||
import org.apache.beam.sdk.transforms.Create;
|
||||
import org.apache.beam.sdk.values.PCollection;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
/** Unit tests for {@link Spec11Pipeline}. */
|
||||
@RunWith(JUnit4.class)
|
||||
public class Spec11PipelineTest {
|
||||
|
||||
private static PipelineOptions pipelineOptions;
|
||||
|
||||
@BeforeClass
|
||||
public static void initializePipelineOptions() {
|
||||
pipelineOptions = PipelineOptionsFactory.create();
|
||||
pipelineOptions.setRunner(DirectRunner.class);
|
||||
}
|
||||
|
||||
@Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
|
||||
@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
|
||||
|
||||
private Spec11Pipeline spec11Pipeline;
|
||||
|
||||
@Before
|
||||
public void initializePipeline() throws IOException {
|
||||
spec11Pipeline = new Spec11Pipeline();
|
||||
spec11Pipeline.projectId = "test-project";
|
||||
spec11Pipeline.spec11BucketUrl = tempFolder.getRoot().getAbsolutePath() + "/results";
|
||||
File beamTempFolder = tempFolder.newFolder();
|
||||
spec11Pipeline.beamStagingUrl = beamTempFolder.getAbsolutePath() + "/staging";
|
||||
spec11Pipeline.spec11TemplateUrl = beamTempFolder.getAbsolutePath() + "/templates/invoicing";
|
||||
}
|
||||
|
||||
private ImmutableList<Subdomain> getInputDomains() {
|
||||
return ImmutableList.of(
|
||||
Subdomain.create(
|
||||
"a.com", ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")), "OK"),
|
||||
Subdomain.create(
|
||||
"b.com", ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")), "OK"),
|
||||
Subdomain.create(
|
||||
"c.com", ZonedDateTime.of(2017, 9, 29, 0, 0, 0, 0, ZoneId.of("UTC")), "OK"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndToEndPipeline_generatesExpectedFiles() throws Exception {
|
||||
ImmutableList<Subdomain> inputRows = getInputDomains();
|
||||
PCollection<Subdomain> input = p.apply(Create.of(inputRows));
|
||||
spec11Pipeline.countDomainsAndOutputResults(input);
|
||||
p.run();
|
||||
|
||||
ImmutableList<String> generatedReport = resultFileContents();
|
||||
assertThat(generatedReport.get(0)).isEqualTo("HELLO WORLD");
|
||||
assertThat(generatedReport.get(1)).isEqualTo("3");
|
||||
}
|
||||
|
||||
/** Returns the text contents of a file under the beamBucket/results directory. */
|
||||
private ImmutableList<String> resultFileContents() throws Exception {
|
||||
File resultFile = new File(String.format("%s/results", tempFolder.getRoot().getAbsolutePath()));
|
||||
return ImmutableList.copyOf(
|
||||
ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n"));
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue