mirror of
https://github.com/google/nomulus.git
synced 2025-05-13 16:07:15 +02:00
Add preliminary spec11 monthly pipeline
This adds the scaffolding for a basic Spec11 pipeline- it gathers all domains from all time for a given project and counts how many there are. I've factored out a few common utilities for beam pipelines to avoid excessive duplication. Future CLs will: - Actually process domains via the SafeBrowsing API - Generate a real spec11 report - Template queries based on the input YearMonth - Abstract more commonalities across beam pipelines to reduce boilerplate when adding new pipelines. TESTED: FOSS test passed, and ran successfully on alpha ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=205997741
This commit is contained in:
parent
ded40851d3
commit
d199b383e5
14 changed files with 252 additions and 38 deletions
|
@ -6,4 +6,15 @@ licenses(["notice"]) # Apache 2.0
|
||||||
|
|
||||||
java_library(
|
java_library(
|
||||||
name = "beam",
|
name = "beam",
|
||||||
|
srcs = glob(["*.java"]),
|
||||||
|
deps = [
|
||||||
|
"@com_google_flogger",
|
||||||
|
"@com_google_flogger_system_backend",
|
||||||
|
"@com_google_guava",
|
||||||
|
"@org_apache_avro",
|
||||||
|
"@org_apache_beam_runners_direct_java",
|
||||||
|
"@org_apache_beam_runners_google_cloud_dataflow_java",
|
||||||
|
"@org_apache_beam_sdks_java_core",
|
||||||
|
"@org_apache_beam_sdks_java_io_google_cloud_platform",
|
||||||
|
],
|
||||||
)
|
)
|
||||||
|
|
57
java/google/registry/beam/BeamUtils.java
Normal file
57
java/google/registry/beam/BeamUtils.java
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package google.registry.beam;
|
||||||
|
|
||||||
|
import com.google.common.base.Joiner;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.flogger.FluentLogger;
|
||||||
|
import org.apache.avro.generic.GenericRecord;
|
||||||
|
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
|
||||||
|
|
||||||
|
/** Static utilities for {@code Beam} pipelines. */
|
||||||
|
public class BeamUtils {
|
||||||
|
|
||||||
|
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||||
|
|
||||||
|
/** Extracts a string representation of a field in a {@link GenericRecord}. */
|
||||||
|
public static String extractField(GenericRecord record, String fieldName) {
|
||||||
|
return String.valueOf(record.get(fieldName));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks that no expected fields in the record are missing.
|
||||||
|
*
|
||||||
|
* <p>Note that this simply makes sure the field is not null; it may still generate a parse error
|
||||||
|
* when interpreting the string representation of an object.
|
||||||
|
*
|
||||||
|
* @throws IllegalStateException if the record returns null for any field in {@code fieldNames}
|
||||||
|
*/
|
||||||
|
public static void checkFieldsNotNull(
|
||||||
|
ImmutableList<String> fieldNames, SchemaAndRecord schemaAndRecord) {
|
||||||
|
GenericRecord record = schemaAndRecord.getRecord();
|
||||||
|
ImmutableList<String> nullFields =
|
||||||
|
fieldNames
|
||||||
|
.stream()
|
||||||
|
.filter(fieldName -> record.get(fieldName) == null)
|
||||||
|
.collect(ImmutableList.toImmutableList());
|
||||||
|
String missingFieldList = Joiner.on(", ").join(nullFields);
|
||||||
|
if (!nullFields.isEmpty()) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
String.format(
|
||||||
|
"Read unexpected null value for field(s) %s for record %s",
|
||||||
|
missingFieldList, record));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -9,6 +9,7 @@ java_library(
|
||||||
srcs = glob(["*.java"]),
|
srcs = glob(["*.java"]),
|
||||||
resources = glob(["sql/*"]),
|
resources = glob(["sql/*"]),
|
||||||
deps = [
|
deps = [
|
||||||
|
"//java/google/registry/beam",
|
||||||
"//java/google/registry/config",
|
"//java/google/registry/config",
|
||||||
"//java/google/registry/model",
|
"//java/google/registry/model",
|
||||||
"//java/google/registry/reporting/billing",
|
"//java/google/registry/reporting/billing",
|
||||||
|
|
|
@ -14,6 +14,9 @@
|
||||||
|
|
||||||
package google.registry.beam.invoicing;
|
package google.registry.beam.invoicing;
|
||||||
|
|
||||||
|
import static google.registry.beam.BeamUtils.checkFieldsNotNull;
|
||||||
|
import static google.registry.beam.BeamUtils.extractField;
|
||||||
|
|
||||||
import com.google.auto.value.AutoValue;
|
import com.google.auto.value.AutoValue;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
|
@ -108,7 +111,7 @@ public abstract class BillingEvent implements Serializable {
|
||||||
* Apache AVRO GenericRecord</a>
|
* Apache AVRO GenericRecord</a>
|
||||||
*/
|
*/
|
||||||
static BillingEvent parseFromRecord(SchemaAndRecord schemaAndRecord) {
|
static BillingEvent parseFromRecord(SchemaAndRecord schemaAndRecord) {
|
||||||
checkFieldsNotNull(schemaAndRecord);
|
checkFieldsNotNull(FIELD_NAMES, schemaAndRecord);
|
||||||
GenericRecord record = schemaAndRecord.getRecord();
|
GenericRecord record = schemaAndRecord.getRecord();
|
||||||
String flags = extractField(record, "flags");
|
String flags = extractField(record, "flags");
|
||||||
double amount = getDiscountedAmount(Double.parseDouble(extractField(record, "amount")), flags);
|
double amount = getDiscountedAmount(Double.parseDouble(extractField(record, "amount")), flags);
|
||||||
|
@ -337,30 +340,4 @@ public abstract class BillingEvent implements Serializable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Extracts a string representation of a field in a {@code GenericRecord}. */
|
|
||||||
private static String extractField(GenericRecord record, String fieldName) {
|
|
||||||
return String.valueOf(record.get(fieldName));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks that no expected fields in the record are missing.
|
|
||||||
*
|
|
||||||
* <p>Note that this simply makes sure the field is not null; it may still generate a parse error
|
|
||||||
* in {@code parseFromRecord}.
|
|
||||||
*/
|
|
||||||
private static void checkFieldsNotNull(SchemaAndRecord schemaAndRecord) {
|
|
||||||
GenericRecord record = schemaAndRecord.getRecord();
|
|
||||||
ImmutableList<String> nullFields =
|
|
||||||
FIELD_NAMES
|
|
||||||
.stream()
|
|
||||||
.filter(fieldName -> record.get(fieldName) == null)
|
|
||||||
.collect(ImmutableList.toImmutableList());
|
|
||||||
if (!nullFields.isEmpty()) {
|
|
||||||
logger.atSevere().log(
|
|
||||||
"Found unexpected null value(s) in field(s) %s for record %s",
|
|
||||||
Joiner.on(", ").join(nullFields), record);
|
|
||||||
throw new IllegalStateException("Read null value from Bigquery query");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,8 +67,8 @@ public class InvoicingPipeline implements Serializable {
|
||||||
String invoiceTemplateUrl;
|
String invoiceTemplateUrl;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
@Config("invoiceStagingUrl")
|
@Config("beamStagingUrl")
|
||||||
String invoiceStagingUrl;
|
String beamStagingUrl;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
@Config("billingBucketUrl")
|
@Config("billingBucketUrl")
|
||||||
|
@ -99,7 +99,7 @@ public class InvoicingPipeline implements Serializable {
|
||||||
options.setRunner(DataflowRunner.class);
|
options.setRunner(DataflowRunner.class);
|
||||||
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
|
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
|
||||||
options.setTemplateLocation(invoiceTemplateUrl);
|
options.setTemplateLocation(invoiceTemplateUrl);
|
||||||
options.setStagingLocation(invoiceStagingUrl);
|
options.setStagingLocation(beamStagingUrl);
|
||||||
Pipeline p = Pipeline.create(options);
|
Pipeline p = Pipeline.create(options);
|
||||||
|
|
||||||
PCollection<BillingEvent> billingEvents =
|
PCollection<BillingEvent> billingEvents =
|
||||||
|
|
|
@ -520,25 +520,38 @@ public final class RegistryConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the URL of the GCS location we store jar dependencies for the invoicing pipeline.
|
* Returns the URL of the GCS location for storing the monthly spec11 Beam template.
|
||||||
*
|
*
|
||||||
* @see google.registry.beam.invoicing.InvoicingPipeline
|
* @see google.registry.beam.spec11.Spec11Pipeline
|
||||||
*/
|
*/
|
||||||
@Provides
|
@Provides
|
||||||
@Config("invoiceStagingUrl")
|
@Config("spec11TemplateUrl")
|
||||||
|
public static String provideSpec11TemplateUrl(
|
||||||
|
@Config("apacheBeamBucketUrl") String beamBucketUrl) {
|
||||||
|
return beamBucketUrl + "/templates/spec11";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the URL of the GCS location we store jar dependencies for beam pipelines.
|
||||||
|
*
|
||||||
|
* @see google.registry.beam.invoicing.InvoicingPipeline
|
||||||
|
* @see google.registry.beam.spec11.Spec11Pipeline
|
||||||
|
*/
|
||||||
|
@Provides
|
||||||
|
@Config("beamStagingUrl")
|
||||||
public static String provideInvoiceStagingUrl(
|
public static String provideInvoiceStagingUrl(
|
||||||
@Config("apacheBeamBucketUrl") String beamBucketUrl) {
|
@Config("apacheBeamBucketUrl") String beamBucketUrl) {
|
||||||
return beamBucketUrl + "/staging";
|
return beamBucketUrl + "/staging";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the Google Cloud Storage bucket for ICANN transaction and activity reports to
|
* Returns the Google Cloud Storage bucket for Spec11 and ICANN transaction and activity reports
|
||||||
* be uploaded.
|
* to be uploaded.
|
||||||
*
|
*
|
||||||
* @see google.registry.reporting.icann.IcannReportingUploadAction
|
* @see google.registry.reporting.icann.IcannReportingUploadAction
|
||||||
*/
|
*/
|
||||||
@Provides
|
@Provides
|
||||||
@Config("icannReportingBucket")
|
@Config("reportingBucket")
|
||||||
public static String provideIcannReportingBucket(@Config("projectId") String projectId) {
|
public static String provideIcannReportingBucket(@Config("projectId") String projectId) {
|
||||||
return projectId + "-reporting";
|
return projectId + "-reporting";
|
||||||
}
|
}
|
||||||
|
@ -588,6 +601,17 @@ public final class RegistryConfig {
|
||||||
return "gs://" + billingBucket;
|
return "gs://" + billingBucket;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the URL of the GCS subdirectory we store Spec11 reports in.
|
||||||
|
*
|
||||||
|
* @see google.registry.beam.spec11.Spec11Pipeline
|
||||||
|
*/
|
||||||
|
@Provides
|
||||||
|
@Config("spec11BucketUrl")
|
||||||
|
public static String provideSpec11BucketUrl(@Config("reportingBucket") String reportingBucket) {
|
||||||
|
return "gs://" + reportingBucket + "/icann/spec11";
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns whether or not we should publish invoices to partners automatically by default.
|
* Returns whether or not we should publish invoices to partners automatically by default.
|
||||||
*
|
*
|
||||||
|
|
|
@ -60,7 +60,7 @@ public class IcannReportingStager {
|
||||||
|
|
||||||
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||||
|
|
||||||
@Inject @Config("icannReportingBucket") String reportingBucket;
|
@Inject @Config("reportingBucket") String reportingBucket;
|
||||||
|
|
||||||
@Inject YearMonth yearMonth;
|
@Inject YearMonth yearMonth;
|
||||||
@Inject @ReportingSubdir
|
@Inject @ReportingSubdir
|
||||||
|
|
|
@ -60,7 +60,7 @@ public final class IcannReportingUploadAction implements Runnable {
|
||||||
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
@Config("icannReportingBucket")
|
@Config("reportingBucket")
|
||||||
String reportingBucket;
|
String reportingBucket;
|
||||||
|
|
||||||
@Inject @ReportingSubdir
|
@Inject @ReportingSubdir
|
||||||
|
|
|
@ -36,6 +36,7 @@ java_library(
|
||||||
deps = [
|
deps = [
|
||||||
"//java/google/registry/backup",
|
"//java/google/registry/backup",
|
||||||
"//java/google/registry/beam/invoicing",
|
"//java/google/registry/beam/invoicing",
|
||||||
|
"//java/google/registry/beam/spec11",
|
||||||
"//java/google/registry/bigquery",
|
"//java/google/registry/bigquery",
|
||||||
"//java/google/registry/config",
|
"//java/google/registry/config",
|
||||||
"//java/google/registry/dns",
|
"//java/google/registry/dns",
|
||||||
|
|
32
java/google/registry/tools/DeploySpec11PipelineCommand.java
Normal file
32
java/google/registry/tools/DeploySpec11PipelineCommand.java
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package google.registry.tools;
|
||||||
|
|
||||||
|
import com.beust.jcommander.Parameters;
|
||||||
|
import google.registry.beam.spec11.Spec11Pipeline;
|
||||||
|
import javax.inject.Inject;
|
||||||
|
|
||||||
|
/** Nomulus command that deploys the {@link Spec11Pipeline} template. */
|
||||||
|
@Parameters(commandDescription = "Deploy the invoicing pipeline to GCS.")
|
||||||
|
public class DeploySpec11PipelineCommand implements Command {
|
||||||
|
|
||||||
|
@Inject Spec11Pipeline spec11Pipeline;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
spec11Pipeline.deploy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -54,6 +54,7 @@ public final class RegistryTool {
|
||||||
.put("delete_reserved_list", DeleteReservedListCommand.class)
|
.put("delete_reserved_list", DeleteReservedListCommand.class)
|
||||||
.put("delete_tld", DeleteTldCommand.class)
|
.put("delete_tld", DeleteTldCommand.class)
|
||||||
.put("deploy_invoicing_pipeline", DeployInvoicingPipelineCommand.class)
|
.put("deploy_invoicing_pipeline", DeployInvoicingPipelineCommand.class)
|
||||||
|
.put("deploy_spec11_pipeline", DeploySpec11PipelineCommand.class)
|
||||||
.put("domain_application_info", DomainApplicationInfoCommand.class)
|
.put("domain_application_info", DomainApplicationInfoCommand.class)
|
||||||
.put("encrypt_escrow_deposit", EncryptEscrowDepositCommand.class)
|
.put("encrypt_escrow_deposit", EncryptEscrowDepositCommand.class)
|
||||||
.put("execute_epp", ExecuteEppCommand.class)
|
.put("execute_epp", ExecuteEppCommand.class)
|
||||||
|
|
|
@ -83,6 +83,7 @@ interface RegistryToolComponent {
|
||||||
void inject(CreateLrpTokensCommand command);
|
void inject(CreateLrpTokensCommand command);
|
||||||
void inject(CreateTldCommand command);
|
void inject(CreateTldCommand command);
|
||||||
void inject(DeployInvoicingPipelineCommand command);
|
void inject(DeployInvoicingPipelineCommand command);
|
||||||
|
void inject(DeploySpec11PipelineCommand command);
|
||||||
void inject(EncryptEscrowDepositCommand command);
|
void inject(EncryptEscrowDepositCommand command);
|
||||||
void inject(GenerateAllocationTokensCommand command);
|
void inject(GenerateAllocationTokensCommand command);
|
||||||
void inject(GenerateApplicationsReportCommand command);
|
void inject(GenerateApplicationsReportCommand command);
|
||||||
|
|
|
@ -9,4 +9,27 @@ load("//java/com/google/testing/builddefs:GenTestRules.bzl", "GenTestRules")
|
||||||
|
|
||||||
java_library(
|
java_library(
|
||||||
name = "beam",
|
name = "beam",
|
||||||
|
srcs = glob(["*.java"]),
|
||||||
|
deps = [
|
||||||
|
"//java/google/registry/beam",
|
||||||
|
"//javatests/google/registry/testing",
|
||||||
|
"@com_google_dagger",
|
||||||
|
"@com_google_guava",
|
||||||
|
"@com_google_truth",
|
||||||
|
"@com_google_truth_extensions_truth_java8_extension",
|
||||||
|
"@junit",
|
||||||
|
"@org_apache_avro",
|
||||||
|
"@org_apache_beam_runners_direct_java",
|
||||||
|
"@org_apache_beam_runners_google_cloud_dataflow_java",
|
||||||
|
"@org_apache_beam_sdks_java_core",
|
||||||
|
"@org_apache_beam_sdks_java_io_google_cloud_platform",
|
||||||
|
"@org_mockito_all",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
GenTestRules(
|
||||||
|
name = "GeneratedTestRules",
|
||||||
|
default_test_size = "small",
|
||||||
|
test_files = glob(["*Test.java"]),
|
||||||
|
deps = [":beam"],
|
||||||
)
|
)
|
||||||
|
|
86
javatests/google/registry/beam/BeamUtilsTest.java
Normal file
86
javatests/google/registry/beam/BeamUtilsTest.java
Normal file
|
@ -0,0 +1,86 @@
|
||||||
|
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package google.registry.beam;
|
||||||
|
|
||||||
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
|
import static google.registry.testing.JUnitBackports.assertThrows;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.avro.generic.GenericData;
|
||||||
|
import org.apache.avro.generic.GenericRecord;
|
||||||
|
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.JUnit4;
|
||||||
|
|
||||||
|
/** Unit tests for {@link BeamUtils} */
|
||||||
|
@RunWith(JUnit4.class)
|
||||||
|
public class BeamUtilsTest {
|
||||||
|
|
||||||
|
private static final String GENERIC_SCHEMA =
|
||||||
|
"{\"name\": \"AnObject\", "
|
||||||
|
+ "\"type\": \"record\", "
|
||||||
|
+ "\"fields\": ["
|
||||||
|
+ "{\"name\": \"aString\", \"type\": \"string\"},"
|
||||||
|
+ "{\"name\": \"aFloat\", \"type\": \"float\"}"
|
||||||
|
+ "]}";
|
||||||
|
|
||||||
|
private SchemaAndRecord schemaAndRecord;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void initializeRecord() {
|
||||||
|
// Create a record with a given JSON schema.
|
||||||
|
GenericRecord record = new GenericData.Record(new Schema.Parser().parse(GENERIC_SCHEMA));
|
||||||
|
record.put("aString", "hello world");
|
||||||
|
record.put("aFloat", 2.54);
|
||||||
|
schemaAndRecord = new SchemaAndRecord(record, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExtractField_fieldExists_returnsExpectedStringValues() {
|
||||||
|
assertThat(BeamUtils.extractField(schemaAndRecord.getRecord(), "aString"))
|
||||||
|
.isEqualTo("hello world");
|
||||||
|
assertThat(BeamUtils.extractField(schemaAndRecord.getRecord(), "aFloat")).isEqualTo("2.54");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExtractField_fieldDoesntExist_returnsNull() {
|
||||||
|
schemaAndRecord.getRecord().put("aFloat", null);
|
||||||
|
assertThat(BeamUtils.extractField(schemaAndRecord.getRecord(), "aFloat")).isEqualTo("null");
|
||||||
|
assertThat(BeamUtils.extractField(schemaAndRecord.getRecord(), "missing")).isEqualTo("null");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckFieldsNotNull_noExceptionIfAllPresent() {
|
||||||
|
BeamUtils.checkFieldsNotNull(ImmutableList.of("aString", "aFloat"), schemaAndRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckFieldsNotNull_fieldMissing_throwsException() {
|
||||||
|
IllegalStateException expected =
|
||||||
|
assertThrows(
|
||||||
|
IllegalStateException.class,
|
||||||
|
() ->
|
||||||
|
BeamUtils.checkFieldsNotNull(
|
||||||
|
ImmutableList.of("aString", "aFloat", "notAField"), schemaAndRecord));
|
||||||
|
assertThat(expected)
|
||||||
|
.hasMessageThat()
|
||||||
|
.isEqualTo(
|
||||||
|
"Read unexpected null value for field(s) notAField for record "
|
||||||
|
+ "{\"aString\": \"hello world\", \"aFloat\": 2.54}");
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue