Add domainRepoId to Subdomain class (#674)

* Change Subdomain class to contain domainRepoId

* Remove jpaTm from Spec11PipelineTest and change clientId -> registrarId

* Remove 'client' from a comment

* Include changes to Spec11Pipeline

* add SafeBrowsingTransforms

* Run style
This commit is contained in:
Legina Chen 2020-07-09 16:26:35 -07:00 committed by GitHub
parent 9d72f87cfa
commit c53517cb4d
5 changed files with 45 additions and 39 deletions

View file

@ -141,7 +141,7 @@ public class SafeBrowsingTransforms {
@ProcessElement
public void processElement(ProcessContext context) {
Subdomain subdomain = context.element();
subdomainBuffer.put(subdomain.fullyQualifiedDomainName(), subdomain);
subdomainBuffer.put(subdomain.domainName(), subdomain);
if (subdomainBuffer.size() >= BATCH_SIZE) {
ImmutableSet<KV<Subdomain, ThreatMatch>> results = evaluateAndFlush();
results.forEach(context::output);
@ -239,7 +239,7 @@ public class SafeBrowsingTransforms {
String url = match.getJSONObject("threat").getString("url");
Subdomain subdomain = subdomainBuffer.get(url);
resultBuilder.add(
KV.of(subdomain, ThreatMatch.create(match, subdomain.fullyQualifiedDomainName())));
KV.of(subdomain, ThreatMatch.create(match, subdomain.domainName())));
}
}
}

View file

@ -77,7 +77,7 @@ public class Spec11Pipeline implements Serializable {
public static final String REGISTRAR_EMAIL_FIELD = "registrarEmailAddress";
/** The JSON object field into which we put the registrar's name for Spec11 reports. */
public static final String REGISTRAR_CLIENT_ID_FIELD = "registrarClientId";
/** The JSON object field we put the threat match array for Spec11 reports. */
/** The JSON object field into which we put the threat match array for Spec11 reports. */
public static final String THREAT_MATCHES_FIELD = "threatMatches";
private final String projectId;
@ -176,9 +176,11 @@ public class Spec11Pipeline implements Serializable {
PCollection<Subdomain> domains,
EvaluateSafeBrowsingFn evaluateSafeBrowsingFn,
ValueProvider<String> dateProvider) {
PCollection<KV<Subdomain, ThreatMatch>> subdomains =
/* Store ThreatMatch objects in JSON. */
PCollection<KV<Subdomain, ThreatMatch>> subdomainsJson =
domains.apply("Run through SafeBrowsingAPI", ParDo.of(evaluateSafeBrowsingFn));
subdomains
subdomainsJson
.apply(
"Map registrar client ID to email/ThreatMatch pair",
MapElements.into(
@ -187,7 +189,7 @@ public class Spec11Pipeline implements Serializable {
.via(
(KV<Subdomain, ThreatMatch> kv) ->
KV.of(
kv.getKey().registrarClientId(),
kv.getKey().registrarId(),
EmailAndThreatMatch.create(
kv.getKey().registrarEmailAddress(), kv.getValue()))))
.apply("Group by registrar client ID", GroupByKey.create())

View file

@ -36,12 +36,14 @@ import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
public abstract class Subdomain implements Serializable {
private static final ImmutableList<String> FIELD_NAMES =
ImmutableList.of("fullyQualifiedDomainName", "registrarClientId", "registrarEmailAddress");
ImmutableList.of("domainName", "domainRepoId", "registrarId", "registrarEmailAddress");
/** Returns the fully qualified domain name. */
abstract String fullyQualifiedDomainName();
/** Returns the client ID of the associated registrar for this domain. */
abstract String registrarClientId();
abstract String domainName();
/** Returns the domain repo ID (the primary key of the domain table). */
abstract String domainRepoId();
/** Returns the registrar ID of the associated registrar for this domain. */
abstract String registrarId();
/** Returns the email address of the registrar associated with this domain. */
abstract String registrarEmailAddress();
@ -56,8 +58,9 @@ public abstract class Subdomain implements Serializable {
checkFieldsNotNull(FIELD_NAMES, schemaAndRecord);
GenericRecord record = schemaAndRecord.getRecord();
return create(
extractField(record, "fullyQualifiedDomainName"),
extractField(record, "registrarClientId"),
extractField(record, "domainName"),
extractField(record, "domainRepoId"),
extractField(record, "registrarId"),
extractField(record, "registrarEmailAddress"));
}
@ -69,9 +72,11 @@ public abstract class Subdomain implements Serializable {
*/
@VisibleForTesting
static Subdomain create(
String fullyQualifiedDomainName, String registrarClientId, String registrarEmailAddress) {
String domainName,
String domainRepoId,
String registrarId,
String registrarEmailAddress) {
return new AutoValue_Subdomain(
fullyQualifiedDomainName, registrarClientId, registrarEmailAddress);
domainName, domainRepoId, registrarId, registrarEmailAddress);
}
}

View file

@ -19,11 +19,13 @@
-- email address.
SELECT
domain.fullyQualifiedDomainName AS fullyQualifiedDomainName,
registrar.clientId AS registrarClientId,
domain.fullyQualifiedDomainName AS domainName,
domain.__key__.name AS domainRepoId,
registrar.clientId AS clientId,
COALESCE(registrar.emailAddress, '') AS registrarEmailAddress
FROM ( (
SELECT
__key__,
fullyQualifiedDomainName,
currentSponsorClientId,
creationTime

View file

@ -81,21 +81,21 @@ public class Spec11PipelineTest {
@Rule public final transient TestPipeline p = TestPipeline.fromOptions(pipelineOptions);
@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
private final Retrier retrier = new Retrier(
new FakeSleeper(new FakeClock(DateTime.parse("2019-07-15TZ"))), 1);
private final Retrier retrier =
new Retrier(new FakeSleeper(new FakeClock(DateTime.parse("2019-07-15TZ"))), 1);
private Spec11Pipeline spec11Pipeline;
@Before
public void initializePipeline() throws IOException {
File beamTempFolder = tempFolder.newFolder();
spec11Pipeline = new Spec11Pipeline(
spec11Pipeline =
new Spec11Pipeline(
"test-project",
beamTempFolder.getAbsolutePath() + "/staging",
beamTempFolder.getAbsolutePath() + "/templates/invoicing",
tempFolder.getRoot().getAbsolutePath(),
GoogleCredentialsBundle.create(GoogleCredentials.create(null)),
retrier
);
retrier);
}
private static final ImmutableList<String> BAD_DOMAINS =
@ -107,13 +107,15 @@ public class Spec11PipelineTest {
// Put in half for theRegistrar and half for someRegistrar
for (int i = 0; i < 255; i++) {
subdomainsBuilder.add(
Subdomain.create(String.format("%s.com", i), "theRegistrar", "fake@theRegistrar.com"));
Subdomain.create(
String.format("%s.com", i), "theDomain", "theRegistrar", "fake@theRegistrar.com"));
}
for (int i = 255; i < 510; i++) {
subdomainsBuilder.add(
Subdomain.create(String.format("%s.com", i), "someRegistrar", "fake@someRegistrar.com"));
Subdomain.create(
String.format("%s.com", i), "someDomain", "someRegistrar", "fake@someRegistrar.com"));
}
subdomainsBuilder.add(Subdomain.create("no-email.com", "noEmailRegistrar", ""));
subdomainsBuilder.add(Subdomain.create("no-email.com", "fakeDomain", "noEmailRegistrar", ""));
return subdomainsBuilder.build();
}
@ -165,8 +167,7 @@ public class Spec11PipelineTest {
assertThat(noEmailThreatMatch.length()).isEqualTo(1);
assertThat(noEmailThreatMatch.getJSONObject(0).get("fullyQualifiedDomainName"))
.isEqualTo("no-email.com");
assertThat(noEmailThreatMatch.getJSONObject(0).get("threatType"))
.isEqualTo("MALWARE");
assertThat(noEmailThreatMatch.getJSONObject(0).get("threatType")).isEqualTo("MALWARE");
JSONObject someRegistrarJSON = new JSONObject(sortedLines.get(1));
assertThat(someRegistrarJSON.get("registrarEmailAddress")).isEqualTo("fake@someRegistrar.com");
@ -176,8 +177,7 @@ public class Spec11PipelineTest {
assertThat(someThreatMatch.length()).isEqualTo(1);
assertThat(someThreatMatch.getJSONObject(0).get("fullyQualifiedDomainName"))
.isEqualTo("444.com");
assertThat(someThreatMatch.getJSONObject(0).get("threatType"))
.isEqualTo("MALWARE");
assertThat(someThreatMatch.getJSONObject(0).get("threatType")).isEqualTo("MALWARE");
// theRegistrar has two ThreatMatches, we have to parse it explicitly
JSONObject theRegistrarJSON = new JSONObject(sortedLines.get(2));
@ -228,10 +228,8 @@ public class Spec11PipelineTest {
CloseableHttpResponse httpResponse =
mock(CloseableHttpResponse.class, withSettings().serializable());
when(httpResponse.getStatusLine())
.thenReturn(
new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "Done"));
when(httpResponse.getEntity())
.thenReturn(new FakeHttpEntity(getAPIResponse(badUrls)));
.thenReturn(new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "Done"));
when(httpResponse.getEntity()).thenReturn(new FakeHttpEntity(getAPIResponse(badUrls)));
return httpResponse;
}
@ -298,5 +296,4 @@ public class Spec11PipelineTest {
return ImmutableList.copyOf(
ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n"));
}
}