From fd4a94b9e7b55cc11e8575772aa49aafb184af71 Mon Sep 17 00:00:00 2001 From: Ben McIlwain Date: Tue, 14 Sep 2021 14:07:26 -0400 Subject: [PATCH] Rename Spec11Pipeline's Subdomain -> DomainNameInfo (#1318) * Rename Spec11Pipeline's Subdomain -> DomainNameInfo "Subdomain" never made any sense as a class name because these are all second-level domain names, along with a little bit of metadata such as some registrar info. "DomainNameInfo" is a better fit. --- .../{Subdomain.java => DomainNameInfo.java} | 24 ++++---- .../beam/spec11/SafeBrowsingTransforms.java | 56 ++++++++++--------- .../registry/beam/spec11/Spec11Pipeline.java | 39 ++++++------- .../{subdomains.sql => domain_name_infos.sql} | 0 .../spec11/SafeBrowsingTransformsTest.java | 45 +++++++-------- .../beam/spec11/Spec11PipelineTest.java | 25 +++++---- .../registry/beam/spec11/test_output.txt | 2 +- 7 files changed, 100 insertions(+), 91 deletions(-) rename core/src/main/java/google/registry/beam/spec11/{Subdomain.java => DomainNameInfo.java} (82%) rename core/src/main/resources/google/registry/beam/spec11/sql/{subdomains.sql => domain_name_infos.sql} (100%) diff --git a/core/src/main/java/google/registry/beam/spec11/Subdomain.java b/core/src/main/java/google/registry/beam/spec11/DomainNameInfo.java similarity index 82% rename from core/src/main/java/google/registry/beam/spec11/Subdomain.java rename to core/src/main/java/google/registry/beam/spec11/DomainNameInfo.java index 8d60751df..4107ea539 100644 --- a/core/src/main/java/google/registry/beam/spec11/Subdomain.java +++ b/core/src/main/java/google/registry/beam/spec11/DomainNameInfo.java @@ -25,7 +25,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; /** - * A POJO representing a single subdomain, parsed from a {@code SchemaAndRecord}. + * A POJO representing a domain name and associated info, parsed from a {@code SchemaAndRecord}. * *

This is a trivially serializable class that allows Beam to transform the results of a Bigquery * query into a standard Java representation, giving us the type guarantees and ease of manipulation @@ -33,28 +33,31 @@ import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; * function. */ @AutoValue -public abstract class Subdomain implements Serializable { +public abstract class DomainNameInfo implements Serializable { private static final ImmutableList FIELD_NAMES = ImmutableList.of("domainName", "domainRepoId", "registrarId", "registrarEmailAddress"); /** Returns the fully qualified domain name. */ abstract String domainName(); + /** Returns the domain repo ID (the primary key of the domain table). */ abstract String domainRepoId(); + /** Returns the registrar ID of the associated registrar for this domain. */ abstract String registrarId(); + /** Returns the email address of the registrar associated with this domain. */ abstract String registrarEmailAddress(); /** - * Constructs a {@link Subdomain} from an Apache Avro {@code SchemaAndRecord}. + * Constructs a {@link DomainNameInfo} from an Apache Avro {@code SchemaAndRecord}. * * @see * Apache AVRO GenericRecord */ - static Subdomain parseFromRecord(SchemaAndRecord schemaAndRecord) { + static DomainNameInfo parseFromRecord(SchemaAndRecord schemaAndRecord) { checkFieldsNotNull(FIELD_NAMES, schemaAndRecord); GenericRecord record = schemaAndRecord.getRecord(); return create( @@ -65,18 +68,15 @@ public abstract class Subdomain implements Serializable { } /** - * Creates a concrete {@link Subdomain}. + * Creates a concrete {@link DomainNameInfo}. * - *

This should only be used outside this class for testing- instances of {@link Subdomain} + *

This should only be used outside this class for testing- instances of {@link DomainNameInfo} * should otherwise come from {@link #parseFromRecord}. */ @VisibleForTesting - static Subdomain create( - String domainName, - String domainRepoId, - String registrarId, - String registrarEmailAddress) { - return new AutoValue_Subdomain( + static DomainNameInfo create( + String domainName, String domainRepoId, String registrarId, String registrarEmailAddress) { + return new AutoValue_DomainNameInfo( domainName, domainRepoId, registrarId, registrarEmailAddress); } } diff --git a/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java b/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java index f896e1c89..bd7849936 100644 --- a/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java +++ b/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java @@ -55,13 +55,14 @@ public class SafeBrowsingTransforms { "https://safebrowsing.googleapis.com/v4/threatMatches:find"; /** - * {@link DoFn} mapping a {@link Subdomain} to its evaluation report from SafeBrowsing. + * {@link DoFn} mapping a {@link DomainNameInfo} to its evaluation report from SafeBrowsing. * *

Refer to the Lookup API documentation for the request/response format and other details. * * @see Lookup API */ - static class EvaluateSafeBrowsingFn extends DoFn> { + static class EvaluateSafeBrowsingFn + extends DoFn> { /** * Max number of urls we can check in a single query. @@ -74,10 +75,11 @@ public class SafeBrowsingTransforms { private final String apiKey; /** - * Maps a subdomain's {@code fullyQualifiedDomainName} to its corresponding {@link Subdomain} to - * facilitate batching SafeBrowsing API requests. + * Maps a domain name's {@code fullyQualifiedDomainName} to its corresponding {@link + * DomainNameInfo} to facilitate batching SafeBrowsing API requests. */ - private final Map subdomainBuffer = new LinkedHashMap<>(BATCH_SIZE); + private final Map domainNameInfoBuffer = + new LinkedHashMap<>(BATCH_SIZE); /** * Provides the HTTP client we use to interact with the SafeBrowsing API. @@ -119,37 +121,38 @@ public class SafeBrowsingTransforms { closeableHttpClientSupplier = clientSupplier; } - /** Evaluates any buffered {@link Subdomain} objects upon completing the bundle. */ + /** Evaluates any buffered {@link DomainNameInfo} objects upon completing the bundle. */ @FinishBundle public void finishBundle(FinishBundleContext context) { - if (!subdomainBuffer.isEmpty()) { - ImmutableSet> results = evaluateAndFlush(); + if (!domainNameInfoBuffer.isEmpty()) { + ImmutableSet> results = evaluateAndFlush(); results.forEach((kv) -> context.output(kv, Instant.now(), GlobalWindow.INSTANCE)); } } /** - * Buffers {@link Subdomain} objects until we reach the batch size, then bulk-evaluate the URLs - * with the SafeBrowsing API. + * Buffers {@link DomainNameInfo} objects until we reach the batch size, then bulk-evaluate the + * URLs with the SafeBrowsing API. */ @ProcessElement public void processElement(ProcessContext context) { - Subdomain subdomain = context.element(); - subdomainBuffer.put(subdomain.domainName(), subdomain); - if (subdomainBuffer.size() >= BATCH_SIZE) { - ImmutableSet> results = evaluateAndFlush(); + DomainNameInfo domainNameInfo = context.element(); + domainNameInfoBuffer.put(domainNameInfo.domainName(), domainNameInfo); + if (domainNameInfoBuffer.size() >= BATCH_SIZE) { + ImmutableSet> results = evaluateAndFlush(); results.forEach(context::output); } } /** - * Evaluates all {@link Subdomain} objects in the buffer and returns a list of key-value pairs - * from {@link Subdomain} to its SafeBrowsing report. + * Evaluates all {@link DomainNameInfo} objects in the buffer and returns a list of key-value + * pairs from {@link DomainNameInfo} to its SafeBrowsing report. * - *

If a {@link Subdomain} is safe according to the API, it will not emit a report. + *

If a {@link DomainNameInfo} is safe according to the API, it will not emit a report. */ - private ImmutableSet> evaluateAndFlush() { - ImmutableSet.Builder> resultBuilder = new ImmutableSet.Builder<>(); + private ImmutableSet> evaluateAndFlush() { + ImmutableSet.Builder> resultBuilder = + new ImmutableSet.Builder<>(); try { URIBuilder uriBuilder = new URIBuilder(SAFE_BROWSING_URL); // Add the API key param @@ -174,7 +177,7 @@ public class SafeBrowsingTransforms { throw new RuntimeException("Caught parsing exception, failing pipeline.", e); } finally { // Flush the buffer - subdomainBuffer.clear(); + domainNameInfoBuffer.clear(); } return resultBuilder.build(); } @@ -183,7 +186,7 @@ public class SafeBrowsingTransforms { private JSONObject createRequestBody() throws JSONException { // Accumulate all domain names to evaluate. JSONArray threatArray = new JSONArray(); - for (String fullyQualifiedDomainName : subdomainBuffer.keySet()) { + for (String fullyQualifiedDomainName : domainNameInfoBuffer.keySet()) { threatArray.put(new JSONObject().put("url", fullyQualifiedDomainName)); } // Construct the JSON request body @@ -211,7 +214,7 @@ public class SafeBrowsingTransforms { */ private void processResponse( CloseableHttpResponse response, - ImmutableSet.Builder> resultBuilder) + ImmutableSet.Builder> resultBuilder) throws JSONException, IOException { int statusCode = response.getStatusLine().getStatusCode(); if (statusCode != SC_OK) { @@ -226,16 +229,17 @@ public class SafeBrowsingTransforms { if (responseBody.length() == 0) { logger.atInfo().log("Response was empty, no threats detected"); } else { - // Emit all Subdomains with their API results. + // Emit all DomainNameInfos with their API results. JSONArray threatMatches = responseBody.getJSONArray("matches"); for (int i = 0; i < threatMatches.length(); i++) { JSONObject match = threatMatches.getJSONObject(i); String url = match.getJSONObject("threat").getString("url"); - Subdomain subdomain = subdomainBuffer.get(url); + DomainNameInfo domainNameInfo = domainNameInfoBuffer.get(url); resultBuilder.add( KV.of( - subdomain, - ThreatMatch.create(match.getString("threatType"), subdomain.domainName()))); + domainNameInfo, + ThreatMatch.create( + match.getString("threatType"), domainNameInfo.domainName()))); } } } diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java index 4f03caa80..d1c1a3898 100644 --- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java +++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java @@ -100,20 +100,20 @@ public class Spec11Pipeline implements Serializable { void setupPipeline(Pipeline pipeline) { options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED); - PCollection domains = + PCollection domains = options.getDatabase().equals("DATASTORE") ? readFromBigQuery(options, pipeline) : readFromCloudSql(pipeline); - PCollection> threatMatches = + PCollection> threatMatches = domains.apply("Run through SafeBrowsing API", ParDo.of(safeBrowsingFn)); saveToSql(threatMatches, options); saveToGcs(threatMatches, options); } - static PCollection readFromCloudSql(Pipeline pipeline) { - Read read = + static PCollection readFromCloudSql(Pipeline pipeline) { + Read read = RegistryJpaIO.read( "select d, r.emailAddress from Domain d join Registrar r on" + " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL'" @@ -124,30 +124,31 @@ public class Spec11Pipeline implements Serializable { return pipeline.apply("Read active domains from Cloud SQL", read); } - static PCollection readFromBigQuery(Spec11PipelineOptions options, Pipeline pipeline) { + static PCollection readFromBigQuery( + Spec11PipelineOptions options, Pipeline pipeline) { return pipeline.apply( "Read active domains from BigQuery", - BigQueryIO.read(Subdomain::parseFromRecord) + BigQueryIO.read(DomainNameInfo::parseFromRecord) .fromQuery( - SqlTemplate.create(getQueryFromFile(Spec11Pipeline.class, "subdomains.sql")) + SqlTemplate.create(getQueryFromFile(Spec11Pipeline.class, "domain_name_infos.sql")) .put("PROJECT_ID", options.getProject()) .put("DATASTORE_EXPORT_DATASET", "latest_datastore_export") .put("REGISTRAR_TABLE", "Registrar") .put("DOMAIN_BASE_TABLE", "DomainBase") .build()) - .withCoder(SerializableCoder.of(Subdomain.class)) + .withCoder(SerializableCoder.of(DomainNameInfo.class)) .usingStandardSql() .withoutValidation() .withTemplateCompatibility()); } - private static Subdomain parseRow(Object[] row) { + private static DomainNameInfo parseRow(Object[] row) { DomainBase domainBase = (DomainBase) row[0]; String emailAddress = (String) row[1]; if (emailAddress == null) { emailAddress = ""; } - return Subdomain.create( + return DomainNameInfo.create( domainBase.getDomainName(), domainBase.getRepoId(), domainBase.getCurrentSponsorClientId(), @@ -155,31 +156,31 @@ public class Spec11Pipeline implements Serializable { } static void saveToSql( - PCollection> threatMatches, Spec11PipelineOptions options) { + PCollection> threatMatches, Spec11PipelineOptions options) { String transformId = "Spec11 Threat Matches"; LocalDate date = LocalDate.parse(options.getDate(), ISODateTimeFormat.date()); threatMatches.apply( "Write to Sql: " + transformId, - RegistryJpaIO.>write() + RegistryJpaIO.>write() .withName(transformId) .withBatchSize(options.getSqlWriteBatchSize()) .withShards(options.getSqlWriteShards()) .withJpaConverter( (kv) -> { - Subdomain subdomain = kv.getKey(); + DomainNameInfo domainNameInfo = kv.getKey(); return new Spec11ThreatMatch.Builder() .setThreatTypes( ImmutableSet.of(ThreatType.valueOf(kv.getValue().threatType()))) .setCheckDate(date) - .setDomainName(subdomain.domainName()) - .setDomainRepoId(subdomain.domainRepoId()) - .setRegistrarId(subdomain.registrarId()) + .setDomainName(domainNameInfo.domainName()) + .setDomainRepoId(domainNameInfo.domainRepoId()) + .setRegistrarId(domainNameInfo.registrarId()) .build(); })); } static void saveToGcs( - PCollection> threatMatches, Spec11PipelineOptions options) { + PCollection> threatMatches, Spec11PipelineOptions options) { threatMatches .apply( "Map registrar ID to email/ThreatMatch pair", @@ -187,7 +188,7 @@ public class Spec11Pipeline implements Serializable { TypeDescriptors.kvs( TypeDescriptors.strings(), TypeDescriptor.of(EmailAndThreatMatch.class))) .via( - (KV kv) -> + (KV kv) -> KV.of( kv.getKey().registrarId(), EmailAndThreatMatch.create( @@ -230,7 +231,7 @@ public class Spec11Pipeline implements Serializable { options.getReportingBucketUrl(), getSpec11ReportFilePath(LocalDate.parse(options.getDate())))) .withoutSharding() - .withHeader("Map from registrar email / name to detected subdomain threats:")); + .withHeader("Map from registrar email / name to detected domain name threats:")); } public static void main(String[] args) { diff --git a/core/src/main/resources/google/registry/beam/spec11/sql/subdomains.sql b/core/src/main/resources/google/registry/beam/spec11/sql/domain_name_infos.sql similarity index 100% rename from core/src/main/resources/google/registry/beam/spec11/sql/subdomains.sql rename to core/src/main/resources/google/registry/beam/spec11/sql/domain_name_infos.sql diff --git a/core/src/test/java/google/registry/beam/spec11/SafeBrowsingTransformsTest.java b/core/src/test/java/google/registry/beam/spec11/SafeBrowsingTransformsTest.java index 224bc86c9..ac27c0d8d 100644 --- a/core/src/test/java/google/registry/beam/spec11/SafeBrowsingTransformsTest.java +++ b/core/src/test/java/google/registry/beam/spec11/SafeBrowsingTransformsTest.java @@ -80,7 +80,7 @@ class SafeBrowsingTransformsTest { private static final String REGISTRAR_ID = "registrarID"; private static final String REGISTRAR_EMAIL = "email@registrar.net"; - private static ImmutableMap THREAT_MATCH_MAP; + private static ImmutableMap THREAT_MATCH_MAP; private final CloseableHttpClient mockHttpClient = mock(CloseableHttpClient.class, withSettings().serializable()); @@ -95,24 +95,25 @@ class SafeBrowsingTransformsTest { final TestPipelineExtension pipeline = TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); - private static Subdomain createSubdomain(String url) { - return Subdomain.create(url, REPO_ID, REGISTRAR_ID, REGISTRAR_EMAIL); + private static DomainNameInfo createDomainNameInfo(String url) { + return DomainNameInfo.create(url, REPO_ID, REGISTRAR_ID, REGISTRAR_EMAIL); } - private KV getKv(String url) { - Subdomain subdomain = createSubdomain(url); - return KV.of(subdomain, THREAT_MATCH_MAP.get(subdomain)); + private KV getKv(String url) { + DomainNameInfo domainNameInfo = createDomainNameInfo(url); + return KV.of(domainNameInfo, THREAT_MATCH_MAP.get(domainNameInfo)); } @BeforeAll static void beforeAll() { - ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); THREAT_MAP .entrySet() .forEach( kv -> builder.put( - createSubdomain(kv.getKey()), ThreatMatch.create(kv.getValue(), kv.getKey()))); + createDomainNameInfo(kv.getKey()), + ThreatMatch.create(kv.getValue(), kv.getKey()))); THREAT_MATCH_MAP = builder.build(); } @@ -123,16 +124,16 @@ class SafeBrowsingTransformsTest { @Test void testSuccess_someBadDomains() throws Exception { - ImmutableList subdomains = + ImmutableList domainNameInfos = ImmutableList.of( - createSubdomain("111.com"), - createSubdomain("hooli.com"), - createSubdomain("party-night.net"), - createSubdomain("anti-anti-anti-virus.dev"), - createSubdomain("no-email.com")); - PCollection> threats = + createDomainNameInfo("111.com"), + createDomainNameInfo("hooli.com"), + createDomainNameInfo("party-night.net"), + createDomainNameInfo("anti-anti-anti-virus.dev"), + createDomainNameInfo("no-email.com")); + PCollection> threats = pipeline - .apply(Create.of(subdomains).withCoder(SerializableCoder.of(Subdomain.class))) + .apply(Create.of(domainNameInfos).withCoder(SerializableCoder.of(DomainNameInfo.class))) .apply(ParDo.of(safeBrowsingFn)); PAssert.that(threats) @@ -146,14 +147,14 @@ class SafeBrowsingTransformsTest { @Test void testSuccess_noBadDomains() throws Exception { - ImmutableList subdomains = + ImmutableList domainNameInfos = ImmutableList.of( - createSubdomain("hello_kitty.dev"), - createSubdomain("555.com"), - createSubdomain("goodboy.net")); - PCollection> threats = + createDomainNameInfo("hello_kitty.dev"), + createDomainNameInfo("555.com"), + createDomainNameInfo("goodboy.net")); + PCollection> threats = pipeline - .apply(Create.of(subdomains).withCoder(SerializableCoder.of(Subdomain.class))) + .apply(Create.of(domainNameInfos).withCoder(SerializableCoder.of(DomainNameInfo.class))) .apply(ParDo.of(safeBrowsingFn)); PAssert.that(threats).empty(); diff --git a/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java b/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java index f8bd530ed..b10aaef14 100644 --- a/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java +++ b/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java @@ -94,13 +94,16 @@ class Spec11PipelineTest { private final CloseableHttpClient mockHttpClient = mock(CloseableHttpClient.class, withSettings().serializable()); - private static final ImmutableList SUBDOMAINS = + private static final ImmutableList DOMAIN_NAME_INFOS = ImmutableList.of( - Subdomain.create("111.com", "123456789-COM", "hello-registrar", "email@hello.net"), - Subdomain.create("party-night.net", "2244AABBC-NET", "kitty-registrar", "contact@kit.ty"), - Subdomain.create("bitcoin.bank", "1C3D5E7F9-BANK", "hello-registrar", "email@hello.net"), - Subdomain.create("no-email.com", "2A4BA9BBC-COM", "kitty-registrar", "contact@kit.ty"), - Subdomain.create( + DomainNameInfo.create("111.com", "123456789-COM", "hello-registrar", "email@hello.net"), + DomainNameInfo.create( + "party-night.net", "2244AABBC-NET", "kitty-registrar", "contact@kit.ty"), + DomainNameInfo.create( + "bitcoin.bank", "1C3D5E7F9-BANK", "hello-registrar", "email@hello.net"), + DomainNameInfo.create( + "no-email.com", "2A4BA9BBC-COM", "kitty-registrar", "contact@kit.ty"), + DomainNameInfo.create( "anti-anti-anti-virus.dev", "555666888-DEV", "cool-registrar", "cool@aid.net")); private static final ImmutableList THREAT_MATCHES = @@ -129,7 +132,7 @@ class Spec11PipelineTest { PipelineOptionsFactory.create().as(Spec11PipelineOptions.class); private File reportingBucketUrl; - private PCollection> threatMatches; + private PCollection> threatMatches; ImmutableSet sqlThreatMatches; @@ -143,11 +146,11 @@ class Spec11PipelineTest { threatMatches = pipeline.apply( Create.of( - Streams.zip(SUBDOMAINS.stream(), THREAT_MATCHES.stream(), KV::of) + Streams.zip(DOMAIN_NAME_INFOS.stream(), THREAT_MATCHES.stream(), KV::of) .collect(toImmutableList())) .withCoder( KvCoder.of( - SerializableCoder.of(Subdomain.class), + SerializableCoder.of(DomainNameInfo.class), SerializableCoder.of(ThreatMatch.class)))); sqlThreatMatches = @@ -223,8 +226,8 @@ class Spec11PipelineTest { @Test void testSuccess_readFromCloudSql() throws Exception { setupCloudSql(); - PCollection subdomains = Spec11Pipeline.readFromCloudSql(pipeline); - PAssert.that(subdomains).containsInAnyOrder(SUBDOMAINS); + PCollection domainNameInfos = Spec11Pipeline.readFromCloudSql(pipeline); + PAssert.that(domainNameInfos).containsInAnyOrder(DOMAIN_NAME_INFOS); pipeline.run().waitUntilFinish(); } diff --git a/core/src/test/resources/google/registry/beam/spec11/test_output.txt b/core/src/test/resources/google/registry/beam/spec11/test_output.txt index f572d694b..abc5a11cb 100644 --- a/core/src/test/resources/google/registry/beam/spec11/test_output.txt +++ b/core/src/test/resources/google/registry/beam/spec11/test_output.txt @@ -1,4 +1,4 @@ -Map from registrar email / name to detected subdomain threats: +Map from registrar email / name to detected domain name threats: {"threatMatches":[{"threatType":"UNWANTED_SOFTWARE","fullyQualifiedDomainName":"anti-anti-anti-virus.dev"}],"registrarClientId":"cool-registrar","registrarEmailAddress":"cool@aid.net"} {"threatMatches":[{"threatType":"MALWARE","fullyQualifiedDomainName":"111.com"},{"threatType":"POTENTIALLY_HARMFUL_APPLICATION","fullyQualifiedDomainName":"bitcoin.bank"}],"registrarClientId":"hello-registrar","registrarEmailAddress":"email@hello.net"} {"threatMatches":[{"threatType":"THREAT_TYPE_UNSPECIFIED","fullyQualifiedDomainName":"no-eamil.com"},{"threatType":"SOCIAL_ENGINEERING","fullyQualifiedDomainName":"party-night.net"}],"registrarClientId":"kitty-registrar","registrarEmailAddress":"contact@kit.ty"} \ No newline at end of file