Run the Spec11 pipeline daily without sending emails

Add a sendSpec11Email parameter that allows us to only send the email on
one run per month. Next, we will compute the diffs between the daily runs
and send daily emails with those diffs.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=224404653
This commit is contained in:
jianglai 2018-12-06 14:10:25 -08:00
parent 3ef8cd692d
commit ec26e3a96a
20 changed files with 434 additions and 158 deletions

View file

@ -15,17 +15,14 @@
package google.registry.beam.spec11;
import static google.registry.beam.BeamUtils.getQueryFromFile;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.auth.oauth2.GoogleCredentials;
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
import google.registry.config.CredentialModule.LocalCredentialJson;
import google.registry.config.RegistryConfig.Config;
import google.registry.util.Retrier;
import google.registry.util.SqlTemplate;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.YearMonth;
import javax.inject.Inject;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@ -61,13 +58,14 @@ import org.json.JSONObject;
public class Spec11Pipeline implements Serializable {
/**
* Returns the subdirectory spec11 reports reside in for a given yearMonth in yyyy-MM format.
* Returns the path of the spec11 report file for a given local date (yyyy-MM-dd in the file name).
*
* @see google.registry.beam.spec11.Spec11Pipeline
* @see google.registry.reporting.spec11.Spec11EmailUtils
*/
public static String getSpec11Subdirectory(String yearMonth) {
return String.format("icann/spec11/%s/SPEC11_MONTHLY_REPORT", yearMonth);
public static String getSpec11ReportFilePath(LocalDate localDate) {
YearMonth yearMonth = YearMonth.of(localDate.getYear(), localDate.getMonth());
return String.format("icann/spec11/%s/SPEC11_MONTHLY_REPORT_%s", yearMonth, localDate);
}
/** The JSON object field we put the registrar's e-mail address for Spec11 reports. */
@ -91,27 +89,24 @@ public class Spec11Pipeline implements Serializable {
@Config("reportingBucketUrl")
String reportingBucketUrl;
@Inject
Retrier retrier;
@Inject @LocalCredentialJson String credentialJson;
@Inject Retrier retrier;
@Inject
Spec11Pipeline() {}
/** Custom options for running the spec11 pipeline. */
interface Spec11PipelineOptions extends DataflowPipelineOptions {
/** Returns the yearMonth we're generating the report for, in yyyy-MM format. */
@Description("The yearMonth we generate the report for, in yyyy-MM format.")
ValueProvider<String> getYearMonth();
/** Returns the local date we're generating the report for, in yyyy-MM-dd format. */
@Description("The local date we generate the report for, in yyyy-MM-dd format.")
ValueProvider<String> getDate();
/**
* Sets the yearMonth we generate invoices for.
* Sets the local date we generate the report for.
*
* <p>This is implicitly set when executing the Dataflow template, by specifying the "yearMonth"
* <p>This is implicitly set when executing the Dataflow template, by specifying the "date"
* parameter.
*/
void setYearMonth(ValueProvider<String> value);
void setDate(ValueProvider<String> value);
/** Returns the SafeBrowsing API key we use to evaluate subdomain health. */
@Description("The API key we use to access the SafeBrowsing API.")
@ -130,12 +125,6 @@ public class Spec11Pipeline implements Serializable {
public void deploy() {
// We can't store options as a member variable due to serialization concerns.
Spec11PipelineOptions options = PipelineOptionsFactory.as(Spec11PipelineOptions.class);
try {
options.setGcpCredential(
GoogleCredentials.fromStream(new ByteArrayInputStream(credentialJson.getBytes(UTF_8))));
} catch (IOException e) {
throw new RuntimeException("Cannot obtain local credential to deploy the spec11 pipeline", e);
}
options.setProject(projectId);
options.setRunner(DataflowRunner.class);
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
@ -162,7 +151,7 @@ public class Spec11Pipeline implements Serializable {
evaluateUrlHealth(
domains,
new EvaluateSafeBrowsingFn(options.getSafeBrowsingApiKey(), retrier),
options.getYearMonth());
options.getDate());
p.run();
}
@ -174,7 +163,7 @@ public class Spec11Pipeline implements Serializable {
void evaluateUrlHealth(
PCollection<Subdomain> domains,
EvaluateSafeBrowsingFn evaluateSafeBrowsingFn,
ValueProvider<String> yearMonthProvider) {
ValueProvider<String> dateProvider) {
domains
.apply("Run through SafeBrowsingAPI", ParDo.of(evaluateSafeBrowsingFn))
.apply(
@ -212,13 +201,13 @@ public class Spec11Pipeline implements Serializable {
TextIO.write()
.to(
NestedValueProvider.of(
yearMonthProvider,
yearMonth ->
dateProvider,
date ->
String.format(
"%s/%s",
reportingBucketUrl, getSpec11Subdirectory(yearMonth))))
reportingBucketUrl,
getSpec11ReportFilePath(LocalDate.parse(date)))))
.withoutSharding()
.withHeader("Map from registrar email to detected subdomain threats:"));
}
}