Rename Spec11Pipeline's Subdomain -> DomainNameInfo (#1318)

* Rename Spec11Pipeline's Subdomain -> DomainNameInfo

"Subdomain" never made any sense as a class name because these are all
second-level domain names, along with a little bit of metadata such as some
registrar info. "DomainNameInfo" is a better fit.
This commit is contained in:
Ben McIlwain 2021-09-14 14:07:26 -04:00 committed by GitHub
parent d282c35c64
commit fd4a94b9e7
7 changed files with 100 additions and 91 deletions

View file

@ -25,7 +25,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
/**
* A POJO representing a single subdomain, parsed from a {@code SchemaAndRecord}.
* A POJO representing a domain name and associated info, parsed from a {@code SchemaAndRecord}.
*
* <p>This is a trivially serializable class that allows Beam to transform the results of a BigQuery
* query into a standard Java representation, giving us the type guarantees and ease of manipulation
@ -33,28 +33,31 @@ import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
* function.
*/
@AutoValue
public abstract class Subdomain implements Serializable {
public abstract class DomainNameInfo implements Serializable {
private static final ImmutableList<String> FIELD_NAMES =
ImmutableList.of("domainName", "domainRepoId", "registrarId", "registrarEmailAddress");
/** Returns the fully qualified domain name. */
abstract String domainName();
/** Returns the domain repo ID (the primary key of the domain table). */
abstract String domainRepoId();
/** Returns the registrar ID of the associated registrar for this domain. */
abstract String registrarId();
/** Returns the email address of the registrar associated with this domain. */
abstract String registrarEmailAddress();
/**
* Constructs a {@link Subdomain} from an Apache Avro {@code SchemaAndRecord}.
* Constructs a {@link DomainNameInfo} from an Apache Avro {@code SchemaAndRecord}.
*
* @see <a
* href=http://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/generic/GenericData.Record.html>
* Apache AVRO GenericRecord</a>
*/
static Subdomain parseFromRecord(SchemaAndRecord schemaAndRecord) {
static DomainNameInfo parseFromRecord(SchemaAndRecord schemaAndRecord) {
checkFieldsNotNull(FIELD_NAMES, schemaAndRecord);
GenericRecord record = schemaAndRecord.getRecord();
return create(
@ -65,18 +68,15 @@ public abstract class Subdomain implements Serializable {
}
/**
* Creates a concrete {@link Subdomain}.
* Creates a concrete {@link DomainNameInfo}.
*
* <p>This should only be used outside this class for testing- instances of {@link Subdomain}
* <p>This should only be used outside this class for testing; instances of {@link DomainNameInfo}
* should otherwise come from {@link #parseFromRecord}.
*/
@VisibleForTesting
static Subdomain create(
String domainName,
String domainRepoId,
String registrarId,
String registrarEmailAddress) {
return new AutoValue_Subdomain(
static DomainNameInfo create(
String domainName, String domainRepoId, String registrarId, String registrarEmailAddress) {
return new AutoValue_DomainNameInfo(
domainName, domainRepoId, registrarId, registrarEmailAddress);
}
}

View file

@ -55,13 +55,14 @@ public class SafeBrowsingTransforms {
"https://safebrowsing.googleapis.com/v4/threatMatches:find";
/**
* {@link DoFn} mapping a {@link Subdomain} to its evaluation report from SafeBrowsing.
* {@link DoFn} mapping a {@link DomainNameInfo} to its evaluation report from SafeBrowsing.
*
* <p>Refer to the Lookup API documentation for the request/response format and other details.
*
* @see <a href=https://developers.google.com/safe-browsing/v4/lookup-api>Lookup API</a>
*/
static class EvaluateSafeBrowsingFn extends DoFn<Subdomain, KV<Subdomain, ThreatMatch>> {
static class EvaluateSafeBrowsingFn
extends DoFn<DomainNameInfo, KV<DomainNameInfo, ThreatMatch>> {
/**
* Max number of urls we can check in a single query.
@ -74,10 +75,11 @@ public class SafeBrowsingTransforms {
private final String apiKey;
/**
* Maps a subdomain's {@code fullyQualifiedDomainName} to its corresponding {@link Subdomain} to
* facilitate batching SafeBrowsing API requests.
* Maps a domain name's {@code fullyQualifiedDomainName} to its corresponding {@link
* DomainNameInfo} to facilitate batching SafeBrowsing API requests.
*/
private final Map<String, Subdomain> subdomainBuffer = new LinkedHashMap<>(BATCH_SIZE);
private final Map<String, DomainNameInfo> domainNameInfoBuffer =
new LinkedHashMap<>(BATCH_SIZE);
/**
* Provides the HTTP client we use to interact with the SafeBrowsing API.
@ -119,37 +121,38 @@ public class SafeBrowsingTransforms {
closeableHttpClientSupplier = clientSupplier;
}
/** Evaluates any buffered {@link Subdomain} objects upon completing the bundle. */
/** Evaluates any buffered {@link DomainNameInfo} objects upon completing the bundle. */
@FinishBundle
public void finishBundle(FinishBundleContext context) {
if (!subdomainBuffer.isEmpty()) {
ImmutableSet<KV<Subdomain, ThreatMatch>> results = evaluateAndFlush();
if (!domainNameInfoBuffer.isEmpty()) {
ImmutableSet<KV<DomainNameInfo, ThreatMatch>> results = evaluateAndFlush();
results.forEach((kv) -> context.output(kv, Instant.now(), GlobalWindow.INSTANCE));
}
}
/**
* Buffers {@link Subdomain} objects until we reach the batch size, then bulk-evaluate the URLs
* with the SafeBrowsing API.
* Buffers {@link DomainNameInfo} objects until we reach the batch size, then bulk-evaluates the
* URLs with the SafeBrowsing API.
*/
@ProcessElement
public void processElement(ProcessContext context) {
Subdomain subdomain = context.element();
subdomainBuffer.put(subdomain.domainName(), subdomain);
if (subdomainBuffer.size() >= BATCH_SIZE) {
ImmutableSet<KV<Subdomain, ThreatMatch>> results = evaluateAndFlush();
DomainNameInfo domainNameInfo = context.element();
domainNameInfoBuffer.put(domainNameInfo.domainName(), domainNameInfo);
if (domainNameInfoBuffer.size() >= BATCH_SIZE) {
ImmutableSet<KV<DomainNameInfo, ThreatMatch>> results = evaluateAndFlush();
results.forEach(context::output);
}
}
/**
* Evaluates all {@link Subdomain} objects in the buffer and returns a list of key-value pairs
* from {@link Subdomain} to its SafeBrowsing report.
* Evaluates all {@link DomainNameInfo} objects in the buffer and returns a set of key-value
* pairs from {@link DomainNameInfo} to its SafeBrowsing report.
*
* <p>If a {@link Subdomain} is safe according to the API, it will not emit a report.
* <p>If a {@link DomainNameInfo} is safe according to the API, it will not emit a report.
*/
private ImmutableSet<KV<Subdomain, ThreatMatch>> evaluateAndFlush() {
ImmutableSet.Builder<KV<Subdomain, ThreatMatch>> resultBuilder = new ImmutableSet.Builder<>();
private ImmutableSet<KV<DomainNameInfo, ThreatMatch>> evaluateAndFlush() {
ImmutableSet.Builder<KV<DomainNameInfo, ThreatMatch>> resultBuilder =
new ImmutableSet.Builder<>();
try {
URIBuilder uriBuilder = new URIBuilder(SAFE_BROWSING_URL);
// Add the API key param
@ -174,7 +177,7 @@ public class SafeBrowsingTransforms {
throw new RuntimeException("Caught parsing exception, failing pipeline.", e);
} finally {
// Flush the buffer
subdomainBuffer.clear();
domainNameInfoBuffer.clear();
}
return resultBuilder.build();
}
@ -183,7 +186,7 @@ public class SafeBrowsingTransforms {
private JSONObject createRequestBody() throws JSONException {
// Accumulate all domain names to evaluate.
JSONArray threatArray = new JSONArray();
for (String fullyQualifiedDomainName : subdomainBuffer.keySet()) {
for (String fullyQualifiedDomainName : domainNameInfoBuffer.keySet()) {
threatArray.put(new JSONObject().put("url", fullyQualifiedDomainName));
}
// Construct the JSON request body
@ -211,7 +214,7 @@ public class SafeBrowsingTransforms {
*/
private void processResponse(
CloseableHttpResponse response,
ImmutableSet.Builder<KV<Subdomain, ThreatMatch>> resultBuilder)
ImmutableSet.Builder<KV<DomainNameInfo, ThreatMatch>> resultBuilder)
throws JSONException, IOException {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != SC_OK) {
@ -226,16 +229,17 @@ public class SafeBrowsingTransforms {
if (responseBody.length() == 0) {
logger.atInfo().log("Response was empty, no threats detected");
} else {
// Emit all Subdomains with their API results.
// Emit all DomainNameInfos with their API results.
JSONArray threatMatches = responseBody.getJSONArray("matches");
for (int i = 0; i < threatMatches.length(); i++) {
JSONObject match = threatMatches.getJSONObject(i);
String url = match.getJSONObject("threat").getString("url");
Subdomain subdomain = subdomainBuffer.get(url);
DomainNameInfo domainNameInfo = domainNameInfoBuffer.get(url);
resultBuilder.add(
KV.of(
subdomain,
ThreatMatch.create(match.getString("threatType"), subdomain.domainName())));
domainNameInfo,
ThreatMatch.create(
match.getString("threatType"), domainNameInfo.domainName())));
}
}
}

View file

@ -100,20 +100,20 @@ public class Spec11Pipeline implements Serializable {
void setupPipeline(Pipeline pipeline) {
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
PCollection<Subdomain> domains =
PCollection<DomainNameInfo> domains =
options.getDatabase().equals("DATASTORE")
? readFromBigQuery(options, pipeline)
: readFromCloudSql(pipeline);
PCollection<KV<Subdomain, ThreatMatch>> threatMatches =
PCollection<KV<DomainNameInfo, ThreatMatch>> threatMatches =
domains.apply("Run through SafeBrowsing API", ParDo.of(safeBrowsingFn));
saveToSql(threatMatches, options);
saveToGcs(threatMatches, options);
}
static PCollection<Subdomain> readFromCloudSql(Pipeline pipeline) {
Read<Object[], Subdomain> read =
static PCollection<DomainNameInfo> readFromCloudSql(Pipeline pipeline) {
Read<Object[], DomainNameInfo> read =
RegistryJpaIO.read(
"select d, r.emailAddress from Domain d join Registrar r on"
+ " d.currentSponsorClientId = r.clientIdentifier where r.type = 'REAL'"
@ -124,30 +124,31 @@ public class Spec11Pipeline implements Serializable {
return pipeline.apply("Read active domains from Cloud SQL", read);
}
static PCollection<Subdomain> readFromBigQuery(Spec11PipelineOptions options, Pipeline pipeline) {
static PCollection<DomainNameInfo> readFromBigQuery(
Spec11PipelineOptions options, Pipeline pipeline) {
return pipeline.apply(
"Read active domains from BigQuery",
BigQueryIO.read(Subdomain::parseFromRecord)
BigQueryIO.read(DomainNameInfo::parseFromRecord)
.fromQuery(
SqlTemplate.create(getQueryFromFile(Spec11Pipeline.class, "subdomains.sql"))
SqlTemplate.create(getQueryFromFile(Spec11Pipeline.class, "domain_name_infos.sql"))
.put("PROJECT_ID", options.getProject())
.put("DATASTORE_EXPORT_DATASET", "latest_datastore_export")
.put("REGISTRAR_TABLE", "Registrar")
.put("DOMAIN_BASE_TABLE", "DomainBase")
.build())
.withCoder(SerializableCoder.of(Subdomain.class))
.withCoder(SerializableCoder.of(DomainNameInfo.class))
.usingStandardSql()
.withoutValidation()
.withTemplateCompatibility());
}
private static Subdomain parseRow(Object[] row) {
private static DomainNameInfo parseRow(Object[] row) {
DomainBase domainBase = (DomainBase) row[0];
String emailAddress = (String) row[1];
if (emailAddress == null) {
emailAddress = "";
}
return Subdomain.create(
return DomainNameInfo.create(
domainBase.getDomainName(),
domainBase.getRepoId(),
domainBase.getCurrentSponsorClientId(),
@ -155,31 +156,31 @@ public class Spec11Pipeline implements Serializable {
}
static void saveToSql(
PCollection<KV<Subdomain, ThreatMatch>> threatMatches, Spec11PipelineOptions options) {
PCollection<KV<DomainNameInfo, ThreatMatch>> threatMatches, Spec11PipelineOptions options) {
String transformId = "Spec11 Threat Matches";
LocalDate date = LocalDate.parse(options.getDate(), ISODateTimeFormat.date());
threatMatches.apply(
"Write to Sql: " + transformId,
RegistryJpaIO.<KV<Subdomain, ThreatMatch>>write()
RegistryJpaIO.<KV<DomainNameInfo, ThreatMatch>>write()
.withName(transformId)
.withBatchSize(options.getSqlWriteBatchSize())
.withShards(options.getSqlWriteShards())
.withJpaConverter(
(kv) -> {
Subdomain subdomain = kv.getKey();
DomainNameInfo domainNameInfo = kv.getKey();
return new Spec11ThreatMatch.Builder()
.setThreatTypes(
ImmutableSet.of(ThreatType.valueOf(kv.getValue().threatType())))
.setCheckDate(date)
.setDomainName(subdomain.domainName())
.setDomainRepoId(subdomain.domainRepoId())
.setRegistrarId(subdomain.registrarId())
.setDomainName(domainNameInfo.domainName())
.setDomainRepoId(domainNameInfo.domainRepoId())
.setRegistrarId(domainNameInfo.registrarId())
.build();
}));
}
static void saveToGcs(
PCollection<KV<Subdomain, ThreatMatch>> threatMatches, Spec11PipelineOptions options) {
PCollection<KV<DomainNameInfo, ThreatMatch>> threatMatches, Spec11PipelineOptions options) {
threatMatches
.apply(
"Map registrar ID to email/ThreatMatch pair",
@ -187,7 +188,7 @@ public class Spec11Pipeline implements Serializable {
TypeDescriptors.kvs(
TypeDescriptors.strings(), TypeDescriptor.of(EmailAndThreatMatch.class)))
.via(
(KV<Subdomain, ThreatMatch> kv) ->
(KV<DomainNameInfo, ThreatMatch> kv) ->
KV.of(
kv.getKey().registrarId(),
EmailAndThreatMatch.create(
@ -230,7 +231,7 @@ public class Spec11Pipeline implements Serializable {
options.getReportingBucketUrl(),
getSpec11ReportFilePath(LocalDate.parse(options.getDate()))))
.withoutSharding()
.withHeader("Map from registrar email / name to detected subdomain threats:"));
.withHeader("Map from registrar email / name to detected domain name threats:"));
}
public static void main(String[] args) {

View file

@ -80,7 +80,7 @@ class SafeBrowsingTransformsTest {
private static final String REGISTRAR_ID = "registrarID";
private static final String REGISTRAR_EMAIL = "email@registrar.net";
private static ImmutableMap<Subdomain, ThreatMatch> THREAT_MATCH_MAP;
private static ImmutableMap<DomainNameInfo, ThreatMatch> THREAT_MATCH_MAP;
private final CloseableHttpClient mockHttpClient =
mock(CloseableHttpClient.class, withSettings().serializable());
@ -95,24 +95,25 @@ class SafeBrowsingTransformsTest {
final TestPipelineExtension pipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
private static Subdomain createSubdomain(String url) {
return Subdomain.create(url, REPO_ID, REGISTRAR_ID, REGISTRAR_EMAIL);
private static DomainNameInfo createDomainNameInfo(String url) {
return DomainNameInfo.create(url, REPO_ID, REGISTRAR_ID, REGISTRAR_EMAIL);
}
private KV<Subdomain, ThreatMatch> getKv(String url) {
Subdomain subdomain = createSubdomain(url);
return KV.of(subdomain, THREAT_MATCH_MAP.get(subdomain));
private KV<DomainNameInfo, ThreatMatch> getKv(String url) {
DomainNameInfo domainNameInfo = createDomainNameInfo(url);
return KV.of(domainNameInfo, THREAT_MATCH_MAP.get(domainNameInfo));
}
@BeforeAll
static void beforeAll() {
ImmutableMap.Builder<Subdomain, ThreatMatch> builder = new ImmutableMap.Builder<>();
ImmutableMap.Builder<DomainNameInfo, ThreatMatch> builder = new ImmutableMap.Builder<>();
THREAT_MAP
.entrySet()
.forEach(
kv ->
builder.put(
createSubdomain(kv.getKey()), ThreatMatch.create(kv.getValue(), kv.getKey())));
createDomainNameInfo(kv.getKey()),
ThreatMatch.create(kv.getValue(), kv.getKey())));
THREAT_MATCH_MAP = builder.build();
}
@ -123,16 +124,16 @@ class SafeBrowsingTransformsTest {
@Test
void testSuccess_someBadDomains() throws Exception {
ImmutableList<Subdomain> subdomains =
ImmutableList<DomainNameInfo> domainNameInfos =
ImmutableList.of(
createSubdomain("111.com"),
createSubdomain("hooli.com"),
createSubdomain("party-night.net"),
createSubdomain("anti-anti-anti-virus.dev"),
createSubdomain("no-email.com"));
PCollection<KV<Subdomain, ThreatMatch>> threats =
createDomainNameInfo("111.com"),
createDomainNameInfo("hooli.com"),
createDomainNameInfo("party-night.net"),
createDomainNameInfo("anti-anti-anti-virus.dev"),
createDomainNameInfo("no-email.com"));
PCollection<KV<DomainNameInfo, ThreatMatch>> threats =
pipeline
.apply(Create.of(subdomains).withCoder(SerializableCoder.of(Subdomain.class)))
.apply(Create.of(domainNameInfos).withCoder(SerializableCoder.of(DomainNameInfo.class)))
.apply(ParDo.of(safeBrowsingFn));
PAssert.that(threats)
@ -146,14 +147,14 @@ class SafeBrowsingTransformsTest {
@Test
void testSuccess_noBadDomains() throws Exception {
ImmutableList<Subdomain> subdomains =
ImmutableList<DomainNameInfo> domainNameInfos =
ImmutableList.of(
createSubdomain("hello_kitty.dev"),
createSubdomain("555.com"),
createSubdomain("goodboy.net"));
PCollection<KV<Subdomain, ThreatMatch>> threats =
createDomainNameInfo("hello_kitty.dev"),
createDomainNameInfo("555.com"),
createDomainNameInfo("goodboy.net"));
PCollection<KV<DomainNameInfo, ThreatMatch>> threats =
pipeline
.apply(Create.of(subdomains).withCoder(SerializableCoder.of(Subdomain.class)))
.apply(Create.of(domainNameInfos).withCoder(SerializableCoder.of(DomainNameInfo.class)))
.apply(ParDo.of(safeBrowsingFn));
PAssert.that(threats).empty();

View file

@ -94,13 +94,16 @@ class Spec11PipelineTest {
private final CloseableHttpClient mockHttpClient =
mock(CloseableHttpClient.class, withSettings().serializable());
private static final ImmutableList<Subdomain> SUBDOMAINS =
private static final ImmutableList<DomainNameInfo> DOMAIN_NAME_INFOS =
ImmutableList.of(
Subdomain.create("111.com", "123456789-COM", "hello-registrar", "email@hello.net"),
Subdomain.create("party-night.net", "2244AABBC-NET", "kitty-registrar", "contact@kit.ty"),
Subdomain.create("bitcoin.bank", "1C3D5E7F9-BANK", "hello-registrar", "email@hello.net"),
Subdomain.create("no-email.com", "2A4BA9BBC-COM", "kitty-registrar", "contact@kit.ty"),
Subdomain.create(
DomainNameInfo.create("111.com", "123456789-COM", "hello-registrar", "email@hello.net"),
DomainNameInfo.create(
"party-night.net", "2244AABBC-NET", "kitty-registrar", "contact@kit.ty"),
DomainNameInfo.create(
"bitcoin.bank", "1C3D5E7F9-BANK", "hello-registrar", "email@hello.net"),
DomainNameInfo.create(
"no-email.com", "2A4BA9BBC-COM", "kitty-registrar", "contact@kit.ty"),
DomainNameInfo.create(
"anti-anti-anti-virus.dev", "555666888-DEV", "cool-registrar", "cool@aid.net"));
private static final ImmutableList<ThreatMatch> THREAT_MATCHES =
@ -129,7 +132,7 @@ class Spec11PipelineTest {
PipelineOptionsFactory.create().as(Spec11PipelineOptions.class);
private File reportingBucketUrl;
private PCollection<KV<Subdomain, ThreatMatch>> threatMatches;
private PCollection<KV<DomainNameInfo, ThreatMatch>> threatMatches;
ImmutableSet<Spec11ThreatMatch> sqlThreatMatches;
@ -143,11 +146,11 @@ class Spec11PipelineTest {
threatMatches =
pipeline.apply(
Create.of(
Streams.zip(SUBDOMAINS.stream(), THREAT_MATCHES.stream(), KV::of)
Streams.zip(DOMAIN_NAME_INFOS.stream(), THREAT_MATCHES.stream(), KV::of)
.collect(toImmutableList()))
.withCoder(
KvCoder.of(
SerializableCoder.of(Subdomain.class),
SerializableCoder.of(DomainNameInfo.class),
SerializableCoder.of(ThreatMatch.class))));
sqlThreatMatches =
@ -223,8 +226,8 @@ class Spec11PipelineTest {
@Test
void testSuccess_readFromCloudSql() throws Exception {
setupCloudSql();
PCollection<Subdomain> subdomains = Spec11Pipeline.readFromCloudSql(pipeline);
PAssert.that(subdomains).containsInAnyOrder(SUBDOMAINS);
PCollection<DomainNameInfo> domainNameInfos = Spec11Pipeline.readFromCloudSql(pipeline);
PAssert.that(domainNameInfos).containsInAnyOrder(DOMAIN_NAME_INFOS);
pipeline.run().waitUntilFinish();
}

View file

@ -1,4 +1,4 @@
Map from registrar email / name to detected subdomain threats:
Map from registrar email / name to detected domain name threats:
{"threatMatches":[{"threatType":"UNWANTED_SOFTWARE","fullyQualifiedDomainName":"anti-anti-anti-virus.dev"}],"registrarClientId":"cool-registrar","registrarEmailAddress":"cool@aid.net"}
{"threatMatches":[{"threatType":"MALWARE","fullyQualifiedDomainName":"111.com"},{"threatType":"POTENTIALLY_HARMFUL_APPLICATION","fullyQualifiedDomainName":"bitcoin.bank"}],"registrarClientId":"hello-registrar","registrarEmailAddress":"email@hello.net"}
{"threatMatches":[{"threatType":"THREAT_TYPE_UNSPECIFIED","fullyQualifiedDomainName":"no-eamil.com"},{"threatType":"SOCIAL_ENGINEERING","fullyQualifiedDomainName":"party-night.net"}],"registrarClientId":"kitty-registrar","registrarEmailAddress":"contact@kit.ty"}