From d199b383e5dc5fdb3cc6dffd7f577684d2199b48 Mon Sep 17 00:00:00 2001 From: larryruili Date: Wed, 25 Jul 2018 08:34:58 -0700 Subject: [PATCH] Add preliminary spec11 monthly pipeline This adds the scaffolding for a basic Spec11 pipeline- it gathers all domains from all time for a given project and counts how many there are. I've factored out a few common utilities for beam pipelines to avoid excessive duplication. Future CLs will: - Actually process domains via the SafeBrowsing API - Generate a real spec11 report - Template queries based on the input YearMonth - Abstract more commonalities across beam pipelines to reduce boilerplate when adding new pipelines. TESTED: FOSS test passed, and ran successfully on alpha ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=205997741 --- java/google/registry/beam/BUILD | 11 +++ java/google/registry/beam/BeamUtils.java | 57 ++++++++++++ java/google/registry/beam/invoicing/BUILD | 1 + .../registry/beam/invoicing/BillingEvent.java | 31 +------ .../beam/invoicing/InvoicingPipeline.java | 6 +- .../registry/config/RegistryConfig.java | 36 ++++++-- .../reporting/icann/IcannReportingStager.java | 2 +- .../icann/IcannReportingUploadAction.java | 2 +- java/google/registry/tools/BUILD | 1 + .../tools/DeploySpec11PipelineCommand.java | 32 +++++++ java/google/registry/tools/RegistryTool.java | 1 + .../registry/tools/RegistryToolComponent.java | 1 + javatests/google/registry/beam/BUILD | 23 +++++ .../google/registry/beam/BeamUtilsTest.java | 86 +++++++++++++++++++ 14 files changed, 252 insertions(+), 38 deletions(-) create mode 100644 java/google/registry/beam/BeamUtils.java create mode 100644 java/google/registry/tools/DeploySpec11PipelineCommand.java create mode 100644 javatests/google/registry/beam/BeamUtilsTest.java diff --git a/java/google/registry/beam/BUILD b/java/google/registry/beam/BUILD index e96d0117d..842dcf407 100644 --- a/java/google/registry/beam/BUILD +++ b/java/google/registry/beam/BUILD @@ -6,4 +6,15 @@ licenses(["notice"]) # Apache 2.0 java_library( name = "beam", + srcs = glob(["*.java"]), + deps = [ + "@com_google_flogger", + "@com_google_flogger_system_backend", + "@com_google_guava", + "@org_apache_avro", + "@org_apache_beam_runners_direct_java", + "@org_apache_beam_runners_google_cloud_dataflow_java", + "@org_apache_beam_sdks_java_core", + "@org_apache_beam_sdks_java_io_google_cloud_platform", + ], ) diff --git a/java/google/registry/beam/BeamUtils.java b/java/google/registry/beam/BeamUtils.java new file mode 100644 index 000000000..faa603a81 --- /dev/null +++ b/java/google/registry/beam/BeamUtils.java @@ -0,0 +1,57 @@ +// Copyright 2018 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.flogger.FluentLogger; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; + +/** Static utilities for {@code Beam} pipelines. */ +public class BeamUtils { + + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + /** Extracts a string representation of a field in a {@link GenericRecord}. */ + public static String extractField(GenericRecord record, String fieldName) { + return String.valueOf(record.get(fieldName)); + } + + /** + * Checks that no expected fields in the record are missing. + * + *

