Update input/output of Spec11 pipeline to final format

This changes the BigQuery input to the fields we ultimately want (fqdn,
registrarName, registrarEmailAddress) and the output to a structured POJO
holding the results from the API. This POJO is then converted to its final text output, i.e.:

Map from registrar e-mail to list of threat-detected subdomains:
{"registrarEmail": "c@fake.com", "threats": [{"url": "a.com", "threatType": "MALWARE"}]}
{"registrarEmail": "d@fake.com", "threats": [{"url": "x.com", "threatType": "MALWARE"}, {"url": "y.com", "threatType": "MALWARE"}]}

This gives us all the data we want in a JSON structured format, to be acted upon downstream by the to-be-constructed PublishSpec11ReportAction. Ideally, we would send an e-mail directly from the beam pipeline, but this is only possible through third-party providers (as opposed to app engine itself).

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=209416880
This commit is contained in:
larryruili 2018-08-20 07:54:31 -07:00 committed by jianglai
parent 7dcadaecf6
commit f7bc17fbe8
11 changed files with 393 additions and 130 deletions

View file

@ -8,6 +8,7 @@ java_library(
name = "beam",
srcs = glob(["*.java"]),
deps = [
"//java/google/registry/util",
"@com_google_flogger",
"@com_google_flogger_system_backend",
"@com_google_guava",

View file

@ -17,6 +17,8 @@ package google.registry.beam;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.FluentLogger;
import com.google.common.io.Resources;
import google.registry.util.ResourceUtils;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
@ -54,4 +56,12 @@ public class BeamUtils {
missingFieldList, record));
}
}
/**
* Returns the {@link String} contents for a file in the {@code sql/} directory relative to a
* class.
*/
public static String getQueryFromFile(Class<?> clazz, String filename) {
return ResourceUtils.readResourceUtf8(Resources.getResource(clazz, "sql/" + filename));
}
}

View file

@ -14,8 +14,8 @@
package google.registry.beam.invoicing;
import com.google.common.io.Resources;
import google.registry.util.ResourceUtils;
import static google.registry.beam.BeamUtils.getQueryFromFile;
import google.registry.util.SqlTemplate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@ -91,7 +91,7 @@ public class InvoicingUtils {
LocalDateTime firstMoment = reportingMonth.atDay(1).atTime(LocalTime.MIDNIGHT);
LocalDateTime lastMoment = reportingMonth.atEndOfMonth().atTime(LocalTime.MAX);
// Construct the month's query by filling in the billing_events.sql template
return SqlTemplate.create(getQueryFromFile("billing_events.sql"))
return SqlTemplate.create(getQueryFromFile(InvoicingPipeline.class, "billing_events.sql"))
.put("FIRST_TIMESTAMP_OF_MONTH", firstMoment.format(TIMESTAMP_FORMATTER))
.put("LAST_TIMESTAMP_OF_MONTH", lastMoment.format(TIMESTAMP_FORMATTER))
.put("PROJECT_ID", projectId)
@ -103,10 +103,4 @@ public class InvoicingUtils {
.build();
});
}
/** Returns the {@link String} contents for a file in the {@code beam/sql/} directory. */
private static String getQueryFromFile(String filename) {
return ResourceUtils.readResourceUtf8(
Resources.getResource(InvoicingUtils.class, "sql/" + filename));
}
}

View file

@ -11,6 +11,7 @@ java_library(
deps = [
"//java/google/registry/beam",
"//java/google/registry/config",
"//java/google/registry/util",
"@com_google_auto_value",
"@com_google_dagger",
"@com_google_flogger",

View file

@ -19,9 +19,10 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.http.HttpStatus.SC_OK;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
import com.google.common.io.CharStreams;
import google.registry.util.Retrier;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
@ -62,7 +63,7 @@ public class SafeBrowsingTransforms {
*
* @see <a href=https://developers.google.com/safe-browsing/v4/lookup-api>Lookup API</a>
*/
static class EvaluateSafeBrowsingFn extends DoFn<Subdomain, KV<Subdomain, String>> {
static class EvaluateSafeBrowsingFn extends DoFn<Subdomain, KV<Subdomain, ThreatMatch>> {
/**
* Max number of urls we can check in a single query.
@ -75,8 +76,8 @@ public class SafeBrowsingTransforms {
private final ValueProvider<String> apiKeyProvider;
/**
* Maps a subdomain's HTTP URL to its corresponding {@link Subdomain} to facilitate batching
* SafeBrowsing API requests.
* Maps a subdomain's {@code fullyQualifiedDomainName} to its corresponding {@link Subdomain} to
* facilitate batching SafeBrowsing API requests.
*/
private final Map<String, Subdomain> subdomainBuffer = new LinkedHashMap<>(BATCH_SIZE);
@ -88,6 +89,9 @@ public class SafeBrowsingTransforms {
*/
private final Supplier<CloseableHttpClient> closeableHttpClientSupplier;
/** Retries on receiving transient failures such as {@link IOException}. */
private final Retrier retrier;
/**
* Constructs a {@link EvaluateSafeBrowsingFn} that gets its API key from the given provider.
*
@ -99,8 +103,9 @@ public class SafeBrowsingTransforms {
* @param apiKeyProvider provides the SafeBrowsing API key from {@code KMS} at runtime
*/
@SuppressWarnings("unchecked")
EvaluateSafeBrowsingFn(ValueProvider<String> apiKeyProvider) {
EvaluateSafeBrowsingFn(ValueProvider<String> apiKeyProvider, Retrier retrier) {
this.apiKeyProvider = apiKeyProvider;
this.retrier = retrier;
this.closeableHttpClientSupplier = (Supplier & Serializable) HttpClients::createDefault;
}
@ -113,8 +118,11 @@ public class SafeBrowsingTransforms {
@VisibleForTesting
@SuppressWarnings("unchecked")
EvaluateSafeBrowsingFn(
ValueProvider<String> apiKeyProvider, Supplier<CloseableHttpClient> clientSupplier) {
ValueProvider<String> apiKeyProvider,
Retrier retrier,
Supplier<CloseableHttpClient> clientSupplier) {
this.apiKeyProvider = apiKeyProvider;
this.retrier = retrier;
this.closeableHttpClientSupplier = clientSupplier;
}
@ -122,7 +130,7 @@ public class SafeBrowsingTransforms {
@FinishBundle
public void finishBundle(FinishBundleContext context) {
if (!subdomainBuffer.isEmpty()) {
ImmutableList<KV<Subdomain, String>> results = evaluateAndFlush();
ImmutableSet<KV<Subdomain, ThreatMatch>> results = evaluateAndFlush();
results.forEach((kv) -> context.output(kv, Instant.now(), GlobalWindow.INSTANCE));
}
}
@ -134,11 +142,9 @@ public class SafeBrowsingTransforms {
@ProcessElement
public void processElement(ProcessContext context) {
Subdomain subdomain = context.element();
// We put HTTP URLs into the buffer because the API requires specifying the protocol.
subdomainBuffer.put(
String.format("http://%s", subdomain.fullyQualifiedDomainName()), subdomain);
subdomainBuffer.put(subdomain.fullyQualifiedDomainName(), subdomain);
if (subdomainBuffer.size() >= BATCH_SIZE) {
ImmutableList<KV<Subdomain, String>> results = evaluateAndFlush();
ImmutableSet<KV<Subdomain, ThreatMatch>> results = evaluateAndFlush();
results.forEach(context::output);
}
}
@ -149,8 +155,8 @@ public class SafeBrowsingTransforms {
*
* <p>If a {@link Subdomain} is safe according to the API, it will not emit a report.
*/
private ImmutableList<KV<Subdomain, String>> evaluateAndFlush() {
ImmutableList.Builder<KV<Subdomain, String>> resultBuilder = new ImmutableList.Builder<>();
private ImmutableSet<KV<Subdomain, ThreatMatch>> evaluateAndFlush() {
ImmutableSet.Builder<KV<Subdomain, ThreatMatch>> resultBuilder = new ImmutableSet.Builder<>();
try {
URIBuilder uriBuilder = new URIBuilder(SAFE_BROWSING_URL);
// Add the API key param
@ -161,17 +167,18 @@ public class SafeBrowsingTransforms {
JSONObject requestBody = createRequestBody();
httpPost.setEntity(new ByteArrayEntity(requestBody.toString().getBytes(UTF_8)));
try (CloseableHttpClient client = closeableHttpClientSupplier.get();
CloseableHttpResponse response = client.execute(httpPost)) {
processResponse(response, resultBuilder);
}
} catch (URISyntaxException | JSONException e) {
// TODO(b/112354588): also send an alert e-mail to indicate the pipeline failed
logger.atSevere().withCause(e).log(
"Caught parsing error during execution, skipping batch.");
} catch (IOException e) {
logger.atSevere().withCause(e).log("Caught IOException during processing, skipping batch.");
// Retry transient exceptions such as IOException
retrier.callWithRetry(
() -> {
try (CloseableHttpClient client = closeableHttpClientSupplier.get();
CloseableHttpResponse response = client.execute(httpPost)) {
processResponse(response, resultBuilder);
}
},
IOException.class);
} catch (URISyntaxException | JSONException e) {
// Fail the pipeline on a parsing exception- this indicates the API likely changed.
throw new RuntimeException("Caught parsing exception, failing pipeline.", e);
} finally {
// Flush the buffer
subdomainBuffer.clear();
@ -206,12 +213,13 @@ public class SafeBrowsingTransforms {
}
/**
* Iterates through all threat matches in the API response and adds them to the resultBuilder.
* Iterates through all threat matches in the API response and adds them to the {@code
* resultBuilder}.
*/
private void processResponse(
CloseableHttpResponse response, ImmutableList.Builder<KV<Subdomain, String>> resultBuilder)
CloseableHttpResponse response,
ImmutableSet.Builder<KV<Subdomain, ThreatMatch>> resultBuilder)
throws JSONException, IOException {
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != SC_OK) {
logger.atWarning().log("Got unexpected status code %s from response", statusCode);
@ -230,7 +238,9 @@ public class SafeBrowsingTransforms {
for (int i = 0; i < threatMatches.length(); i++) {
JSONObject match = threatMatches.getJSONObject(i);
String url = match.getJSONObject("threat").getString("url");
resultBuilder.add(KV.of(subdomainBuffer.get(url), match.toString()));
Subdomain subdomain = subdomainBuffer.get(url);
resultBuilder.add(
KV.of(subdomain, ThreatMatch.create(match, subdomain.fullyQualifiedDomainName())));
}
}
}

View file

@ -14,8 +14,12 @@
package google.registry.beam.spec11;
import static google.registry.beam.BeamUtils.getQueryFromFile;
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
import google.registry.config.RegistryConfig.Config;
import google.registry.util.Retrier;
import google.registry.util.SqlTemplate;
import java.io.Serializable;
import javax.inject.Inject;
import org.apache.beam.runners.dataflow.DataflowRunner;
@ -27,10 +31,17 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.transforms.ToString;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
/**
* Definition of a Dataflow pipeline template, which generates a given month's spec11 report.
@ -60,6 +71,9 @@ public class Spec11Pipeline implements Serializable {
@Config("spec11BucketUrl")
String spec11BucketUrl;
@Inject
Retrier retrier;
@Inject
Spec11Pipeline() {}
@ -106,13 +120,21 @@ public class Spec11Pipeline implements Serializable {
"Read active domains from BigQuery",
BigQueryIO.read(Subdomain::parseFromRecord)
.fromQuery(
// This query must be customized for your own use.
"SELECT * FROM YOUR_TABLE_HERE")
SqlTemplate.create(getQueryFromFile(Spec11Pipeline.class, "subdomains.sql"))
.put("PROJECT_ID", projectId)
.put("DATASTORE_EXPORT_DATASET", "latest_datastore_export")
.put("REGISTRAR_TABLE", "Registrar")
.put("DOMAIN_BASE_TABLE", "DomainBase")
.build())
.withCoder(SerializableCoder.of(Subdomain.class))
.usingStandardSql()
.withoutValidation()
.withTemplateCompatibility());
evaluateUrlHealth(domains, new EvaluateSafeBrowsingFn(options.getSafeBrowsingApiKey()));
evaluateUrlHealth(
domains,
new EvaluateSafeBrowsingFn(options.getSafeBrowsingApiKey(), retrier),
options.getYearMonth());
p.run();
}
@ -122,21 +144,51 @@ public class Spec11Pipeline implements Serializable {
* <p>This is factored out to facilitate testing.
*/
void evaluateUrlHealth(
PCollection<Subdomain> domains, EvaluateSafeBrowsingFn evaluateSafeBrowsingFn) {
PCollection<Subdomain> domains,
EvaluateSafeBrowsingFn evaluateSafeBrowsingFn,
ValueProvider<String> yearMonthProvider) {
domains
// TODO(b/111545355): Remove this limiter once we're confident we won't go over quota.
.apply(
"Get just a few representative samples for now, don't want to overwhelm our quota",
Sample.any(1000))
.apply("Run through SafeBrowsingAPI", ParDo.of(evaluateSafeBrowsingFn))
.apply("Convert results to string", ToString.elements())
.apply(
"Map registrar e-mail to ThreatMatch",
MapElements.into(
TypeDescriptors.kvs(
TypeDescriptors.strings(), TypeDescriptor.of(ThreatMatch.class)))
.via(
(KV<Subdomain, ThreatMatch> kv) ->
KV.of(kv.getKey().registrarEmailAddress(), kv.getValue())))
.apply("Group by registrar email address", GroupByKey.create())
.apply(
"Convert results to JSON format",
MapElements.into(TypeDescriptors.strings())
.via(
(KV<String, Iterable<ThreatMatch>> kv) -> {
JSONObject output = new JSONObject();
try {
output.put("registrarEmailAddress", kv.getKey());
JSONArray threatMatches = new JSONArray();
for (ThreatMatch match : kv.getValue()) {
threatMatches.put(match.toJSON());
}
output.put("threatMatches", threatMatches);
return output.toString();
} catch (JSONException e) {
throw new RuntimeException(
String.format(
"Encountered an error constructing the JSON for %s", kv.toString()),
e);
}
}))
.apply(
"Output to text file",
TextIO.write()
// TODO(b/111545355): Replace this with a templated directory based on yearMonth
.to(spec11BucketUrl)
.to(
NestedValueProvider.of(
yearMonthProvider,
yearMonth ->
String.format(
"%s/%s/%s-monthly-report", spec11BucketUrl, yearMonth, yearMonth)))
.withoutSharding()
.withHeader("HELLO WORLD"));
.withHeader("Map from registrar email to detected subdomain threats:"));
}
}

View file

@ -22,9 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.FluentLogger;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
@ -42,14 +39,14 @@ public abstract class Subdomain implements Serializable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final ImmutableList<String> FIELD_NAMES =
ImmutableList.of("fullyQualifiedDomainName", "statuses", "creationTime");
ImmutableList.of("fullyQualifiedDomainName", "registrarName", "registrarEmailAddress");
/** Returns the fully qualified domain name. */
abstract String fullyQualifiedDomainName();
/** Returns the UTC DateTime this domain was created. */
abstract ZonedDateTime creationTime();
/** Returns the space-delimited list of statuses on this domain. */
abstract String statuses();
/** Returns the name of the associated registrar for this domain. */
abstract String registrarName();
/** Returns the email address of the registrar associated with this domain. */
abstract String registrarEmailAddress();
/**
* Constructs a {@link Subdomain} from an Apache Avro {@code SchemaAndRecord}.
@ -63,10 +60,8 @@ public abstract class Subdomain implements Serializable {
GenericRecord record = schemaAndRecord.getRecord();
return create(
extractField(record, "fullyQualifiedDomainName"),
// Bigquery provides UNIX timestamps with microsecond precision.
Instant.ofEpochMilli(Long.parseLong(extractField(record, "creationTime")) / 1000)
.atZone(ZoneId.of("UTC")),
extractField(record, "statuses"));
extractField(record, "registrarName"),
extractField(record, "registrarEmailAddress"));
}
/**
@ -77,8 +72,8 @@ public abstract class Subdomain implements Serializable {
*/
@VisibleForTesting
static Subdomain create(
String fullyQualifiedDomainName, ZonedDateTime creationTime, String statuses) {
return new AutoValue_Subdomain(fullyQualifiedDomainName, creationTime, statuses);
String fullyQualifiedDomainName, String registrarName, String registrarEmailAddress) {
return new AutoValue_Subdomain(fullyQualifiedDomainName, registrarName, registrarEmailAddress);
}
}

View file

@ -0,0 +1,72 @@
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.spec11;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import org.json.JSONException;
import org.json.JSONObject;
/** A POJO representing a threat match response from the {@code SafeBrowsing API}. */
@AutoValue
public abstract class ThreatMatch implements Serializable {
private static final String THREAT_TYPE_FIELD = "threatType";
private static final String PLATFORM_TYPE_FIELD = "platformType";
private static final String METADATA_FIELD = "threatEntryMetadata";
/** Returns what kind of threat it is (malware, phishing etc.) */
abstract String threatType();
/** Returns what platforms it affects (Windows, Linux etc.) */
abstract String platformType();
/**
* Returns a String representing a JSON Object containing arbitrary metadata associated with this
* threat, or "NONE" if there is no metadata to retrieve.
*
* <p>This ideally would be a {@link JSONObject} type, but can't be due to serialization
* requirements.
*/
abstract String metadata();
/** Returns the fully qualified domain name [SLD].[TLD] of the matched threat. */
abstract String fullyQualifiedDomainName();
/**
* Constructs a {@link ThreatMatch} by parsing a {@code SafeBrowsing API} response {@link
* JSONObject}.
*
* @throws JSONException when encountering parse errors in the response format
*/
static ThreatMatch create(JSONObject threatMatchJSON, String fullyQualifiedDomainName)
throws JSONException {
return new AutoValue_ThreatMatch(
threatMatchJSON.getString(THREAT_TYPE_FIELD),
threatMatchJSON.getString(PLATFORM_TYPE_FIELD),
threatMatchJSON.has(METADATA_FIELD)
? threatMatchJSON.getJSONObject(METADATA_FIELD).toString()
: "NONE",
fullyQualifiedDomainName);
}
/** Returns a {@link String} containing the simplest details about this threat. */
String getSimpleDetails() {
return String.format("%s;%s", this.fullyQualifiedDomainName(), this.threatType());
}
/** Returns a {@link JSONObject} representing a subset of this object's data. */
JSONObject toJSON() throws JSONException {
return new JSONObject()
.put("fullyQualifiedDomainName", fullyQualifiedDomainName())
.put("threatType", threatType());
}
}

View file

@ -0,0 +1,49 @@
#standardSQL
-- Copyright 2018 The Nomulus Authors. All Rights Reserved.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- This query gathers all Subdomains active within a given yearMonth
-- and emits a row containing its fully qualified domain name
-- [SLD].[TLD], the current registrar's name, and the current registrar's
-- email address.
SELECT
domain.fullyQualifiedDomainName AS fullyQualifiedDomainName,
registrar.name AS registrarName,
registrar.emailAddress AS registrarEmailAddress
FROM ( (
SELECT
fullyQualifiedDomainName,
currentSponsorClientId,
creationTime
FROM
`%PROJECT_ID%.%DATASTORE_EXPORT_DATASET%.%DOMAIN_BASE_TABLE%`
WHERE
-- Only include active registrations
-- Registrations that are active (not deleted) will have null deletionTime
-- because END_OF_TIME is an invalid timestamp in standardSQL
(SAFE_CAST(deletionTime AS STRING) IS NULL
OR deletionTime > CURRENT_TIMESTAMP)) AS domain
JOIN (
SELECT
__key__.name AS name,
emailAddress
FROM
`%PROJECT_ID%.%DATASTORE_EXPORT_DATASET%.%REGISTRAR_TABLE%`
WHERE
type = 'REAL') AS registrar
ON
domain.currentSponsorClientId = registrar.name)
ORDER BY
creationTime DESC