Migrate Spec11 pipeline to flex template (#1073)

* Migrate Spec11 pipeline to flex template

Unfortunately, this PR has turned out to be much bigger than I initially
conceived. However, there is no good way to separate it out because the
changes are intertwined. This PR includes 3 main changes:

1. Change the spec11 pipeline to use Dataflow Flex Template.
2. Retire the use of the old JPA layer that relies on credential saved
   in KMS.
3. Some extensive refactoring to streamline the logic and improve test
   isolation.

* Fix job name and remove projectId from options

* Add parameter logs

* Set RegistryEnvironment

* Remove logging and modify safe browsing API key regex

* Rename a test method and rebase

* Remove unused Junit extension

* Specify job region
This commit is contained in:
Lai Jiang 2021-04-21 00:09:50 -04:00 committed by GitHub
parent 18dec7ff18
commit 8aea804841
24 changed files with 789 additions and 981 deletions

View file

@ -805,6 +805,10 @@ if (environment in ['alpha', 'crash']) {
mainClass: 'google.registry.beam.datastore.BulkDeleteDatastorePipeline',
metaData: 'google/registry/beam/bulk_delete_datastore_pipeline_metadata.json'
],
[
mainClass: 'google.registry.beam.spec11.Spec11Pipeline',
metaData: 'google/registry/beam/spec11_pipeline_metadata.json'
],
]
project.tasks.create("stage_beam_pipelines") {
doLast {

View file

@ -15,6 +15,7 @@
package google.registry.batch;
import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8;
import static google.registry.beam.BeamUtils.createJobName;
import static javax.servlet.http.HttpServletResponse.SC_FORBIDDEN;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_OK;
@ -30,9 +31,8 @@ import google.registry.config.RegistryConfig.Config;
import google.registry.request.Action;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import javax.inject.Inject;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
/**
* Wipes out all Cloud Datastore data in a Nomulus GCP environment.
@ -58,17 +58,20 @@ public class WipeoutDatastoreAction implements Runnable {
private final Response response;
private final Dataflow dataflow;
private final String stagingBucketUrl;
private final Clock clock;
/**
 * Creates the action with its injected configuration and collaborators.
 *
 * @param projectId value of the {@code projectId} config entry
 * @param jobRegion value of the {@code defaultJobRegion} config entry
 * @param stagingBucketUrl value of the {@code beamStagingBucketUrl} config entry, used to locate
 *     the staged pipeline metadata file
 * @param clock time source used when generating the job name
 * @param response the response object for this request
 * @param dataflow the Dataflow API client
 */
@Inject
WipeoutDatastoreAction(
    @Config("projectId") String projectId,
    @Config("defaultJobRegion") String jobRegion,
    @Config("beamStagingBucketUrl") String stagingBucketUrl,
    Clock clock,
    Response response,
    Dataflow dataflow) {
  // Request-scoped collaborators.
  this.dataflow = dataflow;
  this.response = response;
  this.clock = clock;
  // Static configuration values.
  this.stagingBucketUrl = stagingBucketUrl;
  this.jobRegion = jobRegion;
  this.projectId = projectId;
}
@ -86,10 +89,7 @@ public class WipeoutDatastoreAction implements Runnable {
try {
LaunchFlexTemplateParameter parameters =
new LaunchFlexTemplateParameter()
// Job name must be unique and in [-a-z0-9].
.setJobName(
"bulk-delete-datastore-"
+ DateTime.now(DateTimeZone.UTC).toString("yyyy-MM-dd'T'HH-mm-ss'Z'"))
.setJobName(createJobName("bulk-delete-datastore-", clock))
.setContainerSpecGcsPath(
String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
.setParameters(ImmutableMap.of("kindsToDelete", "*"));

View file

@ -14,10 +14,14 @@
package google.registry.beam;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Resources;
import google.registry.util.Clock;
import google.registry.util.ResourceUtils;
import java.util.regex.Pattern;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
@ -41,8 +45,7 @@ public class BeamUtils {
ImmutableList<String> fieldNames, SchemaAndRecord schemaAndRecord) {
GenericRecord record = schemaAndRecord.getRecord();
ImmutableList<String> nullFields =
fieldNames
.stream()
fieldNames.stream()
.filter(fieldName -> record.get(fieldName) == null)
.collect(ImmutableList.toImmutableList());
String missingFieldList = Joiner.on(", ").join(nullFields);
@ -61,4 +64,19 @@ public class BeamUtils {
public static String getQueryFromFile(Class<?> clazz, String filename) {
return ResourceUtils.readResourceUtf8(Resources.getResource(clazz, "sql/" + filename));
}
/**
 * Creates a beam job name and validates that it conforms to the requirements.
 *
 * <p>The name is {@code prefix}, a {@code '-'}, and the current UTC time from {@code clock}
 * formatted as {@code yyyy-MM-dd't'HH-mm-ss'z'}.
 *
 * @param prefix the leading part of the job name; it is joined to the timestamp with a dash, so a
 *     prefix that itself ends in a dash yields a double dash in the result
 * @param clock the time source for the timestamp suffix
 * @return the validated job name
 * @throws IllegalArgumentException if the resulting name contains characters outside {@code
 *     [-a-z0-9]}, does not start with a letter, or does not end with a letter or number
 */
public static String createJobName(String prefix, Clock clock) {
  // Flex template job name must be unique and consist of only characters [-a-z0-9], starting
  // with a letter and ending with a letter or number. So we replace the "T" and "Z" in ISO 8601
  // with lowercase letters.
  String jobName =
      String.format("%s-%s", prefix, clock.nowUtc().toString("yyyy-MM-dd't'HH-mm-ss'z'"));
  checkArgument(
      // matches() anchors the pattern at both ends. The last character class must not be
      // optional by itself, otherwise a name with a trailing '-' would slip through.
      Pattern.compile("[a-z]([-a-z0-9]*[a-z0-9])?").matcher(jobName).matches(),
      "The job name %s is illegal, it must consist of only characters [-a-z0-9], "
          + "starting with a letter and ending with a letter or number.",
      jobName);
  return jobName;
}
}

View file

@ -20,12 +20,9 @@ import static com.google.common.base.Preconditions.checkState;
import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp;
import static google.registry.beam.initsql.BackupPaths.getExportFilePatterns;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.setJpaTm;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import static google.registry.util.DateTimeUtils.isBeforeOrAt;
import static java.util.Comparator.comparing;
import static org.apache.beam.sdk.values.TypeDescriptors.integers;
import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
@ -36,14 +33,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import com.googlecode.objectify.Key;
import google.registry.backup.AppEngineEnvironment;
import google.registry.backup.CommitLogImports;
import google.registry.backup.VersionedEntity;
import google.registry.model.domain.DomainBase;
import google.registry.model.ofy.ObjectifyService;
import google.registry.model.reporting.HistoryEntry;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.schema.replay.DatastoreAndSqlEntity;
import google.registry.schema.replay.SqlEntity;
import google.registry.tools.LevelDbLogReader;
@ -53,7 +46,6 @@ import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@ -62,18 +54,14 @@ import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ProcessFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@ -268,80 +256,6 @@ public final class Transforms {
.iterator()));
}
/**
* Returns a {@link PTransform} that writes a {@link PCollection} of {@link VersionedEntity}s to a
* SQL database. and outputs an empty {@code PCollection<Void>}. This allows other operations to
* {@link org.apache.beam.sdk.transforms.Wait wait} for the completion of this transform.
*
* <p>Errors are handled according to the pipeline runner's default policy. As part of a one-time
* job, we will not add features unless proven necessary.
*
* @param transformId a unique ID for an instance of the returned transform
* @param maxWriters the max number of concurrent writes to SQL, which also determines the max
* number of connection pools created
* @param batchSize the number of entities to write in each operation
* @param jpaSupplier supplier of a {@link JpaTransactionManager}
*/
public static PTransform<PCollection<VersionedEntity>, PCollection<Void>> writeToSql(
String transformId,
int maxWriters,
int batchSize,
SerializableSupplier<JpaTransactionManager> jpaSupplier) {
return writeToSql(
transformId,
maxWriters,
batchSize,
jpaSupplier,
Transforms::convertVersionedEntityToSqlEntity,
TypeDescriptor.of(VersionedEntity.class));
}
/**
* Returns a {@link PTransform} that writes a {@link PCollection} of entities to a SQL database.
* and outputs an empty {@code PCollection<Void>}. This allows other operations to {@link
* org.apache.beam.sdk.transforms.Wait wait} for the completion of this transform.
*
* <p>The converter and type descriptor are generics so that we can convert any type of entity to
* an object to be placed in SQL.
*
* <p>Errors are handled according to the pipeline runner's default policy. As part of a one-time
* job, we will not add features unless proven necessary.
*
* @param transformId a unique ID for an instance of the returned transform
* @param maxWriters the max number of concurrent writes to SQL, which also determines the max
* number of connection pools created
* @param batchSize the number of entities to write in each operation
* @param jpaSupplier supplier of a {@link JpaTransactionManager}
* @param jpaConverter the function that converts the input object to a JPA entity
* @param objectDescriptor the type descriptor of the input object
*/
public static <T> PTransform<PCollection<T>, PCollection<Void>> writeToSql(
String transformId,
int maxWriters,
int batchSize,
SerializableSupplier<JpaTransactionManager> jpaSupplier,
SerializableFunction<T, Object> jpaConverter,
TypeDescriptor<T> objectDescriptor) {
return new PTransform<PCollection<T>, PCollection<Void>>() {
@Override
public PCollection<Void> expand(PCollection<T> input) {
return input
.apply(
"Shard data for " + transformId,
MapElements.into(kvs(integers(), objectDescriptor))
.via(ve -> KV.of(ThreadLocalRandom.current().nextInt(maxWriters), ve)))
.apply("Batch output by shard " + transformId, GroupIntoBatches.ofSize(batchSize))
.apply(
"Write in batch for " + transformId,
ParDo.of(new SqlBatchWriter<T>(transformId, jpaSupplier, jpaConverter)));
}
};
}
private static Key toOfyKey(Object ofyEntity) {
return Key.create(ofyEntity);
}
private static boolean isMigratable(Entity entity) {
if (entity.getKind().equals("HistoryEntry")) {
// DOMAIN_APPLICATION_CREATE is deprecated type and should not be migrated.
@ -458,93 +372,6 @@ public final class Transforms {
}
}
/**
* Writes a batch of entities to a SQL database.
*
* <p>Note that an arbitrary number of instances of this class may be created and freed in
* arbitrary order in a single JVM. Due to the tech debt that forced us to use a static variable
* to hold the {@code JpaTransactionManager} instance, we must ensure that JpaTransactionManager
* is not changed or torn down while being used by some instance.
*/
private static class SqlBatchWriter<T> extends DoFn<KV<Integer, Iterable<T>>, Void> {
private static int instanceCount = 0;
private static JpaTransactionManager originalJpa;
private Counter counter;
private final SerializableSupplier<JpaTransactionManager> jpaSupplier;
private final SerializableFunction<T, Object> jpaConverter;
SqlBatchWriter(
String type,
SerializableSupplier<JpaTransactionManager> jpaSupplier,
SerializableFunction<T, Object> jpaConverter) {
counter = Metrics.counter("SQL_WRITE", type);
this.jpaSupplier = jpaSupplier;
this.jpaConverter = jpaConverter;
}
@Setup
public void setup() {
try (AppEngineEnvironment env = new AppEngineEnvironment()) {
ObjectifyService.initOfy();
}
synchronized (SqlBatchWriter.class) {
if (instanceCount == 0) {
originalJpa = jpaTm();
setJpaTm(jpaSupplier);
}
instanceCount++;
}
}
@Teardown
public void teardown() {
synchronized (SqlBatchWriter.class) {
instanceCount--;
if (instanceCount == 0) {
jpaTm().teardown();
setJpaTm(() -> originalJpa);
}
}
}
@ProcessElement
public void processElement(@Element KV<Integer, Iterable<T>> kv) {
try (AppEngineEnvironment env = new AppEngineEnvironment()) {
ImmutableList<Object> ofyEntities =
Streams.stream(kv.getValue())
.map(this.jpaConverter::apply)
// TODO(b/177340730): post migration delete the line below.
.filter(Objects::nonNull)
.collect(ImmutableList.toImmutableList());
try {
jpaTm().transact(() -> jpaTm().putAll(ofyEntities));
counter.inc(ofyEntities.size());
} catch (RuntimeException e) {
processSingly(ofyEntities);
}
}
}
/**
* Writes entities in a failed batch one by one to identify the first bad entity and throws a
* {@link RuntimeException} on it.
*/
private void processSingly(ImmutableList<Object> ofyEntities) {
for (Object ofyEntity : ofyEntities) {
try {
jpaTm().transact(() -> jpaTm().put(ofyEntity));
counter.inc();
} catch (RuntimeException e) {
throw new RuntimeException(toOfyKey(ofyEntity).toString(), e);
}
}
}
}
/**
* Removes BillingEvents, {@link google.registry.model.poll.PollMessage PollMessages} and {@link
* google.registry.model.host.HostResource} from a {@link DomainBase}. These are circular foreign

View file

@ -14,7 +14,6 @@
package google.registry.beam.spec11;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.http.HttpStatus.SC_OK;
@ -30,7 +29,6 @@ import java.net.URISyntaxException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
@ -73,7 +71,7 @@ public class SafeBrowsingTransforms {
private static final int BATCH_SIZE = 490;
/** Provides the SafeBrowsing API key at runtime. */
private final ValueProvider<String> apiKeyProvider;
private final String apiKey;
/**
* Maps a subdomain's {@code fullyQualifiedDomainName} to its corresponding {@link Subdomain} to
@ -93,20 +91,18 @@ public class SafeBrowsingTransforms {
private final Retrier retrier;
/**
* Constructs a {@link EvaluateSafeBrowsingFn} that gets its API key from the given provider.
* Constructs a {@link EvaluateSafeBrowsingFn} with a given API key.
*
* <p>We need to dual-cast the closeableHttpClientSupplier lambda because all {@code DoFn}
* member variables need to be serializable. The (Supplier & Serializable) dual cast is safe
* because class methods are generally serializable, especially a static function such as {@link
* HttpClients#createDefault()}.
*
* @param apiKeyProvider provides the SafeBrowsing API key from {@code KMS} at runtime
*/
@SuppressWarnings("unchecked")
EvaluateSafeBrowsingFn(ValueProvider<String> apiKeyProvider, Retrier retrier) {
this.apiKeyProvider = apiKeyProvider;
EvaluateSafeBrowsingFn(String apiKey, Retrier retrier) {
this.apiKey = apiKey;
this.retrier = retrier;
this.closeableHttpClientSupplier = (Supplier & Serializable) HttpClients::createDefault;
closeableHttpClientSupplier = (Supplier & Serializable) HttpClients::createDefault;
}
/**
@ -117,12 +113,10 @@ public class SafeBrowsingTransforms {
*/
@VisibleForTesting
EvaluateSafeBrowsingFn(
ValueProvider<String> apiKeyProvider,
Retrier retrier,
Supplier<CloseableHttpClient> clientSupplier) {
this.apiKeyProvider = apiKeyProvider;
String apiKey, Retrier retrier, Supplier<CloseableHttpClient> clientSupplier) {
this.apiKey = apiKey;
this.retrier = retrier;
this.closeableHttpClientSupplier = clientSupplier;
closeableHttpClientSupplier = clientSupplier;
}
/** Evaluates any buffered {@link Subdomain} objects upon completing the bundle. */
@ -159,7 +153,7 @@ public class SafeBrowsingTransforms {
try {
URIBuilder uriBuilder = new URIBuilder(SAFE_BROWSING_URL);
// Add the API key param
uriBuilder.addParameter("key", apiKeyProvider.get());
uriBuilder.addParameter("key", apiKey);
HttpPost httpPost = new HttpPost(uriBuilder.build());
httpPost.addHeader(HTTP.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
@ -175,7 +169,7 @@ public class SafeBrowsingTransforms {
}
},
IOException.class);
} catch (URISyntaxException | JSONException e) {
} catch (URISyntaxException | JSONException e) {
// Fail the pipeline on a parsing exception- this indicates the API likely changed.
throw new RuntimeException("Caught parsing exception, failing pipeline.", e);
} finally {
@ -239,7 +233,9 @@ public class SafeBrowsingTransforms {
String url = match.getJSONObject("threat").getString("url");
Subdomain subdomain = subdomainBuffer.get(url);
resultBuilder.add(
KV.of(subdomain, ThreatMatch.create(match, subdomain.domainName())));
KV.of(
subdomain,
ThreatMatch.create(match.getString("threatType"), subdomain.domainName())));
}
}
}

View file

@ -17,32 +17,28 @@ package google.registry.beam.spec11;
import static com.google.common.base.Preconditions.checkArgument;
import static google.registry.beam.BeamUtils.getQueryFromFile;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import google.registry.beam.initsql.Transforms;
import google.registry.beam.initsql.Transforms.SerializableSupplier;
import dagger.Component;
import dagger.Module;
import dagger.Provides;
import google.registry.beam.common.RegistryJpaIO;
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
import google.registry.config.CredentialModule.LocalCredential;
import google.registry.config.RegistryConfig.Config;
import google.registry.config.RegistryConfig.ConfigModule;
import google.registry.model.reporting.Spec11ThreatMatch;
import google.registry.model.reporting.Spec11ThreatMatch.ThreatType;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.util.GoogleCredentialsBundle;
import google.registry.util.Retrier;
import google.registry.util.SqlTemplate;
import google.registry.util.UtilsModule;
import java.io.Serializable;
import javax.inject.Inject;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import javax.inject.Singleton;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
@ -58,21 +54,20 @@ import org.json.JSONException;
import org.json.JSONObject;
/**
* Definition of a Dataflow pipeline template, which generates a given month's spec11 report.
* Definition of a Dataflow Flex template, which generates a given month's spec11 report.
*
* <p>To stage this template on GCS, run the {@link
* google.registry.tools.DeploySpec11PipelineCommand} Nomulus command.
* <p>To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script.
*
* <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
*
* @see <a href="https://cloud.google.com/dataflow/docs/templates/overview">Dataflow Templates</a>
* @see <a href="https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates">Using
* Flex Templates</a>
*/
public class Spec11Pipeline implements Serializable {
/**
* Returns the subdirectory spec11 reports reside in for a given local date in yyyy-MM-dd format.
*
* @see google.registry.beam.spec11.Spec11Pipeline
* @see google.registry.reporting.spec11.Spec11EmailUtils
*/
public static String getSpec11ReportFilePath(LocalDate localDate) {
@ -87,84 +82,35 @@ public class Spec11Pipeline implements Serializable {
/** The JSON object field into which we put the threat match array for Spec11 reports. */
public static final String THREAT_MATCHES_FIELD = "threatMatches";
private final String projectId;
private final String beamJobRegion;
private final String beamStagingUrl;
private final String spec11TemplateUrl;
private final String reportingBucketUrl;
private final GoogleCredentials googleCredentials;
private final Retrier retrier;
private final SerializableSupplier<JpaTransactionManager> jpaSupplierFactory;
private final Spec11PipelineOptions options;
private final EvaluateSafeBrowsingFn safeBrowsingFn;
private final Pipeline pipeline;
@Inject
public Spec11Pipeline(
@Config("projectId") String projectId,
@Config("defaultJobRegion") String beamJobRegion,
@Config("beamStagingUrl") String beamStagingUrl,
@Config("spec11TemplateUrl") String spec11TemplateUrl,
@Config("reportingBucketUrl") String reportingBucketUrl,
SerializableSupplier<JpaTransactionManager> jpaSupplierFactory,
@LocalCredential GoogleCredentialsBundle googleCredentialsBundle,
Retrier retrier) {
this.projectId = projectId;
this.beamJobRegion = beamJobRegion;
this.beamStagingUrl = beamStagingUrl;
this.spec11TemplateUrl = spec11TemplateUrl;
this.reportingBucketUrl = reportingBucketUrl;
this.jpaSupplierFactory = jpaSupplierFactory;
this.googleCredentials = googleCredentialsBundle.getGoogleCredentials();
this.retrier = retrier;
@VisibleForTesting
Spec11Pipeline(
Spec11PipelineOptions options, EvaluateSafeBrowsingFn safeBrowsingFn, Pipeline pipeline) {
this.options = options;
this.safeBrowsingFn = safeBrowsingFn;
this.pipeline = pipeline;
}
/** Custom options for running the spec11 pipeline. */
public interface Spec11PipelineOptions extends DataflowPipelineOptions {
/** Returns the local date we're generating the report for, in yyyy-MM-dd format. */
@Description("The local date we generate the report for, in yyyy-MM-dd format.")
ValueProvider<String> getDate();
/**
* Sets the local date we generate invoices for.
*
* <p>This is implicitly set when executing the Dataflow template, by specifying the "date"
* parameter.
*/
void setDate(ValueProvider<String> value);
/** Returns the SafeBrowsing API key we use to evaluate subdomain health. */
@Description("The API key we use to access the SafeBrowsing API.")
ValueProvider<String> getSafeBrowsingApiKey();
/**
* Sets the SafeBrowsing API key we use.
*
* <p>This is implicitly set when executing the Dataflow template, by specifying the
* "safeBrowsingApiKey" parameter.
*/
void setSafeBrowsingApiKey(ValueProvider<String> value);
Spec11Pipeline(Spec11PipelineOptions options, EvaluateSafeBrowsingFn safeBrowsingFn) {
this(options, safeBrowsingFn, Pipeline.create(options));
}
/** Deploys the spec11 pipeline as a template on GCS. */
public void deploy() {
// We can't store options as a member variable due to serialization concerns.
Spec11PipelineOptions options = PipelineOptionsFactory.as(Spec11PipelineOptions.class);
options.setProject(projectId);
options.setRegion(beamJobRegion);
options.setRunner(DataflowRunner.class);
// This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it.
options.setTemplateLocation(spec11TemplateUrl);
options.setStagingLocation(beamStagingUrl);
// This credential is used when Dataflow deploys the template to GCS in target GCP project.
// So, make sure the credential has write permission to GCS in that project.
options.setGcpCredential(googleCredentials);
PipelineResult run() {
setupPipeline();
return pipeline.run();
}
Pipeline p = Pipeline.create(options);
void setupPipeline() {
PCollection<Subdomain> domains =
p.apply(
pipeline.apply(
"Read active domains from BigQuery",
BigQueryIO.read(Subdomain::parseFromRecord)
.fromQuery(
SqlTemplate.create(getQueryFromFile(Spec11Pipeline.class, "subdomains.sql"))
.put("PROJECT_ID", projectId)
.put("PROJECT_ID", options.getProject())
.put("DATASTORE_EXPORT_DATASET", "latest_datastore_export")
.put("REGISTRAR_TABLE", "Registrar")
.put("DOMAIN_BASE_TABLE", "DomainBase")
@ -174,48 +120,40 @@ public class Spec11Pipeline implements Serializable {
.withoutValidation()
.withTemplateCompatibility());
evaluateUrlHealth(
domains,
new EvaluateSafeBrowsingFn(options.getSafeBrowsingApiKey(), retrier),
options.getDate());
p.run();
PCollection<KV<Subdomain, ThreatMatch>> threatMatches =
domains.apply("Run through SafeBrowsing API", ParDo.of(safeBrowsingFn));
saveToSql(threatMatches, options);
saveToGcs(threatMatches, options);
}
/**
* Evaluate each {@link Subdomain} URL via the SafeBrowsing API.
*
* <p>This is factored out to facilitate testing.
*/
void evaluateUrlHealth(
PCollection<Subdomain> domains,
EvaluateSafeBrowsingFn evaluateSafeBrowsingFn,
ValueProvider<String> dateProvider) {
PCollection<KV<Subdomain, ThreatMatch>> subdomainsSql =
domains.apply("Run through SafeBrowsing API", ParDo.of(evaluateSafeBrowsingFn));
TypeDescriptor<KV<Subdomain, ThreatMatch>> descriptor =
new TypeDescriptor<KV<Subdomain, ThreatMatch>>() {};
subdomainsSql.apply(
Transforms.writeToSql(
"Spec11ThreatMatch",
4,
4,
jpaSupplierFactory,
(kv) -> {
Subdomain subdomain = kv.getKey();
return new Spec11ThreatMatch.Builder()
.setThreatTypes(ImmutableSet.of(ThreatType.valueOf(kv.getValue().threatType())))
.setCheckDate(LocalDate.parse(dateProvider.get(), ISODateTimeFormat.date()))
.setDomainName(subdomain.domainName())
.setDomainRepoId(subdomain.domainRepoId())
.setRegistrarId(subdomain.registrarId())
.build();
},
descriptor));
/**
 * Attaches a SQL sink to {@code threatMatches}: every subdomain/threat pair is converted to a
 * {@link Spec11ThreatMatch} and written via {@link RegistryJpaIO}.
 *
 * <p>Batch size and shard count are read from {@code options}. The check date of every written
 * record is the date given in {@code options}.
 *
 * @param threatMatches the subdomain/threat-match pairs to persist
 * @param options the pipeline options supplying the report date and SQL write settings
 */
static void saveToSql(
    PCollection<KV<Subdomain, ThreatMatch>> threatMatches, Spec11PipelineOptions options) {
  String name = "Spec11 Threat Matches";
  // The date is parsed once here rather than inside the converter, which only captures the
  // already-parsed value.
  LocalDate checkDate = LocalDate.parse(options.getDate(), ISODateTimeFormat.date());
  threatMatches.apply(
      "Write to Sql: " + name,
      RegistryJpaIO.<KV<Subdomain, ThreatMatch>>write()
          .withName(name)
          .withBatchSize(options.getSqlWriteBatchSize())
          .withShards(options.getSqlWriteShards())
          .withJpaConverter(
              (match) ->
                  new Spec11ThreatMatch.Builder()
                      .setThreatTypes(
                          ImmutableSet.of(ThreatType.valueOf(match.getValue().threatType())))
                      .setCheckDate(checkDate)
                      .setDomainName(match.getKey().domainName())
                      .setDomainRepoId(match.getKey().domainRepoId())
                      .setRegistrarId(match.getKey().registrarId())
                      .build()));
}
/* Store ThreatMatch objects in JSON. */
PCollection<KV<Subdomain, ThreatMatch>> subdomainsJson =
domains.apply("Run through SafeBrowsingAPI", ParDo.of(evaluateSafeBrowsingFn));
subdomainsJson
static void saveToGcs(
PCollection<KV<Subdomain, ThreatMatch>> threatMatches, Spec11PipelineOptions options) {
threatMatches
.apply(
"Map registrar ID to email/ThreatMatch pair",
MapElements.into(
@ -260,17 +198,54 @@ public class Spec11Pipeline implements Serializable {
"Output to text file",
TextIO.write()
.to(
NestedValueProvider.of(
dateProvider,
date ->
String.format(
"%s/%s",
reportingBucketUrl,
getSpec11ReportFilePath(LocalDate.parse(date)))))
String.format(
"%s/%s",
options.getReportingBucketUrl(),
getSpec11ReportFilePath(LocalDate.parse(options.getDate()))))
.withoutSharding()
.withHeader("Map from registrar email / name to detected subdomain threats:"));
}
/**
 * Entry point of the pipeline: registers the custom options interface, builds the Dagger
 * component from the command-line arguments, and runs the resulting pipeline.
 *
 * @param args command-line arguments, handed to {@link Spec11PipelineModule} for option parsing
 */
public static void main(String[] args) {
  PipelineOptionsFactory.register(Spec11PipelineOptions.class);
  Spec11PipelineComponent component =
      DaggerSpec11Pipeline_Spec11PipelineComponent.builder()
          .spec11PipelineModule(new Spec11PipelineModule(args))
          .build();
  component.spec11Pipeline().run();
}
/**
 * Dagger module that supplies the objects needed to build a {@link Spec11Pipeline} from raw
 * command-line arguments.
 */
@Module
static class Spec11PipelineModule {
// Raw command-line arguments, parsed into pipeline options on demand.
private final String[] args;
/** Creates a module that parses pipeline options from the given command-line arguments. */
Spec11PipelineModule(String[] args) {
this.args = args;
}
/**
 * Parses and validates the command-line arguments as {@link Spec11PipelineOptions}.
 *
 * <p>NOTE(review): this binding carries no scope annotation, so the arguments are presumably
 * re-parsed for each injection site -- confirm this is intended.
 */
@Provides
Spec11PipelineOptions provideOptions() {
return PipelineOptionsFactory.fromArgs(args).withValidation().as(Spec11PipelineOptions.class);
}
/** Creates the SafeBrowsing evaluation function using the API key taken from the options. */
@Provides
EvaluateSafeBrowsingFn provideSafeBrowsingFn(Spec11PipelineOptions options, Retrier retrier) {
return new EvaluateSafeBrowsingFn(options.getSafeBrowsingApiKey(), retrier);
}
/** Creates the pipeline from the parsed options and the SafeBrowsing evaluation function. */
@Provides
Spec11Pipeline providePipeline(
Spec11PipelineOptions options, EvaluateSafeBrowsingFn safeBrowsingFn) {
return new Spec11Pipeline(options, safeBrowsingFn);
}
}
/**
 * Dagger component that assembles a {@link Spec11Pipeline} from {@link Spec11PipelineModule},
 * {@code UtilsModule} and {@code ConfigModule}.
 */
@Component(modules = {Spec11PipelineModule.class, UtilsModule.class, ConfigModule.class})
@Singleton
interface Spec11PipelineComponent {
/** Returns the assembled pipeline. */
Spec11Pipeline spec11Pipeline();
}
@AutoValue
abstract static class EmailAndThreatMatch implements Serializable {

View file

@ -0,0 +1,37 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.spec11;
import google.registry.beam.common.RegistryPipelineOptions;
import org.apache.beam.sdk.options.Description;
/**
 * Custom options for running the spec11 pipeline.
 *
 * <p>All values are plain strings; each option is declared as a getter/setter pair.
 */
public interface Spec11PipelineOptions extends RegistryPipelineOptions {
/** Returns the local date the report is generated for, as a yyyy-MM-dd string. */
@Description("The local date we generate the report for, in yyyy-MM-dd format.")
String getDate();
/** Sets the local date the report is generated for; expected in yyyy-MM-dd format. */
void setDate(String value);
/** Returns the API key used to access the SafeBrowsing API. */
@Description("The API key we use to access the SafeBrowsing API.")
String getSafeBrowsingApiKey();
/** Sets the API key used to access the SafeBrowsing API. */
void setSafeBrowsingApiKey(String value);
/** Returns the GCS bucket URL that Spec11 reports are uploaded to. */
@Description("The GCS bucket URL for Spec11 reports to be uploaded.")
String getReportingBucketUrl();
/** Sets the GCS bucket URL that Spec11 reports are uploaded to. */
void setReportingBucketUrl(String value);
}

View file

@ -15,6 +15,7 @@
package google.registry.beam.spec11;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import java.io.Serializable;
import org.json.JSONException;
import org.json.JSONObject;
@ -31,16 +32,9 @@ public abstract class ThreatMatch implements Serializable {
/** Returns the fully qualified domain name [SLD].[TLD] of the matched threat. */
public abstract String fullyQualifiedDomainName();
/**
* Constructs a {@link ThreatMatch} by parsing a {@code SafeBrowsing API} response {@link
* JSONObject}.
*
* @throws JSONException when encountering parse errors in the response format
*/
static ThreatMatch create(JSONObject threatMatchJSON, String fullyQualifiedDomainName)
throws JSONException {
return new AutoValue_ThreatMatch(
threatMatchJSON.getString(THREAT_TYPE_FIELD), fullyQualifiedDomainName);
@VisibleForTesting
static ThreatMatch create(String threatType, String fullyQualifiedDomainName) {
return new AutoValue_ThreatMatch(threatType, fullyQualifiedDomainName);
}
/** Returns a {@link JSONObject} representing a subset of this object's data. */

View file

@ -14,6 +14,7 @@
package google.registry.reporting.spec11;
import static google.registry.beam.BeamUtils.createJobName;
import static google.registry.reporting.ReportingModule.PARAM_DATE;
import static google.registry.reporting.ReportingUtils.enqueueBeamReportingTask;
import static google.registry.request.Action.Method.POST;
@ -21,19 +22,21 @@ import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
import com.google.api.services.dataflow.model.LaunchTemplateResponse;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.collect.ImmutableMap;
import com.google.common.flogger.FluentLogger;
import com.google.common.net.MediaType;
import google.registry.config.RegistryConfig.Config;
import google.registry.config.RegistryEnvironment;
import google.registry.keyring.api.KeyModule.Key;
import google.registry.reporting.ReportingModule;
import google.registry.request.Action;
import google.registry.request.Parameter;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import java.io.IOException;
import java.util.Map;
import javax.inject.Inject;
@ -55,55 +58,68 @@ public class GenerateSpec11ReportAction implements Runnable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
static final String PATH = "/_dr/task/generateSpec11";
static final String PIPELINE_NAME = "spec11_pipeline";
private final String projectId;
private final String beamBucketUrl;
private final String spec11TemplateUrl;
private final String jobZone;
private final String jobRegion;
private final String stagingBucketUrl;
private final String reportingBucketUrl;
private final String apiKey;
private final LocalDate date;
private final Clock clock;
private final Response response;
private final Dataflow dataflow;
@Inject
GenerateSpec11ReportAction(
@Config("projectId") String projectId,
@Config("apacheBeamBucketUrl") String beamBucketUrl,
@Config("spec11TemplateUrl") String spec11TemplateUrl,
@Config("defaultJobZone") String jobZone,
@Config("defaultJobRegion") String jobRegion,
@Config("beamStagingBucketUrl") String stagingBucketUrl,
@Config("reportingBucketUrl") String reportingBucketUrl,
@Key("safeBrowsingAPIKey") String apiKey,
@Parameter(PARAM_DATE) LocalDate date,
Clock clock,
Response response,
Dataflow dataflow) {
this.projectId = projectId;
this.beamBucketUrl = beamBucketUrl;
this.spec11TemplateUrl = spec11TemplateUrl;
this.jobZone = jobZone;
this.jobRegion = jobRegion;
this.stagingBucketUrl = stagingBucketUrl;
this.reportingBucketUrl = reportingBucketUrl;
this.apiKey = apiKey;
this.date = date;
this.clock = clock;
this.response = response;
this.dataflow = dataflow;
}
@Override
public void run() {
response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
try {
LaunchTemplateParameters params =
new LaunchTemplateParameters()
.setJobName(String.format("spec11_%s", date))
.setEnvironment(
new RuntimeEnvironment()
.setZone(jobZone)
.setTempLocation(beamBucketUrl + "/temporary"))
LaunchFlexTemplateParameter parameter =
new LaunchFlexTemplateParameter()
.setJobName(createJobName("spec11", clock))
.setContainerSpecGcsPath(
String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
.setParameters(
ImmutableMap.of(
"safeBrowsingApiKey", apiKey, ReportingModule.PARAM_DATE, date.toString()));
LaunchTemplateResponse launchResponse =
"safeBrowsingApiKey",
apiKey,
ReportingModule.PARAM_DATE,
date.toString(),
"reportingBucketUrl",
reportingBucketUrl,
"registryEnvironment",
RegistryEnvironment.get().name()));
LaunchFlexTemplateResponse launchResponse =
dataflow
.projects()
.templates()
.launch(projectId, params)
.setGcsPath(spec11TemplateUrl)
.locations()
.flexTemplates()
.launch(
projectId,
jobRegion,
new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
.execute();
Map<String, String> beamTaskParameters =
ImmutableMap.of(
@ -116,12 +132,10 @@ public class GenerateSpec11ReportAction implements Runnable {
} catch (IOException e) {
logger.atWarning().withCause(e).log("Template Launch failed");
response.setStatus(SC_INTERNAL_SERVER_ERROR);
response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
response.setPayload(String.format("Template launch failed: %s", e.getMessage()));
return;
}
response.setStatus(SC_OK);
response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
response.setPayload("Launched Spec11 dataflow template.");
}
}

View file

@ -69,6 +69,7 @@ public class PublishSpec11ReportAction implements Runnable {
private static final String JOB_FAILED = "JOB_STATE_FAILED";
private final String projectId;
private final String jobRegion;
private final String registryName;
private final String jobId;
private final Spec11EmailUtils emailUtils;
@ -80,6 +81,7 @@ public class PublishSpec11ReportAction implements Runnable {
@Inject
PublishSpec11ReportAction(
@Config("projectId") String projectId,
@Config("defaultJobRegion") String jobRegion,
@Config("registryName") String registryName,
@Parameter(ReportingModule.PARAM_JOB_ID) String jobId,
Spec11EmailUtils emailUtils,
@ -88,6 +90,7 @@ public class PublishSpec11ReportAction implements Runnable {
Response response,
@Parameter(PARAM_DATE) LocalDate date) {
this.projectId = projectId;
this.jobRegion = jobRegion;
this.registryName = registryName;
this.jobId = jobId;
this.emailUtils = emailUtils;
@ -101,7 +104,7 @@ public class PublishSpec11ReportAction implements Runnable {
public void run() {
try {
logger.atInfo().log("Starting publish job.");
Job job = dataflow.projects().jobs().get(projectId, jobId).execute();
Job job = dataflow.projects().locations().jobs().get(projectId, jobRegion, jobId).execute();
String state = job.getCurrentState();
switch (state) {
case JOB_DONE:

View file

@ -1,87 +0,0 @@
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.tools;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import google.registry.beam.initsql.BeamJpaModule.JpaTransactionManagerComponent;
import google.registry.beam.initsql.JpaSupplierFactory;
import google.registry.beam.spec11.Spec11Pipeline;
import google.registry.config.CredentialModule.LocalCredential;
import google.registry.config.RegistryConfig.Config;
import google.registry.util.GoogleCredentialsBundle;
import google.registry.util.Retrier;
import javax.annotation.Nullable;
import javax.inject.Inject;
/** Nomulus command that deploys the {@link Spec11Pipeline} template. */
@Parameters(commandDescription = "Deploy the Spec11 pipeline to GCS.")
public class DeploySpec11PipelineCommand implements Command {
@Inject
@Config("projectId")
String projectId;
@Inject
@Config("defaultJobRegion")
String beamJobRegion;
@Parameter(
names = {"-p", "--project"},
description = "Cloud KMS project ID",
required = true)
String cloudKmsProjectId;
@Inject
@Config("beamStagingUrl")
String beamStagingUrl;
@Inject
@Config("spec11TemplateUrl")
String spec11TemplateUrl;
@Inject
@Config("reportingBucketUrl")
String reportingBucketUrl;
@Inject @LocalCredential GoogleCredentialsBundle googleCredentialsBundle;
@Inject Retrier retrier;
@Inject
@Nullable
@Config("sqlAccessInfoFile")
String sqlAccessInfoFile;
/** Constructs the {@link Spec11Pipeline} from the injected settings and deploys it. */
@Override
public void run() {
  // The JPA supplier factory is only used to construct the pipeline, so it is built inline.
  new Spec11Pipeline(
          projectId,
          beamJobRegion,
          beamStagingUrl,
          spec11TemplateUrl,
          reportingBucketUrl,
          new JpaSupplierFactory(
              sqlAccessInfoFile,
              cloudKmsProjectId,
              JpaTransactionManagerComponent::cloudSqlJpaTransactionManager),
          googleCredentialsBundle,
          retrier)
      .deploy();
}
}

View file

@ -63,7 +63,6 @@ public final class RegistryTool {
.put("delete_reserved_list", DeleteReservedListCommand.class)
.put("delete_tld", DeleteTldCommand.class)
.put("deploy_invoicing_pipeline", DeployInvoicingPipelineCommand.class)
.put("deploy_spec11_pipeline", DeploySpec11PipelineCommand.class)
.put("encrypt_escrow_deposit", EncryptEscrowDepositCommand.class)
.put("execute_epp", ExecuteEppCommand.class)
.put("generate_allocation_tokens", GenerateAllocationTokensCommand.class)

View file

@ -109,8 +109,6 @@ interface RegistryToolComponent {
void inject(DeployInvoicingPipelineCommand command);
void inject(DeploySpec11PipelineCommand command);
void inject(EncryptEscrowDepositCommand command);
void inject(GenerateAllocationTokensCommand command);

View file

@ -0,0 +1,66 @@
{
"name": "Spec11 Report Generation",
"description": "An Apache Beam batch pipeline that reads from a Datastore export and generates the Spec11 report for the month, saving it to both SQL and as a JSON file on GCS.",
"parameters": [
{
"name": "registryEnvironment",
"label": "The Registry environment.",
"helpText": "The Registry environment, required if environment-specific initialization is needed on worker VMs.",
"is_optional": true,
"regexes": [
"^[0-9A-Z_]+$"
]
},
{
"name": "isolationOverride",
"label": "The desired SQL transaction isolation level.",
"helpText": "The desired SQL transaction isolation level.",
"is_optional": true,
"regexes": [
"^[0-9A-Z_]+$"
]
},
{
"name": "sqlWriteBatchSize",
"label": "SQL write batch size.",
"helpText": "The number of entities to write to the SQL database in one operation.",
"is_optional": true,
"regexes": [
"^[1-9][0-9]*$"
]
},
{
"name": "sqlWriteShards",
"label": "Number of output shards to create when writing to SQL.",
"helpText": "Number of shards to create out of the data before writing to the SQL database. Please refer to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.",
"is_optional": true,
"regexes": [
"^[1-9][0-9]*$"
]
},
{
"name": "date",
"label": "The date when the pipeline runs",
"helpText": "The date when the threat scan is performed, in yyyy-MM-dd format.",
"regexes": [
"^2[0-9]{3}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])$"
]
},
{
"name": "safeBrowsingApiKey",
"label": "API Key used to identify the project with Safe Browsing",
"helpText": "The API key used to identify the project with the Safe Browsing API.",
"regexes": [
"^[0-9a-zA-Z_]+[\\n]?$"
]
},
{
"name": "reportingBucketUrl",
"label": "Spec11 report upload dir.",
"helpText": "The root directory of the report to upload.",
"regexes": [
"^gs:\\/\\/[^\\n\\r]+$"
]
}
]
}

View file

@ -31,6 +31,7 @@ import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeResponse;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -50,7 +51,8 @@ class WipeOutDatastoreActionTest {
private LaunchFlexTemplateResponse launchResponse =
new LaunchFlexTemplateResponse().setJob(new Job());
private FakeResponse response = new FakeResponse();
private final FakeClock clock = new FakeClock();
private final FakeResponse response = new FakeResponse();
@BeforeEach
void beforeEach() throws Exception {
@ -67,7 +69,7 @@ class WipeOutDatastoreActionTest {
void run_projectNotAllowed() {
WipeoutDatastoreAction action =
new WipeoutDatastoreAction(
"domain-registry", "us-central1", "gs://some-bucket", response, dataflow);
"domain-registry", "us-central1", "gs://some-bucket", clock, response, dataflow);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_FORBIDDEN);
verifyNoInteractions(dataflow);
@ -77,7 +79,7 @@ class WipeOutDatastoreActionTest {
void run_projectAllowed() throws Exception {
WipeoutDatastoreAction action =
new WipeoutDatastoreAction(
"domain-registry-qa", "us-central1", "gs://some-bucket", response, dataflow);
"domain-registry-qa", "us-central1", "gs://some-bucket", clock, response, dataflow);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
verify(launch, times(1)).execute();
@ -89,7 +91,7 @@ class WipeOutDatastoreActionTest {
when(launch.execute()).thenThrow(new RuntimeException());
WipeoutDatastoreAction action =
new WipeoutDatastoreAction(
"domain-registry-qa", "us-central1", "gs://some-bucket", response, dataflow);
"domain-registry-qa", "us-central1", "gs://some-bucket", clock, response, dataflow);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
verify(launch, times(1)).execute();

View file

@ -1,131 +0,0 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.initsql;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.google.appengine.api.datastore.Entity;
import com.google.common.collect.ImmutableList;
import google.registry.backup.VersionedEntity;
import google.registry.beam.TestPipelineExtension;
import google.registry.model.ImmutableObject;
import google.registry.model.contact.ContactResource;
import google.registry.model.ofy.Ofy;
import google.registry.model.registrar.Registrar;
import google.registry.persistence.transaction.JpaTestRules;
import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.DatabaseHelper;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.InjectExtension;
import java.io.Serializable;
import java.nio.file.Path;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.Create;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
/** Unit test for {@link Transforms#writeToSql}. */
class WriteToSqlTest implements Serializable {
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
private final FakeClock fakeClock = new FakeClock(START_TIME);
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();
@RegisterExtension final transient InjectExtension injectRule = new InjectExtension();
@RegisterExtension
final transient JpaIntegrationTestExtension database =
new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule();
@SuppressWarnings("WeakerAccess")
@TempDir
transient Path tmpDir;
@RegisterExtension
final transient TestPipelineExtension testPipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
// Must not be transient!
@RegisterExtension
@Order(Order.DEFAULT + 1)
final BeamJpaExtension beamJpaExtension =
new BeamJpaExtension(() -> tmpDir.resolve("credential.dat"), database.getDatabase());
private ImmutableList<Entity> contacts;
@BeforeEach
void beforeEach() throws Exception {
  try (BackupTestStore backupStore = new BackupTestStore(fakeClock)) {
    injectRule.setStaticField(Ofy.class, "clock", fakeClock);
    // Required for the contacts created below: written to the backup store, then put into SQL.
    Registrar registrar = AppEngineExtension.makeRegistrar2();
    backupStore.insertOrUpdate(registrar);
    jpaTm().transact(() -> jpaTm().put(backupStore.loadAsOfyEntity(registrar)));

    // Create three contacts and keep their Datastore entity form as the pipeline input.
    ImmutableList.Builder<Entity> contactEntities = ImmutableList.builder();
    for (int index = 0; index < 3; index++) {
      ContactResource contact = DatabaseHelper.newContactResource("contact_" + index);
      backupStore.insertOrUpdate(contact);
      contactEntities.add(backupStore.loadAsDatastoreEntity(contact));
    }
    contacts = contactEntities.build();
  }
}
@Test
void writeToSql_twoWriters() {
  // Feed the prepared contact entities through the SQL-writing transform.
  testPipeline
      .apply(
          Create.of(
              contacts.stream()
                  .map(InitSqlTestUtils::entityToBytes)
                  .map(rawBytes -> VersionedEntity.from(0L, rawBytes))
                  .collect(Collectors.toList())))
      .apply(
          Transforms.writeToSql(
              "ContactResource",
              2,
              4,
              () ->
                  DaggerBeamJpaModule_JpaTransactionManagerComponent.builder()
                      .beamJpaModule(beamJpaExtension.getBeamJpaModule())
                      .build()
                      .localDbJpaTransactionManager()));
  testPipeline.run().waitUntilFinish();

  // Everything the pipeline wrote must match the original contacts, ignoring the
  // "revisions" and "updateTimestamp" fields.
  ImmutableList<?> persistedContacts =
      jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class));
  ImmutableList<ImmutableObject> expectedContacts =
      contacts.stream()
          .map(InitSqlTestUtils::datastoreToOfyEntity)
          .map(ImmutableObject.class::cast)
          .collect(ImmutableList.toImmutableList());
  assertThat(persistedContacts)
      .comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp"))
      .containsExactlyElementsIn(expectedContacts);
}
}

View file

@ -0,0 +1,245 @@
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.spec11;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CharStreams;
import google.registry.beam.TestPipelineExtension;
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeSleeper;
import google.registry.util.Retrier;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.http.ProtocolVersion;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicStatusLine;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;
/** Unit tests for {@link SafeBrowsingTransforms}. */
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
class SafeBrowsingTransformsTest {
private static final ImmutableMap<String, String> THREAT_MAP =
ImmutableMap.of(
"111.com",
"MALWARE",
"party-night.net",
"SOCIAL_ENGINEERING",
"bitcoin.bank",
"POTENTIALLY_HARMFUL_APPLICATION",
"no-email.com",
"THREAT_TYPE_UNSPECIFIED",
"anti-anti-anti-virus.dev",
"UNWANTED_SOFTWARE");
private static final String REPO_ID = "repoId";
private static final String REGISTRAR_ID = "registrarID";
private static final String REGISTRAR_EMAIL = "email@registrar.net";
private static ImmutableMap<Subdomain, ThreatMatch> THREAT_MATCH_MAP;
private final CloseableHttpClient mockHttpClient =
mock(CloseableHttpClient.class, withSettings().serializable());
private final EvaluateSafeBrowsingFn safeBrowsingFn =
new EvaluateSafeBrowsingFn(
"API_KEY",
new Retrier(new FakeSleeper(new FakeClock()), 1),
Suppliers.ofInstance(mockHttpClient));
@RegisterExtension
final TestPipelineExtension pipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
/** Creates a {@link Subdomain} for {@code url}, filling the other fields from the test constants. */
private static Subdomain createSubdomain(String url) {
  Subdomain subdomain = Subdomain.create(url, REPO_ID, REGISTRAR_ID, REGISTRAR_EMAIL);
  return subdomain;
}
/** Pairs the {@link Subdomain} for {@code url} with its entry in {@code THREAT_MATCH_MAP}. */
private KV<Subdomain, ThreatMatch> getKv(String url) {
  Subdomain key = createSubdomain(url);
  ThreatMatch expectedMatch = THREAT_MATCH_MAP.get(key);
  return KV.of(key, expectedMatch);
}
/** Derives {@code THREAT_MATCH_MAP} from {@code THREAT_MAP}, keyed by the domain's {@link Subdomain}. */
@BeforeAll
static void beforeAll() {
  ImmutableMap.Builder<Subdomain, ThreatMatch> matchesBySubdomain = new ImmutableMap.Builder<>();
  THREAT_MAP.forEach(
      (domain, threatType) ->
          matchesBySubdomain.put(
              createSubdomain(domain), ThreatMatch.create(threatType, domain)));
  THREAT_MATCH_MAP = matchesBySubdomain.build();
}
/** Routes every {@link HttpPost} sent through the mock client to a {@link HttpResponder}. */
@BeforeEach
void beforeEach() throws Exception {
  HttpResponder responder = new HttpResponder();
  when(mockHttpClient.execute(any(HttpPost.class))).thenAnswer(responder);
}
@Test
void testSuccess_someBadDomains() throws Exception {
  // "hooli.com" is the only input domain that is not a key of THREAT_MAP.
  ImmutableList<Subdomain> inputDomains =
      ImmutableList.of(
          createSubdomain("111.com"),
          createSubdomain("hooli.com"),
          createSubdomain("party-night.net"),
          createSubdomain("anti-anti-anti-virus.dev"),
          createSubdomain("no-email.com"));

  PCollection<Subdomain> input =
      pipeline.apply(Create.of(inputDomains).withCoder(SerializableCoder.of(Subdomain.class)));
  PCollection<KV<Subdomain, ThreatMatch>> flagged = input.apply(ParDo.of(safeBrowsingFn));

  PAssert.that(flagged)
      .containsInAnyOrder(
          getKv("111.com"),
          getKv("party-night.net"),
          getKv("anti-anti-anti-virus.dev"),
          getKv("no-email.com"));
  pipeline.run().waitUntilFinish();
}
@Test
void testSuccess_noBadDomains() throws Exception {
  // None of these domains is a key of THREAT_MAP, so no threat match is expected.
  ImmutableList<Subdomain> inputDomains =
      ImmutableList.of(
          createSubdomain("hello_kitty.dev"),
          createSubdomain("555.com"),
          createSubdomain("goodboy.net"));

  PCollection<Subdomain> input =
      pipeline.apply(Create.of(inputDomains).withCoder(SerializableCoder.of(Subdomain.class)));
  PCollection<KV<Subdomain, ThreatMatch>> flagged = input.apply(ParDo.of(safeBrowsingFn));

  PAssert.that(flagged).empty();
  pipeline.run().waitUntilFinish();
}
/**
 * A serializable {@link Answer} that reads the body of the intercepted {@link HttpPost} and
 * returns the mock HTTP response built from it.
 */
private static class HttpResponder implements Answer<CloseableHttpResponse>, Serializable {
  @Override
  public CloseableHttpResponse answer(InvocationOnMock invocation) throws Throwable {
    // The first (and only stubbed) argument is the outgoing POST request.
    HttpPost post = (HttpPost) invocation.getArguments()[0];
    String requestBody =
        CharStreams.toString(new InputStreamReader(post.getEntity().getContent(), UTF_8));
    return getMockResponse(requestBody);
  }
}
/**
 * Builds a mocked {@link CloseableHttpResponse} with a 200 status line whose entity lists a
 * threat match for every {@code THREAT_MAP} domain that appears in {@code request}.
 */
private static CloseableHttpResponse getMockResponse(String request) throws JSONException {
  // Collect the known-bad URLs mentioned anywhere in the request body (if any).
  ImmutableList.Builder<String> mentioned = ImmutableList.builder();
  for (String domain : THREAT_MAP.keySet()) {
    if (request.contains(domain)) {
      mentioned.add(domain);
    }
  }
  ImmutableList<String> badUrls = mentioned.build();

  CloseableHttpResponse mockResponse =
      mock(CloseableHttpResponse.class, withSettings().serializable());
  when(mockResponse.getStatusLine())
      .thenReturn(new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "Done"));
  when(mockResponse.getEntity()).thenReturn(new FakeHttpEntity(getAPIResponse(badUrls)));
  return mockResponse;
}
/**
 * Serializes the API response expected for the given bad URLs.
 *
 * <p>An empty list yields the empty JSON object "{}"; otherwise the object carries a "matches"
 * array with one entry per bad URL.
 */
private static String getAPIResponse(ImmutableList<String> badUrls) throws JSONException {
  JSONObject response = new JSONObject();
  if (!badUrls.isEmpty()) {
    JSONArray matches = new JSONArray();
    for (String badUrl : badUrls) {
      // Each match records the threat type and the offending URL.
      JSONObject threat = new JSONObject().put("url", badUrl);
      JSONObject match =
          new JSONObject().put("threatType", THREAT_MAP.get(badUrl)).put("threat", threat);
      matches.put(match);
    }
    response.put("matches", matches);
  }
  return response.toString();
}
/** A serializable {@link BasicHttpEntity} fake whose body is a fixed {@link String}. */
private static class FakeHttpEntity extends BasicHttpEntity implements Serializable {
  private static final long serialVersionUID = 105738294571L;
  private String content;

  FakeHttpEntity(String content) {
    this.content = content;
    attachContentStream();
  }

  /** Points the inherited entity content at a fresh stream over the stored string. */
  private void attachContentStream() {
    super.setContent(new ByteArrayInputStream(this.content.getBytes(UTF_8)));
  }

  private void writeObject(ObjectOutputStream oos) throws IOException {
    oos.defaultWriteObject();
  }

  /**
   * Restores the entity content after deserialization.
   *
   * <p>Re-attaching the stream here lets {@link #getContent()} be used as-is on a deserialized
   * instance, the same way as on a freshly constructed one.
   */
  private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
    ois.defaultReadObject();
    attachContentStream();
  }
}
}

View file

@ -14,361 +14,183 @@
package google.registry.beam.spec11;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.CharStreams;
import com.google.common.collect.Streams;
import com.google.common.truth.Correspondence;
import com.google.common.truth.Correspondence.BinaryPredicate;
import google.registry.beam.TestPipelineExtension;
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
import google.registry.model.reporting.Spec11ThreatMatch;
import google.registry.model.reporting.Spec11ThreatMatch.ThreatType;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.model.reporting.Spec11ThreatMatchDao;
import google.registry.persistence.transaction.JpaTestRules;
import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension;
import google.registry.testing.DatastoreEntityExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeSleeper;
import google.registry.util.GoogleCredentialsBundle;
import google.registry.util.ResourceUtils;
import google.registry.util.Retrier;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.function.Supplier;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.http.ProtocolVersion;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicStatusLine;
import org.joda.time.DateTime;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.LocalDate;
import org.joda.time.format.ISODateTimeFormat;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;
/** Unit tests for {@link Spec11Pipeline}. */
/**
* Unit tests for {@link Spec11Pipeline}.
*
* <p>Unfortunately there is no emulator for BigQuery like that for Datastore or App Engine.
* Therefore we cannot fully test the pipeline but only test the two separate sink IO functions,
* assuming that data is sourced correctly from {@code BigQueryIO}.
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
class Spec11PipelineTest {
private static class SaveNewThreatMatchAnswer implements Answer<Void>, Serializable {
@Override
public Void answer(InvocationOnMock invocation) {
Runnable runnable = invocation.getArgument(0, Runnable.class);
runnable.run();
return null;
}
}
private static final String DATE = "2020-01-27";
private static final String SAFE_BROWSING_API_KEY = "api-key";
private static final String REPORTING_BUCKET_URL = "reporting_bucket";
private static PipelineOptions pipelineOptions;
private static final ImmutableList<Subdomain> SUBDOMAINS =
ImmutableList.of(
Subdomain.create("111.com", "123456789-COM", "hello-registrar", "email@hello.net"),
Subdomain.create("party-night.net", "2244AABBC-NET", "kitty-registrar", "contact@kit.ty"),
Subdomain.create("bitcoin.bank", "1C3D5E7F9-BANK", "hello-registrar", "email@hello.net"),
Subdomain.create("no-email.com", "2A4BA9BBC-COM", "kitty-registrar", "contact@kit.ty"),
Subdomain.create(
"anti-anti-anti-virus.dev", "555666888-DEV", "cool-registrar", "cool@aid.net"));
@Mock(serializable = true)
private static JpaTransactionManager mockJpaTm;
private static final ImmutableList<ThreatMatch> THREAT_MATCHES =
ImmutableList.of(
ThreatMatch.create("MALWARE", "111.com"),
ThreatMatch.create("SOCIAL_ENGINEERING", "party-night.net"),
ThreatMatch.create("POTENTIALLY_HARMFUL_APPLICATION", "bitcoin.bank"),
ThreatMatch.create("THREAT_TYPE_UNSPECIFIED", "no-eamil.com"),
ThreatMatch.create("UNWANTED_SOFTWARE", "anti-anti-anti-virus.dev"));
@BeforeAll
static void beforeAll() {
pipelineOptions = PipelineOptionsFactory.create();
pipelineOptions.setRunner(DirectRunner.class);
}
// This extension is only needed because Spec11ThreatMatch uses Ofy to generate the ID. Can be
// removed after the SQL migration.
@RegisterExtension
@Order(Order.DEFAULT - 1)
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();
@TempDir Path tmpDir;
@RegisterExtension
final transient TestPipelineExtension testPipeline =
TestPipelineExtension.fromOptions(pipelineOptions);
final TestPipelineExtension pipeline =
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
@SuppressWarnings("WeakerAccess")
@TempDir
Path tmpDir;
@RegisterExtension
final JpaIntegrationTestExtension database =
new JpaTestRules.Builder().withClock(new FakeClock()).buildIntegrationTestRule();
private final Retrier retrier =
new Retrier(new FakeSleeper(new FakeClock(DateTime.parse("2019-07-15TZ"))), 1);
private Spec11Pipeline spec11Pipeline;
private final Spec11PipelineOptions options =
PipelineOptionsFactory.create().as(Spec11PipelineOptions.class);
private File reportingBucketUrl;
private PCollection<KV<Subdomain, ThreatMatch>> threatMatches;
/**
 * Creates a fresh {@code beam_temp} directory under the per-test temp dir and constructs the
 * pipeline under test with paths derived from it.
 */
@BeforeEach
void beforeEach() throws IOException {
  Path beamTempDir = Files.createDirectory(tmpDir.resolve("beam_temp"));
  String beamTempFolder = beamTempDir.toAbsolutePath().toString();
  String stagingPath = beamTempFolder + "/staging";
  String templatePath = beamTempFolder + "/templates/invoicing";
  String tmpRoot = tmpDir.toAbsolutePath().toString();
  spec11Pipeline =
      new Spec11Pipeline(
          "test-project",
          "region",
          stagingPath,
          templatePath,
          tmpRoot,
          () -> mockJpaTm,
          GoogleCredentialsBundle.create(GoogleCredentials.create(null)),
          retrier);
}
private static final ImmutableList<String> BAD_DOMAINS =
ImmutableList.of(
"111.com", "222.com", "444.com", "no-email.com", "testThreatMatchToSqlBad.com");
/**
 * Builds the pipeline input: 510 numbered {@code N.com} subdomains followed by a single {@code
 * no-email.com} entry whose registrar email is empty.
 *
 * <p>Entries 0-254 belong to {@code theRegistrar} and entries 255-509 to {@code someRegistrar}.
 * The count is chosen to be at least two batches worth (x > 490) so that evaluation runs more
 * than once.
 */
private ImmutableList<Subdomain> getInputDomainsJson() {
  ImmutableList.Builder<Subdomain> domains = ImmutableList.builder();
  for (int index = 0; index < 510; index++) {
    boolean inFirstHalf = index < 255;
    domains.add(
        Subdomain.create(
            String.format("%s.com", index),
            inFirstHalf ? "theDomain" : "someDomain",
            inFirstHalf ? "theRegistrar" : "someRegistrar",
            inFirstHalf ? "fake@theRegistrar.com" : "fake@someRegistrar.com"));
  }
  domains.add(Subdomain.create("no-email.com", "fakeDomain", "noEmailRegistrar", ""));
  return domains.build();
}
/**
* Tests the end-to-end Spec11 pipeline with mocked out API calls.
*
* <p>We suppress the (Serializable & Supplier) dual-casted lambda warnings because the supplier
* produces an explicitly serializable mock, which is safe to cast.
*/
@Test
@SuppressWarnings("unchecked")
void testEndToEndPipeline_generatesExpectedFiles() throws Exception {
// Establish mocks for testing
ImmutableList<Subdomain> inputRows = getInputDomainsJson();
CloseableHttpClient mockHttpClient =
mock(CloseableHttpClient.class, withSettings().serializable());
// Return a mock HttpResponse that returns a JSON response based on the request.
when(mockHttpClient.execute(any(HttpPost.class))).thenAnswer(new HttpResponder());
EvaluateSafeBrowsingFn evalFn =
new EvaluateSafeBrowsingFn(
StaticValueProvider.of("apikey"),
new Retrier(new FakeSleeper(new FakeClock()), 3),
(Serializable & Supplier) () -> mockHttpClient);
// Apply input and evaluation transforms
PCollection<Subdomain> input = testPipeline.apply(Create.of(inputRows));
spec11Pipeline.evaluateUrlHealth(input, evalFn, StaticValueProvider.of("2018-06-01"));
testPipeline.run();
// Verify header and 4 threat matches for 3 registrars are found
ImmutableList<String> generatedReport = resultFileContents();
assertThat(generatedReport).hasSize(4);
assertThat(generatedReport.get(0))
.isEqualTo("Map from registrar email / name to detected subdomain threats:");
// The output file can put the registrar emails and bad URLs in any order.
// We cannot rely on the JSON toString to sort because the keys are not always in the same
// order, so we must rely on length even though that's not ideal.
ImmutableList<String> sortedLines =
ImmutableList.sortedCopyOf(
Comparator.comparingInt(String::length), generatedReport.subList(1, 4));
JSONObject noEmailRegistrarJSON = new JSONObject(sortedLines.get(0));
assertThat(noEmailRegistrarJSON.get("registrarEmailAddress")).isEqualTo("");
assertThat(noEmailRegistrarJSON.get("registrarClientId")).isEqualTo("noEmailRegistrar");
assertThat(noEmailRegistrarJSON.has("threatMatches")).isTrue();
JSONArray noEmailThreatMatch = noEmailRegistrarJSON.getJSONArray("threatMatches");
assertThat(noEmailThreatMatch.length()).isEqualTo(1);
assertThat(noEmailThreatMatch.getJSONObject(0).get("fullyQualifiedDomainName"))
.isEqualTo("no-email.com");
assertThat(noEmailThreatMatch.getJSONObject(0).get("threatType")).isEqualTo("MALWARE");
JSONObject someRegistrarJSON = new JSONObject(sortedLines.get(1));
assertThat(someRegistrarJSON.get("registrarEmailAddress")).isEqualTo("fake@someRegistrar.com");
assertThat(someRegistrarJSON.get("registrarClientId")).isEqualTo("someRegistrar");
assertThat(someRegistrarJSON.has("threatMatches")).isTrue();
JSONArray someThreatMatch = someRegistrarJSON.getJSONArray("threatMatches");
assertThat(someThreatMatch.length()).isEqualTo(1);
assertThat(someThreatMatch.getJSONObject(0).get("fullyQualifiedDomainName"))
.isEqualTo("444.com");
assertThat(someThreatMatch.getJSONObject(0).get("threatType")).isEqualTo("MALWARE");
// theRegistrar has two ThreatMatches, we have to parse it explicitly
JSONObject theRegistrarJSON = new JSONObject(sortedLines.get(2));
assertThat(theRegistrarJSON.get("registrarEmailAddress")).isEqualTo("fake@theRegistrar.com");
assertThat(theRegistrarJSON.get("registrarClientId")).isEqualTo("theRegistrar");
assertThat(theRegistrarJSON.has("threatMatches")).isTrue();
JSONArray theThreatMatches = theRegistrarJSON.getJSONArray("threatMatches");
assertThat(theThreatMatches.length()).isEqualTo(2);
ImmutableList<String> threatMatchStrings =
ImmutableList.of(
theThreatMatches.getJSONObject(0).toString(),
theThreatMatches.getJSONObject(1).toString());
assertThat(threatMatchStrings)
.containsExactly(
new JSONObject()
.put("fullyQualifiedDomainName", "111.com")
.put("threatType", "MALWARE")
.toString(),
new JSONObject()
.put("fullyQualifiedDomainName", "222.com")
.put("threatType", "MALWARE")
.toString());
void beforeEach() throws Exception {
  // The report output goes to a directory created under the per-test temp dir.
  Path reportingDir = Files.createDirectory(tmpDir.resolve(REPORTING_BUCKET_URL));
  reportingBucketUrl = reportingDir.toFile();

  options.setDate(DATE);
  options.setSafeBrowsingApiKey(SAFE_BROWSING_API_KEY);
  options.setReportingBucketUrl(reportingBucketUrl.getAbsolutePath());

  // Pair each test subdomain with the threat match at the same position.
  ImmutableList<KV<Subdomain, ThreatMatch>> subdomainThreatPairs =
      Streams.zip(SUBDOMAINS.stream(), THREAT_MATCHES.stream(), KV::of)
          .collect(toImmutableList());
  KvCoder<Subdomain, ThreatMatch> pairCoder =
      KvCoder.of(SerializableCoder.of(Subdomain.class), SerializableCoder.of(ThreatMatch.class));
  threatMatches = pipeline.apply(Create.of(subdomainThreatPairs).withCoder(pairCoder));
}
@Test
@SuppressWarnings("unchecked")
public void testSpec11ThreatMatchToSql() throws Exception {
doAnswer(new SaveNewThreatMatchAnswer()).when(mockJpaTm).transact(any(Runnable.class));
// Create one bad and one good Subdomain to test with evaluateUrlHealth. Only the bad one should
// be detected and persisted.
Subdomain badDomain =
Subdomain.create(
"testThreatMatchToSqlBad.com", "theDomain", "theRegistrar", "fake@theRegistrar.com");
Subdomain goodDomain =
Subdomain.create(
"testThreatMatchToSqlGood.com",
"someDomain",
"someRegistrar",
"fake@someRegistrar.com");
// Establish a mock HttpResponse that returns a JSON response based on the request.
CloseableHttpClient mockHttpClient =
mock(CloseableHttpClient.class, withSettings().serializable());
when(mockHttpClient.execute(any(HttpPost.class))).thenAnswer(new HttpResponder());
EvaluateSafeBrowsingFn evalFn =
new EvaluateSafeBrowsingFn(
StaticValueProvider.of("apikey"),
new Retrier(new FakeSleeper(new FakeClock()), 3),
(Serializable & Supplier) () -> mockHttpClient);
// Apply input and evaluation transforms
PCollection<Subdomain> input = testPipeline.apply(Create.of(badDomain, goodDomain));
spec11Pipeline.evaluateUrlHealth(input, evalFn, StaticValueProvider.of("2020-06-10"));
testPipeline.run();
// Verify that the expected threat created from the bad Subdomain and the persisted
// Spec11ThreatMatch are equal.
Spec11ThreatMatch expected =
new Spec11ThreatMatch()
.asBuilder()
.setThreatTypes(ImmutableSet.of(ThreatType.MALWARE))
.setCheckDate(LocalDate.parse("2020-06-10", ISODateTimeFormat.date()))
.setDomainName(badDomain.domainName())
.setDomainRepoId(badDomain.domainRepoId())
.setRegistrarId(badDomain.registrarId())
.build();
verify(mockJpaTm).transact(any(Runnable.class));
verify(mockJpaTm).putAll(ImmutableList.of(expected));
void testSuccess_saveToSql() {
  LocalDate checkDate = new LocalDate(2020, 1, 27);
  ImmutableSet<Spec11ThreatMatch> sqlThreatMatches =
      ImmutableSet.of(
          expectedSqlThreatMatch(
              "111.com", "123456789-COM", "hello-registrar", checkDate, ThreatType.MALWARE),
          expectedSqlThreatMatch(
              "party-night.net",
              "2244AABBC-NET",
              "kitty-registrar",
              checkDate,
              ThreatType.SOCIAL_ENGINEERING),
          expectedSqlThreatMatch(
              "bitcoin.bank",
              "1C3D5E7F9-BANK",
              "hello-registrar",
              checkDate,
              ThreatType.POTENTIALLY_HARMFUL_APPLICATION),
          expectedSqlThreatMatch(
              "no-email.com",
              "2A4BA9BBC-COM",
              "kitty-registrar",
              checkDate,
              ThreatType.THREAT_TYPE_UNSPECIFIED),
          expectedSqlThreatMatch(
              "anti-anti-anti-virus.dev",
              "555666888-DEV",
              "cool-registrar",
              checkDate,
              ThreatType.UNWANTED_SOFTWARE));

  Spec11Pipeline.saveToSql(threatMatches, options);
  pipeline.run().waitUntilFinish();

  // The "id" field is excluded from the comparison.
  assertThat(
          jpaTm()
              .transact(() -> Spec11ThreatMatchDao.loadEntriesByDate(jpaTm(), checkDate)))
      .comparingElementsUsing(immutableObjectCorrespondence("id"))
      .containsExactlyElementsIn(sqlThreatMatches);
}

/** Builds one expected {@link Spec11ThreatMatch} carrying a single threat type. */
private static Spec11ThreatMatch expectedSqlThreatMatch(
    String domainName,
    String domainRepoId,
    String registrarId,
    LocalDate checkDate,
    ThreatType threatType) {
  return new Spec11ThreatMatch.Builder()
      .setDomainName(domainName)
      .setDomainRepoId(domainRepoId)
      .setRegistrarId(registrarId)
      .setCheckDate(checkDate)
      .setThreatTypes(ImmutableSet.of(threatType))
      .build();
}
/**
 * Serializable Mockito {@link Answer} that reads the body of the {@link HttpPost} passed as the
 * first invocation argument and delegates to {@code getMockResponse} to build the reply.
 */
private static class HttpResponder implements Answer<CloseableHttpResponse>, Serializable {
  @Override
  public CloseableHttpResponse answer(InvocationOnMock invocation) throws Throwable {
    HttpPost postRequest = (HttpPost) invocation.getArguments()[0];
    InputStreamReader bodyReader =
        new InputStreamReader(postRequest.getEntity().getContent(), UTF_8);
    String requestBody = CharStreams.toString(bodyReader);
    return getMockResponse(requestBody);
  }
}
/**
 * Builds a serializable mock {@link CloseableHttpResponse} for the given request body.
 *
 * <p>The response always reports HTTP 200. Its entity lists a match for every entry of {@code
 * BAD_DOMAINS} that occurs as a substring of {@code request}, or is an empty JSON object when none
 * does.
 */
private static CloseableHttpResponse getMockResponse(String request) throws JSONException {
  // Substring search: a bad domain counts as present if it appears anywhere in the request text.
  ImmutableList<String> matchedBadDomains =
      BAD_DOMAINS.stream().filter(request::contains).collect(ImmutableList.toImmutableList());
  String responseBody = getAPIResponse(matchedBadDomains);

  CloseableHttpResponse mockResponse =
      mock(CloseableHttpResponse.class, withSettings().serializable());
  BasicStatusLine okStatus =
      new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "Done");
  when(mockResponse.getStatusLine()).thenReturn(okStatus);
  when(mockResponse.getEntity()).thenReturn(new FakeHttpEntity(responseBody));
  return mockResponse;
}
/**
 * Serializes the fake API response for the given bad URLs.
 *
 * <p>Each bad URL yields one entry in a {@code "matches"} array. When {@code badUrls} is empty
 * the {@code "matches"} key is omitted entirely and the result is the empty JSON object "{}".
 */
private static String getAPIResponse(ImmutableList<String> badUrls) throws JSONException {
  JSONObject apiResponse = new JSONObject();
  if (!badUrls.isEmpty()) {
    JSONArray matchEntries = new JSONArray();
    for (String url : badUrls) {
      JSONObject threat = new JSONObject().put("url", url);
      JSONObject matchEntry =
          new JSONObject()
              .put("threatType", "MALWARE")
              .put("platformType", "WINDOWS")
              .put("threatEntryType", "URL")
              .put("threat", threat)
              .put("cacheDuration", "300.000s");
      matchEntries.put(matchEntry);
    }
    apiResponse.put("matches", matchEntries);
  }
  return apiResponse.toString();
}
/** A serializable HttpEntity fake that returns {@link String} content. */
private static class FakeHttpEntity extends BasicHttpEntity implements Serializable {
private static final long serialVersionUID = 105738294571L;
private String content;
private void writeObject(ObjectOutputStream oos) throws IOException {
oos.defaultWriteObject();
}
/**
* Sets the {@link FakeHttpEntity} content upon deserialization.
*
* <p>This allows us to use {@link #getContent()} as-is, fully emulating the behavior of {@link
* BasicHttpEntity} regardless of serialization.
*/
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
ois.defaultReadObject();
super.setContent(new ByteArrayInputStream(this.content.getBytes(UTF_8)));
}
FakeHttpEntity(String content) {
this.content = content;
super.setContent(new ByteArrayInputStream(this.content.getBytes(UTF_8)));
}
/**
 * Runs {@code saveToGcs} and compares the generated report with {@code test_output.txt}: same
 * number of lines, identical first line, and the remaining lines matching in any order.
 */
@Test
void testSuccess_saveToGcs() throws Exception {
  String expectedReport = ResourceUtils.readResourceUtf8(this.getClass(), "test_output.txt");
  ImmutableList<String> expectedLines = ImmutableList.copyOf(expectedReport.split("\n"));

  Spec11Pipeline.saveToGcs(threatMatches, options);
  pipeline.run().waitUntilFinish();

  ImmutableList<String> actualLines = resultFileContents();
  assertThat(actualLines.size()).isEqualTo(expectedLines.size());
  // Line 0 is compared verbatim; the remaining lines are JSON and compared structurally.
  assertThat(actualLines.get(0)).isEqualTo(expectedLines.get(0));
  ImmutableList<String> actualJsonLines = actualLines.subList(1, actualLines.size());
  ImmutableList<String> expectedJsonLines = expectedLines.subList(1, expectedLines.size());
  assertThat(actualJsonLines)
      .comparingElementsUsing(
          Correspondence.from(
              new ThreatMatchJsonPredicate(), "has fields with unordered threatTypes equal to"))
      .containsExactlyElementsIn(expectedJsonLines);
}
/** Returns the text contents of a file under the beamBucket/results directory. */
@ -376,9 +198,34 @@ class Spec11PipelineTest {
File resultFile =
new File(
String.format(
"%s/icann/spec11/2018-06/SPEC11_MONTHLY_REPORT_2018-06-01",
tmpDir.toAbsolutePath().toString()));
"%s/icann/spec11/2020-01/SPEC11_MONTHLY_REPORT_2020-01-27",
reportingBucketUrl.getAbsolutePath().toString()));
return ImmutableList.copyOf(
ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n"));
}
/**
 * Decides whether two JSON report lines are equivalent.
 *
 * <p>Both lines must have the same top-level keys. The {@code threatMatches} arrays must contain
 * the same elements regardless of order; every other value is compared with {@code equals}.
 */
private static class ThreatMatchJsonPredicate implements BinaryPredicate<String, String> {
  private static final String THREAT_MATCHES = "threatMatches";

  @Override
  public boolean apply(@Nullable String actual, @Nullable String expected) {
    JSONObject actualJson = new JSONObject(actual);
    JSONObject expectedJson = new JSONObject(expected);
    if (!actualJson.keySet().equals(expectedJson.keySet())) {
      return false;
    }
    for (String key : actualJson.keySet()) {
      if (key.equals(THREAT_MATCHES)) {
        // Set-style comparison: each side must contain every element of the other. The previous
        // code returned false when the two sets were EQUAL, and relied on JSONObject.equals,
        // which is identity-based, so the threat matches were never actually verified.
        Iterable<Object> actualMatches = actualJson.getJSONArray(key);
        Iterable<Object> expectedMatches = expectedJson.getJSONArray(key);
        if (!containsAll(actualMatches, expectedMatches)
            || !containsAll(expectedMatches, actualMatches)) {
          return false;
        }
      } else if (!actualJson.get(key).equals(expectedJson.get(key))) {
        return false;
      }
    }
    return true;
  }

  /** Returns true if every element of {@code elements} has an equivalent in {@code container}. */
  private static boolean containsAll(Iterable<Object> container, Iterable<Object> elements) {
    for (Object element : elements) {
      boolean found = false;
      for (Object candidate : container) {
        if (jsonEquals(element, candidate)) {
          found = true;
          break;
        }
      }
      if (!found) {
        return false;
      }
    }
    return true;
  }

  /**
   * Compares two JSON array elements by content.
   *
   * <p>JSON objects are compared structurally via {@link JSONObject#similar}; anything else falls
   * back to {@code equals}.
   */
  private static boolean jsonEquals(Object left, Object right) {
    if (left instanceof JSONObject) {
      return ((JSONObject) left).similar(right);
    }
    return left.equals(right);
  }
}
}

View file

@ -15,95 +15,102 @@
package google.registry.reporting.spec11;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued;
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static org.apache.http.HttpStatus.SC_OK;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.Dataflow.Projects;
import com.google.api.services.dataflow.Dataflow.Projects.Templates;
import com.google.api.services.dataflow.Dataflow.Projects.Templates.Launch;
import com.google.api.services.dataflow.Dataflow.Projects.Locations;
import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates;
import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates.Launch;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
import com.google.api.services.dataflow.model.LaunchTemplateResponse;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.common.collect.ImmutableMap;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.net.MediaType;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeResponse;
import google.registry.testing.TaskQueueHelper.TaskMatcher;
import java.io.IOException;
import org.joda.time.LocalDate;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
/** Unit tests for {@link google.registry.reporting.spec11.GenerateSpec11ReportAction}. */
/** Unit tests for {@link GenerateSpec11ReportAction}. */
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
class GenerateSpec11ReportActionTest {
@RegisterExtension
final AppEngineExtension appEngine = AppEngineExtension.builder().withTaskQueue().build();
private FakeResponse response;
private Dataflow dataflow;
private Projects dataflowProjects;
private Templates dataflowTemplates;
private Launch dataflowLaunch;
private FakeResponse response = new FakeResponse();
private Dataflow dataflow = mock(Dataflow.class);
private Projects projects = mock(Projects.class);
private Locations locations = mock(Locations.class);
private FlexTemplates templates = mock(FlexTemplates.class);
private Launch launch = mock(Launch.class);
private LaunchFlexTemplateResponse launchResponse =
new LaunchFlexTemplateResponse().setJob(new Job().setId("jobid"));
private final FakeClock clock = new FakeClock(DateTime.parse("2018-06-11T12:23:56Z"));
private GenerateSpec11ReportAction action;
@BeforeEach
void beforeEach() throws IOException {
response = new FakeResponse();
dataflow = mock(Dataflow.class);
// Establish the Dataflow API call chain
dataflow = mock(Dataflow.class);
dataflowProjects = mock(Dataflow.Projects.class);
dataflowTemplates = mock(Templates.class);
dataflowLaunch = mock(Launch.class);
LaunchTemplateResponse launchTemplateResponse = new LaunchTemplateResponse();
// Ultimately we get back this job response with a given id.
launchTemplateResponse.setJob(new Job().setId("jobid"));
when(dataflow.projects()).thenReturn(dataflowProjects);
when(dataflowProjects.templates()).thenReturn(dataflowTemplates);
when(dataflowTemplates.launch(any(String.class), any(LaunchTemplateParameters.class)))
.thenReturn(dataflowLaunch);
when(dataflowLaunch.setGcsPath(any(String.class))).thenReturn(dataflowLaunch);
when(dataflowLaunch.execute()).thenReturn(launchTemplateResponse);
when(dataflow.projects()).thenReturn(projects);
when(projects.locations()).thenReturn(locations);
when(locations.flexTemplates()).thenReturn(templates);
when(templates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class)))
.thenReturn(launch);
when(launch.execute()).thenReturn(launchResponse);
}
@Test
void testLaunch_success() throws IOException {
void testFailure_dataflowFailure() throws IOException {
action =
new GenerateSpec11ReportAction(
"test",
"gs://my-bucket-beam",
"gs://template",
"test-project",
"us-east1-c",
"gs://staging-project/staging-bucket/",
"gs://reporting-project/reporting-bucket/",
"api_key/a",
new LocalDate(2018, 6, 11),
clock.nowUtc().toLocalDate(),
clock,
response,
dataflow);
when(launch.execute()).thenThrow(new IOException("Dataflow failure"));
action.run();
assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).contains("Dataflow failure");
assertNoTasksEnqueued("beam-reporting");
}
@Test
void testSuccess() throws IOException {
action =
new GenerateSpec11ReportAction(
"test-project",
"us-east1-c",
"gs://staging-project/staging-bucket/",
"gs://reporting-project/reporting-bucket/",
"api_key/a",
clock.nowUtc().toLocalDate(),
clock,
response,
dataflow);
action.run();
LaunchTemplateParameters expectedLaunchTemplateParameters =
new LaunchTemplateParameters()
.setJobName("spec11_2018-06-11")
.setEnvironment(
new RuntimeEnvironment()
.setZone("us-east1-c")
.setTempLocation("gs://my-bucket-beam/temporary"))
.setParameters(
ImmutableMap.of("safeBrowsingApiKey", "api_key/a", "date", "2018-06-11"));
verify(dataflowTemplates).launch("test", expectedLaunchTemplateParameters);
verify(dataflowLaunch).setGcsPath("gs://template");
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("Launched Spec11 dataflow template.");
TaskMatcher matcher =
new TaskMatcher()
.url("/_dr/task/publishSpec11")

View file

@ -29,8 +29,9 @@ import static org.mockito.Mockito.when;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.Dataflow.Projects;
import com.google.api.services.dataflow.Dataflow.Projects.Jobs;
import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get;
import com.google.api.services.dataflow.Dataflow.Projects.Locations;
import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs;
import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Get;
import com.google.api.services.dataflow.model.Job;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@ -51,6 +52,7 @@ class PublishSpec11ReportActionTest {
private Dataflow dataflow;
private Projects projects;
private Locations locations;
private Jobs jobs;
private Get get;
private Spec11EmailUtils emailUtils;
@ -64,11 +66,13 @@ class PublishSpec11ReportActionTest {
void beforeEach() throws Exception {
dataflow = mock(Dataflow.class);
projects = mock(Projects.class);
locations = mock(Locations.class);
jobs = mock(Jobs.class);
get = mock(Get.class);
when(dataflow.projects()).thenReturn(projects);
when(projects.jobs()).thenReturn(jobs);
when(jobs.get("test-project", "12345")).thenReturn(get);
when(projects.locations()).thenReturn(locations);
when(locations.jobs()).thenReturn(jobs);
when(jobs.get("test-project", "test-region", "12345")).thenReturn(get);
expectedJob = new Job();
when(get.execute()).thenReturn(expectedJob);
emailUtils = mock(Spec11EmailUtils.class);
@ -78,6 +82,7 @@ class PublishSpec11ReportActionTest {
publishAction =
new PublishSpec11ReportAction(
"test-project",
"test-region",
"Super Cool Registry",
"12345",
emailUtils,
@ -95,6 +100,7 @@ class PublishSpec11ReportActionTest {
publishAction =
new PublishSpec11ReportAction(
"test-project",
"test-region",
"Super Cool Registry",
"12345",
emailUtils,

View file

@ -0,0 +1,4 @@
Map from registrar email / name to detected subdomain threats:
{"threatMatches":[{"threatType":"UNWANTED_SOFTWARE","fullyQualifiedDomainName":"anti-anti-anti-virus.dev"}],"registrarClientId":"cool-registrar","registrarEmailAddress":"cool@aid.net"}
{"threatMatches":[{"threatType":"MALWARE","fullyQualifiedDomainName":"111.com"},{"threatType":"POTENTIALLY_HARMFUL_APPLICATION","fullyQualifiedDomainName":"bitcoin.bank"}],"registrarClientId":"hello-registrar","registrarEmailAddress":"email@hello.net"}
{"threatMatches":[{"threatType":"THREAT_TYPE_UNSPECIFIED","fullyQualifiedDomainName":"no-eamil.com"},{"threatType":"SOCIAL_ENGINEERING","fullyQualifiedDomainName":"party-night.net"}],"registrarClientId":"kitty-registrar","registrarEmailAddress":"contact@kit.ty"}

View file

@ -37,24 +37,6 @@ steps:
cat tool-credential.json.enc | base64 -d | gcloud kms decrypt \
--ciphertext-file=- --plaintext-file=tool-credential.json \
--location=global --keyring=nomulus-tool-keyring --key=nomulus-tool-key
# Deploy the Spec11 pipeline to GCS.
#- name: 'gcr.io/$PROJECT_ID/builder:latest'
# entrypoint: /bin/bash
# args:
# - -c
# - |
# set -e
# if [ ${_ENV} == production ]; then
# project_id="domain-registry"
# else
# project_id="domain-registry-${_ENV}"
# fi
# echo "gs://$${project_id}-beam/cloudsql/admin_credential.enc"
# gsutil cp gs://$PROJECT_ID-deploy/${TAG_NAME}/nomulus.jar .
# java -jar nomulus.jar -e ${_ENV} --credential tool-credential.json \
# --sql_access_info \
# "gs://$${project_id}-beam/cloudsql/admin_credential.enc" \
# deploy_spec11_pipeline --project $${project_id}
# Deploy the invoicing pipeline to GCS.
- name: 'gcr.io/$PROJECT_ID/nomulus-tool:latest'
args:

View file

@ -73,7 +73,7 @@ steps:
# Build and package the deployment files for production.
- name: 'gcr.io/${PROJECT_ID}/builder:latest'
args: ['release/build_nomulus_for_env.sh', 'production', 'output']
# Build and stage init_sql_pipeline
# Build and stage Dataflow Flex templates.
- name: 'gcr.io/${PROJECT_ID}/builder:latest'
entrypoint: /bin/bash
# Set home for Gradle caches. Must be consistent with previous steps above
@ -89,7 +89,9 @@ steps:
google.registry.beam.initsql.InitSqlPipeline \
google/registry/beam/init_sql_pipeline_metadata.json \
google.registry.beam.datastore.BulkDeleteDatastorePipeline \
google/registry/beam/bulk_delete_datastore_pipeline_metadata.json
google/registry/beam/bulk_delete_datastore_pipeline_metadata.json \
google.registry.beam.spec11.Spec11Pipeline \
google/registry/beam/spec11_pipeline_metadata.json
# Tentatively build and publish Cloud SQL schema jar here, before schema release
# process is finalized. Also publish nomulus:core jars that are needed for
# server/schema compatibility tests.