Note that this simply makes sure the field is not null; it may still generate a parse error + * when interpreting the string representation of an object. + * + * @throws IllegalStateException if the record returns null for any field in {@code fieldNames} + */ + public static void checkFieldsNotNull( + ImmutableList fieldNames, SchemaAndRecord schemaAndRecord) { + GenericRecord record = schemaAndRecord.getRecord(); + ImmutableList nullFields = + fieldNames + .stream() + .filter(fieldName -> record.get(fieldName) == null) + .collect(ImmutableList.toImmutableList()); + String missingFieldList = Joiner.on(", ").join(nullFields); + if (!nullFields.isEmpty()) { + throw new IllegalStateException( + String.format( + "Read unexpected null value for field(s) %s for record %s", + missingFieldList, record)); + } + } +} diff --git a/java/google/registry/beam/invoicing/BUILD b/java/google/registry/beam/invoicing/BUILD index 155b119ec..af2e4b32b 100644 --- a/java/google/registry/beam/invoicing/BUILD +++ b/java/google/registry/beam/invoicing/BUILD @@ -9,6 +9,7 @@ java_library( srcs = glob(["*.java"]), resources = glob(["sql/*"]), deps = [ + "//java/google/registry/beam", "//java/google/registry/config", "//java/google/registry/model", "//java/google/registry/reporting/billing", diff --git a/java/google/registry/beam/invoicing/BillingEvent.java b/java/google/registry/beam/invoicing/BillingEvent.java index 719179154..69c108567 100644 --- a/java/google/registry/beam/invoicing/BillingEvent.java +++ b/java/google/registry/beam/invoicing/BillingEvent.java @@ -14,6 +14,9 @@ package google.registry.beam.invoicing; +import static google.registry.beam.BeamUtils.checkFieldsNotNull; +import static google.registry.beam.BeamUtils.extractField; + import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; @@ -108,7 +111,7 @@ public abstract class BillingEvent implements Serializable { * Apache AVRO GenericRecord */ static BillingEvent parseFromRecord(SchemaAndRecord schemaAndRecord) { - checkFieldsNotNull(schemaAndRecord); + checkFieldsNotNull(FIELD_NAMES, schemaAndRecord); GenericRecord record = schemaAndRecord.getRecord(); String flags = extractField(record, "flags"); double amount = getDiscountedAmount(Double.parseDouble(extractField(record, "amount")), flags); @@ -337,30 +340,4 @@ public abstract class BillingEvent implements Serializable { } } } - - /** Extracts a string representation of a field in a {@code GenericRecord}. */ - private static String extractField(GenericRecord record, String fieldName) { - return String.valueOf(record.get(fieldName)); - } - - /** - * Checks that no expected fields in the record are missing. - * - *

Note that this simply makes sure the field is not null; it may still generate a parse error - * in {@code parseFromRecord}. - */ - private static void checkFieldsNotNull(SchemaAndRecord schemaAndRecord) { - GenericRecord record = schemaAndRecord.getRecord(); - ImmutableList nullFields = - FIELD_NAMES - .stream() - .filter(fieldName -> record.get(fieldName) == null) - .collect(ImmutableList.toImmutableList()); - if (!nullFields.isEmpty()) { - logger.atSevere().log( - "Found unexpected null value(s) in field(s) %s for record %s", - Joiner.on(", ").join(nullFields), record); - throw new IllegalStateException("Read null value from Bigquery query"); - } - } } diff --git a/java/google/registry/beam/invoicing/InvoicingPipeline.java b/java/google/registry/beam/invoicing/InvoicingPipeline.java index 88c81a1cb..69c8c1f0f 100644 --- a/java/google/registry/beam/invoicing/InvoicingPipeline.java +++ b/java/google/registry/beam/invoicing/InvoicingPipeline.java @@ -67,8 +67,8 @@ public class InvoicingPipeline implements Serializable { String invoiceTemplateUrl; @Inject - @Config("invoiceStagingUrl") - String invoiceStagingUrl; + @Config("beamStagingUrl") + String beamStagingUrl; @Inject @Config("billingBucketUrl") @@ -99,7 +99,7 @@ public class InvoicingPipeline implements Serializable { options.setRunner(DataflowRunner.class); // This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it. options.setTemplateLocation(invoiceTemplateUrl); - options.setStagingLocation(invoiceStagingUrl); + options.setStagingLocation(beamStagingUrl); Pipeline p = Pipeline.create(options); PCollection billingEvents = diff --git a/java/google/registry/config/RegistryConfig.java b/java/google/registry/config/RegistryConfig.java index 428925a41..ea84636da 100644 --- a/java/google/registry/config/RegistryConfig.java +++ b/java/google/registry/config/RegistryConfig.java @@ -520,25 +520,38 @@ public final class RegistryConfig { } /** - * Returns the URL of the GCS location we store jar dependencies for the invoicing pipeline. + * Returns the URL of the GCS location for storing the monthly spec11 Beam template. * - * @see google.registry.beam.invoicing.InvoicingPipeline + * @see google.registry.beam.spec11.Spec11Pipeline */ @Provides - @Config("invoiceStagingUrl") + @Config("spec11TemplateUrl") + public static String provideSpec11TemplateUrl( + @Config("apacheBeamBucketUrl") String beamBucketUrl) { + return beamBucketUrl + "/templates/spec11"; + } + + /** + * Returns the URL of the GCS location we store jar dependencies for beam pipelines. + * + * @see google.registry.beam.invoicing.InvoicingPipeline + * @see google.registry.beam.spec11.Spec11Pipeline + */ + @Provides + @Config("beamStagingUrl") public static String provideInvoiceStagingUrl( @Config("apacheBeamBucketUrl") String beamBucketUrl) { return beamBucketUrl + "/staging"; } /** - * Returns the Google Cloud Storage bucket for ICANN transaction and activity reports to - * be uploaded. + * Returns the Google Cloud Storage bucket for Spec11 and ICANN transaction and activity reports + * to be uploaded. * * @see google.registry.reporting.icann.IcannReportingUploadAction */ @Provides - @Config("icannReportingBucket") + @Config("reportingBucket") public static String provideIcannReportingBucket(@Config("projectId") String projectId) { return projectId + "-reporting"; } @@ -588,6 +601,17 @@ public final class RegistryConfig { return "gs://" + billingBucket; } + /** + * Returns the URL of the GCS subdirectory we store Spec11 reports in. + * + * @see google.registry.beam.spec11.Spec11Pipeline + */ + @Provides + @Config("spec11BucketUrl") + public static String provideSpec11BucketUrl(@Config("reportingBucket") String reportingBucket) { + return "gs://" + reportingBucket + "/icann/spec11"; + } + /** * Returns whether or not we should publish invoices to partners automatically by default. * diff --git a/java/google/registry/reporting/icann/IcannReportingStager.java b/java/google/registry/reporting/icann/IcannReportingStager.java index ec94e8b5d..a988c29c6 100644 --- a/java/google/registry/reporting/icann/IcannReportingStager.java +++ b/java/google/registry/reporting/icann/IcannReportingStager.java @@ -60,7 +60,7 @@ public class IcannReportingStager { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - @Inject @Config("icannReportingBucket") String reportingBucket; + @Inject @Config("reportingBucket") String reportingBucket; @Inject YearMonth yearMonth; @Inject @ReportingSubdir diff --git a/java/google/registry/reporting/icann/IcannReportingUploadAction.java b/java/google/registry/reporting/icann/IcannReportingUploadAction.java index f5c6d0f7b..1cd6c5371 100644 --- a/java/google/registry/reporting/icann/IcannReportingUploadAction.java +++ b/java/google/registry/reporting/icann/IcannReportingUploadAction.java @@ -60,7 +60,7 @@ public final class IcannReportingUploadAction implements Runnable { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @Inject - @Config("icannReportingBucket") + @Config("reportingBucket") String reportingBucket; @Inject @ReportingSubdir diff --git a/java/google/registry/tools/BUILD b/java/google/registry/tools/BUILD index 3def12767..90e9fb3dc 100644 --- a/java/google/registry/tools/BUILD +++ b/java/google/registry/tools/BUILD @@ -36,6 +36,7 @@ java_library( deps = [ "//java/google/registry/backup", "//java/google/registry/beam/invoicing", + "//java/google/registry/beam/spec11", "//java/google/registry/bigquery", "//java/google/registry/config", "//java/google/registry/dns", diff --git a/java/google/registry/tools/DeploySpec11PipelineCommand.java b/java/google/registry/tools/DeploySpec11PipelineCommand.java new file mode 100644 index 000000000..bf0438e47 --- /dev/null +++ b/java/google/registry/tools/DeploySpec11PipelineCommand.java @@ -0,0 +1,32 @@ +// Copyright 2018 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.tools; + +import com.beust.jcommander.Parameters; +import google.registry.beam.spec11.Spec11Pipeline; +import javax.inject.Inject; + +/** Nomulus command that deploys the {@link Spec11Pipeline} template. */ +@Parameters(commandDescription = "Deploy the invoicing pipeline to GCS.") +public class DeploySpec11PipelineCommand implements Command { + + @Inject Spec11Pipeline spec11Pipeline; + + @Override + public void run() { + spec11Pipeline.deploy(); + } +} + diff --git a/java/google/registry/tools/RegistryTool.java b/java/google/registry/tools/RegistryTool.java index f0ba33c5a..e46b5980d 100644 --- a/java/google/registry/tools/RegistryTool.java +++ b/java/google/registry/tools/RegistryTool.java @@ -54,6 +54,7 @@ public final class RegistryTool { .put("delete_reserved_list", DeleteReservedListCommand.class) .put("delete_tld", DeleteTldCommand.class) .put("deploy_invoicing_pipeline", DeployInvoicingPipelineCommand.class) + .put("deploy_spec11_pipeline", DeploySpec11PipelineCommand.class) .put("domain_application_info", DomainApplicationInfoCommand.class) .put("encrypt_escrow_deposit", EncryptEscrowDepositCommand.class) .put("execute_epp", ExecuteEppCommand.class) diff --git a/java/google/registry/tools/RegistryToolComponent.java b/java/google/registry/tools/RegistryToolComponent.java index 290e1af79..157b69cf9 100644 --- a/java/google/registry/tools/RegistryToolComponent.java +++ b/java/google/registry/tools/RegistryToolComponent.java @@ -83,6 +83,7 @@ interface RegistryToolComponent { void inject(CreateLrpTokensCommand command); void inject(CreateTldCommand command); void inject(DeployInvoicingPipelineCommand command); + void inject(DeploySpec11PipelineCommand command); void inject(EncryptEscrowDepositCommand command); void inject(GenerateAllocationTokensCommand command); void inject(GenerateApplicationsReportCommand command); diff --git a/javatests/google/registry/beam/BUILD b/javatests/google/registry/beam/BUILD index 86750c423..ee5efa64e 100644 --- a/javatests/google/registry/beam/BUILD +++ b/javatests/google/registry/beam/BUILD @@ -9,4 +9,27 @@ load("//java/com/google/testing/builddefs:GenTestRules.bzl", "GenTestRules") java_library( name = "beam", + srcs = glob(["*.java"]), + deps = [ + "//java/google/registry/beam", + "//javatests/google/registry/testing", + "@com_google_dagger", + "@com_google_guava", + "@com_google_truth", + "@com_google_truth_extensions_truth_java8_extension", + "@junit", + "@org_apache_avro", + "@org_apache_beam_runners_direct_java", + "@org_apache_beam_runners_google_cloud_dataflow_java", + "@org_apache_beam_sdks_java_core", + "@org_apache_beam_sdks_java_io_google_cloud_platform", + "@org_mockito_all", + ], +) + +GenTestRules( + name = "GeneratedTestRules", + default_test_size = "small", + test_files = glob(["*Test.java"]), + deps = [":beam"], ) diff --git a/javatests/google/registry/beam/BeamUtilsTest.java b/javatests/google/registry/beam/BeamUtilsTest.java new file mode 100644 index 000000000..1f6c9d2db --- /dev/null +++ b/javatests/google/registry/beam/BeamUtilsTest.java @@ -0,0 +1,86 @@ +// Copyright 2018 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam; + +import static com.google.common.truth.Truth.assertThat; +import static google.registry.testing.JUnitBackports.assertThrows; + +import com.google.common.collect.ImmutableList; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link BeamUtils} */ +@RunWith(JUnit4.class) +public class BeamUtilsTest { + + private static final String GENERIC_SCHEMA = + "{\"name\": \"AnObject\", " + + "\"type\": \"record\", " + + "\"fields\": [" + + "{\"name\": \"aString\", \"type\": \"string\"}," + + "{\"name\": \"aFloat\", \"type\": \"float\"}" + + "]}"; + + private SchemaAndRecord schemaAndRecord; + + @Before + public void initializeRecord() { + // Create a record with a given JSON schema. + GenericRecord record = new GenericData.Record(new Schema.Parser().parse(GENERIC_SCHEMA)); + record.put("aString", "hello world"); + record.put("aFloat", 2.54); + schemaAndRecord = new SchemaAndRecord(record, null); + } + + @Test + public void testExtractField_fieldExists_returnsExpectedStringValues() { + assertThat(BeamUtils.extractField(schemaAndRecord.getRecord(), "aString")) + .isEqualTo("hello world"); + assertThat(BeamUtils.extractField(schemaAndRecord.getRecord(), "aFloat")).isEqualTo("2.54"); + } + + @Test + public void testExtractField_fieldDoesntExist_returnsNull() { + schemaAndRecord.getRecord().put("aFloat", null); + assertThat(BeamUtils.extractField(schemaAndRecord.getRecord(), "aFloat")).isEqualTo("null"); + assertThat(BeamUtils.extractField(schemaAndRecord.getRecord(), "missing")).isEqualTo("null"); + } + + @Test + public void testCheckFieldsNotNull_noExceptionIfAllPresent() { + BeamUtils.checkFieldsNotNull(ImmutableList.of("aString", "aFloat"), schemaAndRecord); + } + + @Test + public void testCheckFieldsNotNull_fieldMissing_throwsException() { + IllegalStateException expected = + assertThrows( + IllegalStateException.class, + () -> + BeamUtils.checkFieldsNotNull( + ImmutableList.of("aString", "aFloat", "notAField"), schemaAndRecord)); + assertThat(expected) + .hasMessageThat() + .isEqualTo( + "Read unexpected null value for field(s) notAField for record " + + "{\"aString\": \"hello world\", \"aFloat\": 2.54}"); + } +}