diff --git a/core/src/main/java/google/registry/beam/spec11/Subdomain.java b/core/src/main/java/google/registry/beam/spec11/DomainNameInfo.java
similarity index 82%
rename from core/src/main/java/google/registry/beam/spec11/Subdomain.java
rename to core/src/main/java/google/registry/beam/spec11/DomainNameInfo.java
index 8d60751df..4107ea539 100644
--- a/core/src/main/java/google/registry/beam/spec11/Subdomain.java
+++ b/core/src/main/java/google/registry/beam/spec11/DomainNameInfo.java
@@ -25,7 +25,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
/**
- * A POJO representing a single subdomain, parsed from a {@code SchemaAndRecord}.
+ * A POJO representing a domain name and associated info, parsed from a {@code SchemaAndRecord}.
*
*
This is a trivially serializable class that allows Beam to transform the results of a Bigquery
* query into a standard Java representation, giving us the type guarantees and ease of manipulation
@@ -33,28 +33,31 @@ import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
* function.
*/
@AutoValue
-public abstract class Subdomain implements Serializable {
+public abstract class DomainNameInfo implements Serializable {
private static final ImmutableList FIELD_NAMES =
ImmutableList.of("domainName", "domainRepoId", "registrarId", "registrarEmailAddress");
/** Returns the fully qualified domain name. */
abstract String domainName();
+
/** Returns the domain repo ID (the primary key of the domain table). */
abstract String domainRepoId();
+
/** Returns the registrar ID of the associated registrar for this domain. */
abstract String registrarId();
+
/** Returns the email address of the registrar associated with this domain. */
abstract String registrarEmailAddress();
/**
- * Constructs a {@link Subdomain} from an Apache Avro {@code SchemaAndRecord}.
+ * Constructs a {@link DomainNameInfo} from an Apache Avro {@code SchemaAndRecord}.
*
* @see
* Apache AVRO GenericRecord
*/
- static Subdomain parseFromRecord(SchemaAndRecord schemaAndRecord) {
+ static DomainNameInfo parseFromRecord(SchemaAndRecord schemaAndRecord) {
checkFieldsNotNull(FIELD_NAMES, schemaAndRecord);
GenericRecord record = schemaAndRecord.getRecord();
return create(
@@ -65,18 +68,15 @@ public abstract class Subdomain implements Serializable {
}
/**
- * Creates a concrete {@link Subdomain}.
+ * Creates a concrete {@link DomainNameInfo}.
*
- *
This should only be used outside this class for testing- instances of {@link Subdomain}
+ *
This should only be used outside this class for testing- instances of {@link DomainNameInfo}
* should otherwise come from {@link #parseFromRecord}.
*/
@VisibleForTesting
- static Subdomain create(
- String domainName,
- String domainRepoId,
- String registrarId,
- String registrarEmailAddress) {
- return new AutoValue_Subdomain(
+ static DomainNameInfo create(
+ String domainName, String domainRepoId, String registrarId, String registrarEmailAddress) {
+ return new AutoValue_DomainNameInfo(
domainName, domainRepoId, registrarId, registrarEmailAddress);
}
}
diff --git a/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java b/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java
index f896e1c89..bd7849936 100644
--- a/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java
+++ b/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java
@@ -55,13 +55,14 @@ public class SafeBrowsingTransforms {
"https://safebrowsing.googleapis.com/v4/threatMatches:find";
/**
- * {@link DoFn} mapping a {@link Subdomain} to its evaluation report from SafeBrowsing.
+ * {@link DoFn} mapping a {@link DomainNameInfo} to its evaluation report from SafeBrowsing.
*
*
Refer to the Lookup API documentation for the request/response format and other details.
*
* @see Lookup API
*/
- static class EvaluateSafeBrowsingFn extends DoFn> {
+ static class EvaluateSafeBrowsingFn
+ extends DoFn> {
/**
* Max number of urls we can check in a single query.
@@ -74,10 +75,11 @@ public class SafeBrowsingTransforms {
private final String apiKey;
/**
- * Maps a subdomain's {@code fullyQualifiedDomainName} to its corresponding {@link Subdomain} to
- * facilitate batching SafeBrowsing API requests.
+ * Maps a domain name's {@code fullyQualifiedDomainName} to its corresponding {@link
+ * DomainNameInfo} to facilitate batching SafeBrowsing API requests.
*/
- private final Map subdomainBuffer = new LinkedHashMap<>(BATCH_SIZE);
+ private final Map domainNameInfoBuffer =
+ new LinkedHashMap<>(BATCH_SIZE);
/**
* Provides the HTTP client we use to interact with the SafeBrowsing API.
@@ -119,37 +121,38 @@ public class SafeBrowsingTransforms {
closeableHttpClientSupplier = clientSupplier;
}
- /** Evaluates any buffered {@link Subdomain} objects upon completing the bundle. */
+ /** Evaluates any buffered {@link DomainNameInfo} objects upon completing the bundle. */
@FinishBundle
public void finishBundle(FinishBundleContext context) {
- if (!subdomainBuffer.isEmpty()) {
- ImmutableSet> results = evaluateAndFlush();
+ if (!domainNameInfoBuffer.isEmpty()) {
+ ImmutableSet> results = evaluateAndFlush();
results.forEach((kv) -> context.output(kv, Instant.now(), GlobalWindow.INSTANCE));
}
}
/**
- * Buffers {@link Subdomain} objects until we reach the batch size, then bulk-evaluate the URLs
- * with the SafeBrowsing API.
+ * Buffers {@link DomainNameInfo} objects until we reach the batch size, then bulk-evaluate the
+ * URLs with the SafeBrowsing API.
*/
@ProcessElement
public void processElement(ProcessContext context) {
- Subdomain subdomain = context.element();
- subdomainBuffer.put(subdomain.domainName(), subdomain);
- if (subdomainBuffer.size() >= BATCH_SIZE) {
- ImmutableSet> results = evaluateAndFlush();
+ DomainNameInfo domainNameInfo = context.element();
+ domainNameInfoBuffer.put(domainNameInfo.domainName(), domainNameInfo);
+ if (domainNameInfoBuffer.size() >= BATCH_SIZE) {
+ ImmutableSet> results = evaluateAndFlush();
results.forEach(context::output);
}
}
/**
- * Evaluates all {@link Subdomain} objects in the buffer and returns a list of key-value pairs
- * from {@link Subdomain} to its SafeBrowsing report.
+ * Evaluates all {@link DomainNameInfo} objects in the buffer and returns a list of key-value
+ * pairs from {@link DomainNameInfo} to its SafeBrowsing report.
*
- *
If a {@link Subdomain} is safe according to the API, it will not emit a report.
+ *
If a {@link DomainNameInfo} is safe according to the API, it will not emit a report.
*/
- private ImmutableSet> evaluateAndFlush() {
- ImmutableSet.Builder> resultBuilder = new ImmutableSet.Builder<>();
+ private ImmutableSet> evaluateAndFlush() {
+ ImmutableSet.Builder> resultBuilder =
+ new ImmutableSet.Builder<>();
try {
URIBuilder uriBuilder = new URIBuilder(SAFE_BROWSING_URL);
// Add the API key param
@@ -174,7 +177,7 @@ public class SafeBrowsingTransforms {
throw new RuntimeException("Caught parsing exception, failing pipeline.", e);
} finally {
// Flush the buffer
- subdomainBuffer.clear();
+ domainNameInfoBuffer.clear();
}
return resultBuilder.build();
}
@@ -183,7 +186,7 @@ public class SafeBrowsingTransforms {
private JSONObject createRequestBody() throws JSONException {
// Accumulate all domain names to evaluate.
JSONArray threatArray = new JSONArray();
- for (String fullyQualifiedDomainName : subdomainBuffer.keySet()) {
+ for (String fullyQualifiedDomainName : domainNameInfoBuffer.keySet()) {
threatArray.put(new JSONObject().put("url", fullyQualifiedDomainName));
}
// Construct the JSON request body
@@ -211,7 +214,7 @@ public class SafeBrowsingTransforms {
*/
private void processResponse(
CloseableHttpResponse response,
- ImmutableSet.Builder> resultBuilder)
+ ImmutableSet.Builder> resultBuilder)
throws JSONException, IOException {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != SC_OK) {
@@ -226,16 +229,17 @@ public class SafeBrowsingTransforms {
if (responseBody.length() == 0) {
logger.atInfo().log("Response was empty, no threats detected");
} else {
- // Emit all Subdomains with their API results.
+ // Emit all DomainNameInfos with their API results.
JSONArray threatMatches = responseBody.getJSONArray("matches");
for (int i = 0; i < threatMatches.length(); i++) {
JSONObject match = threatMatches.getJSONObject(i);
String url = match.getJSONObject("threat").getString("url");
- Subdomain subdomain = subdomainBuffer.get(url);
+ DomainNameInfo domainNameInfo = domainNameInfoBuffer.get(url);
resultBuilder.add(
KV.of(
- subdomain,
- ThreatMatch.create(match.getString("threatType"), subdomain.domainName())));
+ domainNameInfo,
+ ThreatMatch.create(
+ match.getString("threatType"), domainNameInfo.domainName())));
}
}
}
diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java
index 4f03caa80..d1c1a3898 100644
--- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java
+++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java
@@ -100,20 +100,20 @@ public class Spec11Pipeline implements Serializable {
void setupPipeline(Pipeline pipeline) {
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
- PCollection domains =
+ PCollection domains =
options.getDatabase().equals("DATASTORE")
? readFromBigQuery(options, pipeline)
: readFromCloudSql(pipeline);
- PCollection> threatMatches =
+ PCollection> threatMatches =
domains.apply("Run through SafeBrowsing API", ParDo.of(safeBrowsingFn));
saveToSql(threatMatches, options);
saveToGcs(threatMatches, options);
}
- static PCollection readFromCloudSql(Pipeline pipeline) {
- Read