diff --git a/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java b/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java index b02d17044..57aa324e2 100644 --- a/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java +++ b/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java @@ -141,7 +141,7 @@ public class SafeBrowsingTransforms { @ProcessElement public void processElement(ProcessContext context) { Subdomain subdomain = context.element(); - subdomainBuffer.put(subdomain.fullyQualifiedDomainName(), subdomain); + subdomainBuffer.put(subdomain.domainName(), subdomain); if (subdomainBuffer.size() >= BATCH_SIZE) { ImmutableSet> results = evaluateAndFlush(); results.forEach(context::output); @@ -239,7 +239,7 @@ public class SafeBrowsingTransforms { String url = match.getJSONObject("threat").getString("url"); Subdomain subdomain = subdomainBuffer.get(url); resultBuilder.add( - KV.of(subdomain, ThreatMatch.create(match, subdomain.fullyQualifiedDomainName()))); + KV.of(subdomain, ThreatMatch.create(match, subdomain.domainName()))); } } } diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java index ce9247d01..685c64878 100644 --- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java +++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java @@ -77,7 +77,7 @@ public class Spec11Pipeline implements Serializable { public static final String REGISTRAR_EMAIL_FIELD = "registrarEmailAddress"; /** The JSON object field into which we put the registrar's name for Spec11 reports. */ public static final String REGISTRAR_CLIENT_ID_FIELD = "registrarClientId"; - /** The JSON object field we put the threat match array for Spec11 reports. */ + /** The JSON object field into which we put the threat match array for Spec11 reports. */ public static final String THREAT_MATCHES_FIELD = "threatMatches"; private final String projectId; @@ -176,9 +176,11 @@ public class Spec11Pipeline implements Serializable { PCollection domains, EvaluateSafeBrowsingFn evaluateSafeBrowsingFn, ValueProvider dateProvider) { - PCollection> subdomains = + + /* Store ThreatMatch objects in JSON. */ + PCollection> subdomainsJson = domains.apply("Run through SafeBrowsingAPI", ParDo.of(evaluateSafeBrowsingFn)); - subdomains + subdomainsJson .apply( "Map registrar client ID to email/ThreatMatch pair", MapElements.into( @@ -187,7 +189,7 @@ public class Spec11Pipeline implements Serializable { .via( (KV kv) -> KV.of( - kv.getKey().registrarClientId(), + kv.getKey().registrarId(), EmailAndThreatMatch.create( kv.getKey().registrarEmailAddress(), kv.getValue())))) .apply("Group by registrar client ID", GroupByKey.create()) diff --git a/core/src/main/java/google/registry/beam/spec11/Subdomain.java b/core/src/main/java/google/registry/beam/spec11/Subdomain.java index e01f7bf91..8d60751df 100644 --- a/core/src/main/java/google/registry/beam/spec11/Subdomain.java +++ b/core/src/main/java/google/registry/beam/spec11/Subdomain.java @@ -36,12 +36,14 @@ import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; public abstract class Subdomain implements Serializable { private static final ImmutableList FIELD_NAMES = - ImmutableList.of("fullyQualifiedDomainName", "registrarClientId", "registrarEmailAddress"); + ImmutableList.of("domainName", "domainRepoId", "registrarId", "registrarEmailAddress"); /** Returns the fully qualified domain name. */ - abstract String fullyQualifiedDomainName(); - /** Returns the client ID of the associated registrar for this domain. */ - abstract String registrarClientId(); + abstract String domainName(); + /** Returns the domain repo ID (the primary key of the domain table). */ + abstract String domainRepoId(); + /** Returns the registrar ID of the associated registrar for this domain. */ + abstract String registrarId(); /** Returns the email address of the registrar associated with this domain. */ abstract String registrarEmailAddress(); @@ -56,8 +58,9 @@ public abstract class Subdomain implements Serializable { checkFieldsNotNull(FIELD_NAMES, schemaAndRecord); GenericRecord record = schemaAndRecord.getRecord(); return create( - extractField(record, "fullyQualifiedDomainName"), - extractField(record, "registrarClientId"), + extractField(record, "domainName"), + extractField(record, "domainRepoId"), + extractField(record, "registrarId"), extractField(record, "registrarEmailAddress")); } @@ -69,9 +72,11 @@ public abstract class Subdomain implements Serializable { */ @VisibleForTesting static Subdomain create( - String fullyQualifiedDomainName, String registrarClientId, String registrarEmailAddress) { + String domainName, + String domainRepoId, + String registrarId, + String registrarEmailAddress) { return new AutoValue_Subdomain( - fullyQualifiedDomainName, registrarClientId, registrarEmailAddress); + domainName, domainRepoId, registrarId, registrarEmailAddress); } } - diff --git a/core/src/main/java/google/registry/beam/spec11/sql/subdomains.sql b/core/src/main/java/google/registry/beam/spec11/sql/subdomains.sql index 5b7535bd3..c542821e4 100644 --- a/core/src/main/java/google/registry/beam/spec11/sql/subdomains.sql +++ b/core/src/main/java/google/registry/beam/spec11/sql/subdomains.sql @@ -19,11 +19,13 @@ -- email address. SELECT - domain.fullyQualifiedDomainName AS fullyQualifiedDomainName, - registrar.clientId AS registrarClientId, + domain.fullyQualifiedDomainName AS domainName, + domain.__key__.name AS domainRepoId, + registrar.clientId AS clientId, COALESCE(registrar.emailAddress, '') AS registrarEmailAddress FROM ( ( SELECT + __key__, fullyQualifiedDomainName, currentSponsorClientId, creationTime diff --git a/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java b/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java index 5e27cef67..bf9c41ab0 100644 --- a/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java +++ b/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java @@ -81,21 +81,21 @@ public class Spec11PipelineTest { @Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions); @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); - private final Retrier retrier = new Retrier( - new FakeSleeper(new FakeClock(DateTime.parse("2019-07-15TZ"))), 1); + private final Retrier retrier = + new Retrier(new FakeSleeper(new FakeClock(DateTime.parse("2019-07-15TZ"))), 1); private Spec11Pipeline spec11Pipeline; @Before public void initializePipeline() throws IOException { File beamTempFolder = tempFolder.newFolder(); - spec11Pipeline = new Spec11Pipeline( - "test-project", - beamTempFolder.getAbsolutePath() + "/staging", - beamTempFolder.getAbsolutePath() + "/templates/invoicing", - tempFolder.getRoot().getAbsolutePath(), - GoogleCredentialsBundle.create(GoogleCredentials.create(null)), - retrier - ); + spec11Pipeline = + new Spec11Pipeline( + "test-project", + beamTempFolder.getAbsolutePath() + "/staging", + beamTempFolder.getAbsolutePath() + "/templates/invoicing", + tempFolder.getRoot().getAbsolutePath(), + GoogleCredentialsBundle.create(GoogleCredentials.create(null)), + retrier); } private static final ImmutableList BAD_DOMAINS = @@ -107,13 +107,15 @@ public class Spec11PipelineTest { // Put in half for theRegistrar and half for someRegistrar for (int i = 0; i < 255; i++) { subdomainsBuilder.add( - Subdomain.create(String.format("%s.com", i), "theRegistrar", "fake@theRegistrar.com")); + Subdomain.create( + String.format("%s.com", i), "theDomain", "theRegistrar", "fake@theRegistrar.com")); } for (int i = 255; i < 510; i++) { subdomainsBuilder.add( - Subdomain.create(String.format("%s.com", i), "someRegistrar", "fake@someRegistrar.com")); + Subdomain.create( + String.format("%s.com", i), "someDomain", "someRegistrar", "fake@someRegistrar.com")); } - subdomainsBuilder.add(Subdomain.create("no-email.com", "noEmailRegistrar", "")); + subdomainsBuilder.add(Subdomain.create("no-email.com", "fakeDomain", "noEmailRegistrar", "")); return subdomainsBuilder.build(); } @@ -165,8 +167,7 @@ public class Spec11PipelineTest { assertThat(noEmailThreatMatch.length()).isEqualTo(1); assertThat(noEmailThreatMatch.getJSONObject(0).get("fullyQualifiedDomainName")) .isEqualTo("no-email.com"); - assertThat(noEmailThreatMatch.getJSONObject(0).get("threatType")) - .isEqualTo("MALWARE"); + assertThat(noEmailThreatMatch.getJSONObject(0).get("threatType")).isEqualTo("MALWARE"); JSONObject someRegistrarJSON = new JSONObject(sortedLines.get(1)); assertThat(someRegistrarJSON.get("registrarEmailAddress")).isEqualTo("fake@someRegistrar.com"); @@ -176,8 +177,7 @@ public class Spec11PipelineTest { assertThat(someThreatMatch.length()).isEqualTo(1); assertThat(someThreatMatch.getJSONObject(0).get("fullyQualifiedDomainName")) .isEqualTo("444.com"); - assertThat(someThreatMatch.getJSONObject(0).get("threatType")) - .isEqualTo("MALWARE"); + assertThat(someThreatMatch.getJSONObject(0).get("threatType")).isEqualTo("MALWARE"); // theRegistrar has two ThreatMatches, we have to parse it explicitly JSONObject theRegistrarJSON = new JSONObject(sortedLines.get(2)); @@ -228,10 +228,8 @@ public class Spec11PipelineTest { CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class, withSettings().serializable()); when(httpResponse.getStatusLine()) - .thenReturn( - new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "Done")); - when(httpResponse.getEntity()) - .thenReturn(new FakeHttpEntity(getAPIResponse(badUrls))); + .thenReturn(new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "Done")); + when(httpResponse.getEntity()).thenReturn(new FakeHttpEntity(getAPIResponse(badUrls))); return httpResponse; } @@ -298,5 +296,4 @@ public class Spec11PipelineTest { return ImmutableList.copyOf( ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n")); } - }