diff --git a/core/build.gradle b/core/build.gradle index 2a433050c..75c5cba02 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -805,6 +805,10 @@ if (environment in ['alpha', 'crash']) { mainClass: 'google.registry.beam.datastore.BulkDeleteDatastorePipeline', metaData: 'google/registry/beam/bulk_delete_datastore_pipeline_metadata.json' ], + [ + mainClass: 'google.registry.beam.spec11.Spec11Pipeline', + metaData: 'google/registry/beam/spec11_pipeline_metadata.json' + ], ] project.tasks.create("stage_beam_pipelines") { doLast { diff --git a/core/src/main/java/google/registry/batch/WipeoutDatastoreAction.java b/core/src/main/java/google/registry/batch/WipeoutDatastoreAction.java index 4db43dfb9..a2adcfa09 100644 --- a/core/src/main/java/google/registry/batch/WipeoutDatastoreAction.java +++ b/core/src/main/java/google/registry/batch/WipeoutDatastoreAction.java @@ -15,6 +15,7 @@ package google.registry.batch; import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8; +import static google.registry.beam.BeamUtils.createJobName; import static javax.servlet.http.HttpServletResponse.SC_FORBIDDEN; import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; import static javax.servlet.http.HttpServletResponse.SC_OK; @@ -30,9 +31,8 @@ import google.registry.config.RegistryConfig.Config; import google.registry.request.Action; import google.registry.request.Response; import google.registry.request.auth.Auth; +import google.registry.util.Clock; import javax.inject.Inject; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; /** * Wipes out all Cloud Datastore data in a Nomulus GCP environment. @@ -58,17 +58,20 @@ public class WipeoutDatastoreAction implements Runnable { private final Response response; private final Dataflow dataflow; private final String stagingBucketUrl; + private final Clock clock; @Inject WipeoutDatastoreAction( @Config("projectId") String projectId, @Config("defaultJobRegion") String jobRegion, @Config("beamStagingBucketUrl") String stagingBucketUrl, + Clock clock, Response response, Dataflow dataflow) { this.projectId = projectId; this.jobRegion = jobRegion; this.stagingBucketUrl = stagingBucketUrl; + this.clock = clock; this.response = response; this.dataflow = dataflow; } @@ -86,10 +89,7 @@ public class WipeoutDatastoreAction implements Runnable { try { LaunchFlexTemplateParameter parameters = new LaunchFlexTemplateParameter() - // Job name must be unique and in [-a-z0-9]. 
- .setJobName( - "bulk-delete-datastore-" - + DateTime.now(DateTimeZone.UTC).toString("yyyy-MM-dd'T'HH-mm-ss'Z'")) + .setJobName(createJobName("bulk-delete-datastore-", clock)) .setContainerSpecGcsPath( String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME)) .setParameters(ImmutableMap.of("kindsToDelete", "*")); diff --git a/core/src/main/java/google/registry/beam/BeamUtils.java b/core/src/main/java/google/registry/beam/BeamUtils.java index eaa75e347..dd1b7d6bf 100644 --- a/core/src/main/java/google/registry/beam/BeamUtils.java +++ b/core/src/main/java/google/registry/beam/BeamUtils.java @@ -14,10 +14,14 @@ package google.registry.beam; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.io.Resources; +import google.registry.util.Clock; import google.registry.util.ResourceUtils; +import java.util.regex.Pattern; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; @@ -41,8 +45,7 @@ public class BeamUtils { ImmutableList fieldNames, SchemaAndRecord schemaAndRecord) { GenericRecord record = schemaAndRecord.getRecord(); ImmutableList nullFields = - fieldNames - .stream() + fieldNames.stream() .filter(fieldName -> record.get(fieldName) == null) .collect(ImmutableList.toImmutableList()); String missingFieldList = Joiner.on(", ").join(nullFields); @@ -61,4 +64,19 @@ public class BeamUtils { public static String getQueryFromFile(Class clazz, String filename) { return ResourceUtils.readResourceUtf8(Resources.getResource(clazz, "sql/" + filename)); } + + /** Creates a beam job name and validates that it conforms to the requirements. */ + public static String createJobName(String prefix, Clock clock) { + // Flex template job name must be unique and consists of only characters [-a-z0-9], starting + // with a letter and ending with a letter or number. So we replace the "T" and "Z" in ISO 8601 + // with lowercase letters. 
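+    // For example (illustrative only): createJobName("spec11", clock) at 2021-01-21T12:00:00Z
+    // would yield "spec11-2021-01-21t12-00-00z".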
+ String jobName = + String.format("%s-%s", prefix, clock.nowUtc().toString("yyyy-MM-dd't'HH-mm-ss'z'")); + checkArgument( + Pattern.compile("^[a-z][-a-z0-9]*[a-z0-9]*").matcher(jobName).matches(), + "The job name %s is illegal, it consists of only characters [-a-z0-9], " + + "starting with a letter and ending with a letter or number,", + jobName); + return jobName; + } } diff --git a/core/src/main/java/google/registry/beam/initsql/Transforms.java b/core/src/main/java/google/registry/beam/initsql/Transforms.java index 00eb3cc23..60a26b499 100644 --- a/core/src/main/java/google/registry/beam/initsql/Transforms.java +++ b/core/src/main/java/google/registry/beam/initsql/Transforms.java @@ -20,12 +20,9 @@ import static com.google.common.base.Preconditions.checkState; import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp; import static google.registry.beam.initsql.BackupPaths.getExportFilePatterns; import static google.registry.model.ofy.ObjectifyService.ofy; -import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; -import static google.registry.persistence.transaction.TransactionManagerFactory.setJpaTm; import static google.registry.util.DateTimeUtils.START_OF_TIME; import static google.registry.util.DateTimeUtils.isBeforeOrAt; import static java.util.Comparator.comparing; -import static org.apache.beam.sdk.values.TypeDescriptors.integers; import static org.apache.beam.sdk.values.TypeDescriptors.kvs; import static org.apache.beam.sdk.values.TypeDescriptors.strings; @@ -36,14 +33,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; -import com.googlecode.objectify.Key; -import google.registry.backup.AppEngineEnvironment; import google.registry.backup.CommitLogImports; import google.registry.backup.VersionedEntity; import google.registry.model.domain.DomainBase; -import google.registry.model.ofy.ObjectifyService; import google.registry.model.reporting.HistoryEntry; -import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.schema.replay.DatastoreAndSqlEntity; import google.registry.schema.replay.SqlEntity; import google.registry.tools.LevelDbLogReader; @@ -53,7 +46,6 @@ import java.util.Iterator; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -62,18 +54,14 @@ import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ProcessFunction; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ 
-268,80 +256,6 @@ public final class Transforms { .iterator())); } - /** - * Returns a {@link PTransform} that writes a {@link PCollection} of {@link VersionedEntity}s to a - * SQL database. and outputs an empty {@code PCollection}. This allows other operations to - * {@link org.apache.beam.sdk.transforms.Wait wait} for the completion of this transform. - * - *
<p>
Errors are handled according to the pipeline runner's default policy. As part of a one-time - * job, we will not add features unless proven necessary. - * - * @param transformId a unique ID for an instance of the returned transform - * @param maxWriters the max number of concurrent writes to SQL, which also determines the max - * number of connection pools created - * @param batchSize the number of entities to write in each operation - * @param jpaSupplier supplier of a {@link JpaTransactionManager} - */ - public static PTransform, PCollection> writeToSql( - String transformId, - int maxWriters, - int batchSize, - SerializableSupplier jpaSupplier) { - return writeToSql( - transformId, - maxWriters, - batchSize, - jpaSupplier, - Transforms::convertVersionedEntityToSqlEntity, - TypeDescriptor.of(VersionedEntity.class)); - } - - /** - * Returns a {@link PTransform} that writes a {@link PCollection} of entities to a SQL database. - * and outputs an empty {@code PCollection}. This allows other operations to {@link - * org.apache.beam.sdk.transforms.Wait wait} for the completion of this transform. - * - *
<p>
The converter and type descriptor are generics so that we can convert any type of entity to - * an object to be placed in SQL. - * - *
<p>
Errors are handled according to the pipeline runner's default policy. As part of a one-time - * job, we will not add features unless proven necessary. - * - * @param transformId a unique ID for an instance of the returned transform - * @param maxWriters the max number of concurrent writes to SQL, which also determines the max - * number of connection pools created - * @param batchSize the number of entities to write in each operation - * @param jpaSupplier supplier of a {@link JpaTransactionManager} - * @param jpaConverter the function that converts the input object to a JPA entity - * @param objectDescriptor the type descriptor of the input object - */ - public static PTransform, PCollection> writeToSql( - String transformId, - int maxWriters, - int batchSize, - SerializableSupplier jpaSupplier, - SerializableFunction jpaConverter, - TypeDescriptor objectDescriptor) { - return new PTransform, PCollection>() { - @Override - public PCollection expand(PCollection input) { - return input - .apply( - "Shard data for " + transformId, - MapElements.into(kvs(integers(), objectDescriptor)) - .via(ve -> KV.of(ThreadLocalRandom.current().nextInt(maxWriters), ve))) - .apply("Batch output by shard " + transformId, GroupIntoBatches.ofSize(batchSize)) - .apply( - "Write in batch for " + transformId, - ParDo.of(new SqlBatchWriter(transformId, jpaSupplier, jpaConverter))); - } - }; - } - - private static Key toOfyKey(Object ofyEntity) { - return Key.create(ofyEntity); - } - private static boolean isMigratable(Entity entity) { if (entity.getKind().equals("HistoryEntry")) { // DOMAIN_APPLICATION_CREATE is deprecated type and should not be migrated. @@ -458,93 +372,6 @@ public final class Transforms { } } - /** - * Writes a batch of entities to a SQL database. - * - *
<p>
Note that an arbitrary number of instances of this class may be created and freed in - * arbitrary order in a single JVM. Due to the tech debt that forced us to use a static variable - * to hold the {@code JpaTransactionManager} instance, we must ensure that JpaTransactionManager - * is not changed or torn down while being used by some instance. - */ - private static class SqlBatchWriter extends DoFn>, Void> { - - private static int instanceCount = 0; - private static JpaTransactionManager originalJpa; - - private Counter counter; - - private final SerializableSupplier jpaSupplier; - private final SerializableFunction jpaConverter; - - SqlBatchWriter( - String type, - SerializableSupplier jpaSupplier, - SerializableFunction jpaConverter) { - counter = Metrics.counter("SQL_WRITE", type); - this.jpaSupplier = jpaSupplier; - this.jpaConverter = jpaConverter; - } - - @Setup - public void setup() { - try (AppEngineEnvironment env = new AppEngineEnvironment()) { - ObjectifyService.initOfy(); - } - - synchronized (SqlBatchWriter.class) { - if (instanceCount == 0) { - originalJpa = jpaTm(); - setJpaTm(jpaSupplier); - } - instanceCount++; - } - } - - @Teardown - public void teardown() { - synchronized (SqlBatchWriter.class) { - instanceCount--; - if (instanceCount == 0) { - jpaTm().teardown(); - setJpaTm(() -> originalJpa); - } - } - } - - @ProcessElement - public void processElement(@Element KV> kv) { - try (AppEngineEnvironment env = new AppEngineEnvironment()) { - ImmutableList ofyEntities = - Streams.stream(kv.getValue()) - .map(this.jpaConverter::apply) - // TODO(b/177340730): post migration delete the line below. - .filter(Objects::nonNull) - .collect(ImmutableList.toImmutableList()); - try { - jpaTm().transact(() -> jpaTm().putAll(ofyEntities)); - counter.inc(ofyEntities.size()); - } catch (RuntimeException e) { - processSingly(ofyEntities); - } - } - } - - /** - * Writes entities in a failed batch one by one to identify the first bad entity and throws a - * {@link RuntimeException} on it. - */ - private void processSingly(ImmutableList ofyEntities) { - for (Object ofyEntity : ofyEntities) { - try { - jpaTm().transact(() -> jpaTm().put(ofyEntity)); - counter.inc(); - } catch (RuntimeException e) { - throw new RuntimeException(toOfyKey(ofyEntity).toString(), e); - } - } - } - } - /** * Removes BillingEvents, {@link google.registry.model.poll.PollMessage PollMessages} and {@link * google.registry.model.host.HostResource} from a {@link DomainBase}. These are circular foreign diff --git a/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java b/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java index 57aa324e2..f896e1c89 100644 --- a/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java +++ b/core/src/main/java/google/registry/beam/spec11/SafeBrowsingTransforms.java @@ -14,7 +14,6 @@ package google.registry.beam.spec11; - import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.http.HttpStatus.SC_OK; @@ -30,7 +29,6 @@ import java.net.URISyntaxException; import java.util.LinkedHashMap; import java.util.Map; import java.util.function.Supplier; -import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.values.KV; @@ -73,7 +71,7 @@ public class SafeBrowsingTransforms { private static final int BATCH_SIZE = 490; /** Provides the SafeBrowsing API key at runtime. 
*/ - private final ValueProvider apiKeyProvider; + private final String apiKey; /** * Maps a subdomain's {@code fullyQualifiedDomainName} to its corresponding {@link Subdomain} to @@ -93,20 +91,18 @@ public class SafeBrowsingTransforms { private final Retrier retrier; /** - * Constructs a {@link EvaluateSafeBrowsingFn} that gets its API key from the given provider. + * Constructs a {@link EvaluateSafeBrowsingFn} with a given API key. * *
<p>
We need to dual-cast the closeableHttpClientSupplier lambda because all {@code DoFn} * member variables need to be serializable. The (Supplier & Serializable) dual cast is safe * because class methods are generally serializable, especially a static function such as {@link * HttpClients#createDefault()}. - * - * @param apiKeyProvider provides the SafeBrowsing API key from {@code KMS} at runtime */ @SuppressWarnings("unchecked") - EvaluateSafeBrowsingFn(ValueProvider apiKeyProvider, Retrier retrier) { - this.apiKeyProvider = apiKeyProvider; + EvaluateSafeBrowsingFn(String apiKey, Retrier retrier) { + this.apiKey = apiKey; this.retrier = retrier; - this.closeableHttpClientSupplier = (Supplier & Serializable) HttpClients::createDefault; + closeableHttpClientSupplier = (Supplier & Serializable) HttpClients::createDefault; } /** @@ -117,12 +113,10 @@ public class SafeBrowsingTransforms { */ @VisibleForTesting EvaluateSafeBrowsingFn( - ValueProvider apiKeyProvider, - Retrier retrier, - Supplier clientSupplier) { - this.apiKeyProvider = apiKeyProvider; + String apiKey, Retrier retrier, Supplier clientSupplier) { + this.apiKey = apiKey; this.retrier = retrier; - this.closeableHttpClientSupplier = clientSupplier; + closeableHttpClientSupplier = clientSupplier; } /** Evaluates any buffered {@link Subdomain} objects upon completing the bundle. */ @@ -159,7 +153,7 @@ public class SafeBrowsingTransforms { try { URIBuilder uriBuilder = new URIBuilder(SAFE_BROWSING_URL); // Add the API key param - uriBuilder.addParameter("key", apiKeyProvider.get()); + uriBuilder.addParameter("key", apiKey); HttpPost httpPost = new HttpPost(uriBuilder.build()); httpPost.addHeader(HTTP.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()); @@ -175,7 +169,7 @@ public class SafeBrowsingTransforms { } }, IOException.class); - } catch (URISyntaxException | JSONException e) { + } catch (URISyntaxException | JSONException e) { // Fail the pipeline on a parsing exception- this indicates the API likely changed. 
throw new RuntimeException("Caught parsing exception, failing pipeline.", e); } finally { @@ -239,7 +233,9 @@ public class SafeBrowsingTransforms { String url = match.getJSONObject("threat").getString("url"); Subdomain subdomain = subdomainBuffer.get(url); resultBuilder.add( - KV.of(subdomain, ThreatMatch.create(match, subdomain.domainName()))); + KV.of( + subdomain, + ThreatMatch.create(match.getString("threatType"), subdomain.domainName()))); } } } diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java index 1b061ad24..674e9d22e 100644 --- a/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java +++ b/core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java @@ -17,32 +17,28 @@ package google.registry.beam.spec11; import static com.google.common.base.Preconditions.checkArgument; import static google.registry.beam.BeamUtils.getQueryFromFile; -import com.google.auth.oauth2.GoogleCredentials; import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; -import google.registry.beam.initsql.Transforms; -import google.registry.beam.initsql.Transforms.SerializableSupplier; +import dagger.Component; +import dagger.Module; +import dagger.Provides; +import google.registry.beam.common.RegistryJpaIO; import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn; -import google.registry.config.CredentialModule.LocalCredential; -import google.registry.config.RegistryConfig.Config; +import google.registry.config.RegistryConfig.ConfigModule; import google.registry.model.reporting.Spec11ThreatMatch; import google.registry.model.reporting.Spec11ThreatMatch.ThreatType; -import google.registry.persistence.transaction.JpaTransactionManager; -import google.registry.util.GoogleCredentialsBundle; import google.registry.util.Retrier; import google.registry.util.SqlTemplate; +import google.registry.util.UtilsModule; import java.io.Serializable; -import javax.inject.Inject; -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; +import javax.inject.Singleton; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; -import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; @@ -58,21 +54,20 @@ import org.json.JSONException; import org.json.JSONObject; /** - * Definition of a Dataflow pipeline template, which generates a given month's spec11 report. + * Definition of a Dataflow Flex template, which generates a given month's spec11 report. * - *
<p>
To stage this template on GCS, run the {@link - * google.registry.tools.DeploySpec11PipelineCommand} Nomulus command. + *
<p>
To stage this template locally, run the {@code stage_beam_pipeline.sh} shell script. * *
<p>
Then, you can run the staged template via the API client library, gCloud or a raw REST call. * - * @see Dataflow Templates + * @see Using + * Flex Templates */ public class Spec11Pipeline implements Serializable { /** * Returns the subdirectory spec11 reports reside in for a given local date in yyyy-MM-dd format. * - * @see google.registry.beam.spec11.Spec11Pipeline * @see google.registry.reporting.spec11.Spec11EmailUtils */ public static String getSpec11ReportFilePath(LocalDate localDate) { @@ -87,84 +82,35 @@ public class Spec11Pipeline implements Serializable { /** The JSON object field into which we put the threat match array for Spec11 reports. */ public static final String THREAT_MATCHES_FIELD = "threatMatches"; - private final String projectId; - private final String beamJobRegion; - private final String beamStagingUrl; - private final String spec11TemplateUrl; - private final String reportingBucketUrl; - private final GoogleCredentials googleCredentials; - private final Retrier retrier; - private final SerializableSupplier jpaSupplierFactory; + private final Spec11PipelineOptions options; + private final EvaluateSafeBrowsingFn safeBrowsingFn; + private final Pipeline pipeline; - @Inject - public Spec11Pipeline( - @Config("projectId") String projectId, - @Config("defaultJobRegion") String beamJobRegion, - @Config("beamStagingUrl") String beamStagingUrl, - @Config("spec11TemplateUrl") String spec11TemplateUrl, - @Config("reportingBucketUrl") String reportingBucketUrl, - SerializableSupplier jpaSupplierFactory, - @LocalCredential GoogleCredentialsBundle googleCredentialsBundle, - Retrier retrier) { - this.projectId = projectId; - this.beamJobRegion = beamJobRegion; - this.beamStagingUrl = beamStagingUrl; - this.spec11TemplateUrl = spec11TemplateUrl; - this.reportingBucketUrl = reportingBucketUrl; - this.jpaSupplierFactory = jpaSupplierFactory; - this.googleCredentials = googleCredentialsBundle.getGoogleCredentials(); - this.retrier = retrier; + @VisibleForTesting + Spec11Pipeline( + Spec11PipelineOptions options, EvaluateSafeBrowsingFn safeBrowsingFn, Pipeline pipeline) { + this.options = options; + this.safeBrowsingFn = safeBrowsingFn; + this.pipeline = pipeline; } - /** Custom options for running the spec11 pipeline. */ - public interface Spec11PipelineOptions extends DataflowPipelineOptions { - /** Returns the local date we're generating the report for, in yyyy-MM-dd format. */ - @Description("The local date we generate the report for, in yyyy-MM-dd format.") - ValueProvider getDate(); - - /** - * Sets the local date we generate invoices for. - * - *
<p>
This is implicitly set when executing the Dataflow template, by specifying the "date" - * parameter. - */ - void setDate(ValueProvider value); - - /** Returns the SafeBrowsing API key we use to evaluate subdomain health. */ - @Description("The API key we use to access the SafeBrowsing API.") - ValueProvider getSafeBrowsingApiKey(); - - /** - * Sets the SafeBrowsing API key we use. - * - *
<p>
This is implicitly set when executing the Dataflow template, by specifying the - * "safeBrowsingApiKey" parameter. - */ - void setSafeBrowsingApiKey(ValueProvider value); + Spec11Pipeline(Spec11PipelineOptions options, EvaluateSafeBrowsingFn safeBrowsingFn) { + this(options, safeBrowsingFn, Pipeline.create(options)); } - /** Deploys the spec11 pipeline as a template on GCS. */ - public void deploy() { - // We can't store options as a member variable due to serialization concerns. - Spec11PipelineOptions options = PipelineOptionsFactory.as(Spec11PipelineOptions.class); - options.setProject(projectId); - options.setRegion(beamJobRegion); - options.setRunner(DataflowRunner.class); - // This causes p.run() to stage the pipeline as a template on GCS, as opposed to running it. - options.setTemplateLocation(spec11TemplateUrl); - options.setStagingLocation(beamStagingUrl); - // This credential is used when Dataflow deploys the template to GCS in target GCP project. - // So, make sure the credential has write permission to GCS in that project. - options.setGcpCredential(googleCredentials); + PipelineResult run() { + setupPipeline(); + return pipeline.run(); + } - Pipeline p = Pipeline.create(options); + void setupPipeline() { PCollection domains = - p.apply( + pipeline.apply( "Read active domains from BigQuery", BigQueryIO.read(Subdomain::parseFromRecord) .fromQuery( SqlTemplate.create(getQueryFromFile(Spec11Pipeline.class, "subdomains.sql")) - .put("PROJECT_ID", projectId) + .put("PROJECT_ID", options.getProject()) .put("DATASTORE_EXPORT_DATASET", "latest_datastore_export") .put("REGISTRAR_TABLE", "Registrar") .put("DOMAIN_BASE_TABLE", "DomainBase") @@ -174,48 +120,40 @@ public class Spec11Pipeline implements Serializable { .withoutValidation() .withTemplateCompatibility()); - evaluateUrlHealth( - domains, - new EvaluateSafeBrowsingFn(options.getSafeBrowsingApiKey(), retrier), - options.getDate()); - p.run(); + PCollection> threatMatches = + domains.apply("Run through SafeBrowsing API", ParDo.of(safeBrowsingFn)); + + saveToSql(threatMatches, options); + saveToGcs(threatMatches, options); } - /** - * Evaluate each {@link Subdomain} URL via the SafeBrowsing API. - * - *
<p>
This is factored out to facilitate testing. - */ - void evaluateUrlHealth( - PCollection domains, - EvaluateSafeBrowsingFn evaluateSafeBrowsingFn, - ValueProvider dateProvider) { - PCollection> subdomainsSql = - domains.apply("Run through SafeBrowsing API", ParDo.of(evaluateSafeBrowsingFn)); - TypeDescriptor> descriptor = - new TypeDescriptor>() {}; - subdomainsSql.apply( - Transforms.writeToSql( - "Spec11ThreatMatch", - 4, - 4, - jpaSupplierFactory, - (kv) -> { - Subdomain subdomain = kv.getKey(); - return new Spec11ThreatMatch.Builder() - .setThreatTypes(ImmutableSet.of(ThreatType.valueOf(kv.getValue().threatType()))) - .setCheckDate(LocalDate.parse(dateProvider.get(), ISODateTimeFormat.date())) - .setDomainName(subdomain.domainName()) - .setDomainRepoId(subdomain.domainRepoId()) - .setRegistrarId(subdomain.registrarId()) - .build(); - }, - descriptor)); + static void saveToSql( + PCollection> threatMatches, Spec11PipelineOptions options) { + String transformId = "Spec11 Threat Matches"; + LocalDate date = LocalDate.parse(options.getDate(), ISODateTimeFormat.date()); + threatMatches.apply( + "Write to Sql: " + transformId, + RegistryJpaIO.>write() + .withName(transformId) + .withBatchSize(options.getSqlWriteBatchSize()) + .withShards(options.getSqlWriteShards()) + .withJpaConverter( + (kv) -> { + Subdomain subdomain = kv.getKey(); + return new Spec11ThreatMatch.Builder() + .setThreatTypes( + ImmutableSet.of(ThreatType.valueOf(kv.getValue().threatType()))) + .setCheckDate(date) + .setDomainName(subdomain.domainName()) + .setDomainRepoId(subdomain.domainRepoId()) + .setRegistrarId(subdomain.registrarId()) + .build(); + })); + } - /* Store ThreatMatch objects in JSON. */ - PCollection> subdomainsJson = - domains.apply("Run through SafeBrowsingAPI", ParDo.of(evaluateSafeBrowsingFn)); - subdomainsJson + static void saveToGcs( + PCollection> threatMatches, Spec11PipelineOptions options) { + threatMatches .apply( "Map registrar ID to email/ThreatMatch pair", MapElements.into( @@ -260,17 +198,54 @@ public class Spec11Pipeline implements Serializable { "Output to text file", TextIO.write() .to( - NestedValueProvider.of( - dateProvider, - date -> - String.format( - "%s/%s", - reportingBucketUrl, - getSpec11ReportFilePath(LocalDate.parse(date))))) + String.format( + "%s/%s", + options.getReportingBucketUrl(), + getSpec11ReportFilePath(LocalDate.parse(options.getDate())))) .withoutSharding() .withHeader("Map from registrar email / name to detected subdomain threats:")); } + public static void main(String[] args) { + PipelineOptionsFactory.register(Spec11PipelineOptions.class); + DaggerSpec11Pipeline_Spec11PipelineComponent.builder() + .spec11PipelineModule(new Spec11PipelineModule(args)) + .build() + .spec11Pipeline() + .run(); + } + + @Module + static class Spec11PipelineModule { + private final String[] args; + + Spec11PipelineModule(String[] args) { + this.args = args; + } + + @Provides + Spec11PipelineOptions provideOptions() { + return PipelineOptionsFactory.fromArgs(args).withValidation().as(Spec11PipelineOptions.class); + } + + @Provides + EvaluateSafeBrowsingFn provideSafeBrowsingFn(Spec11PipelineOptions options, Retrier retrier) { + return new EvaluateSafeBrowsingFn(options.getSafeBrowsingApiKey(), retrier); + } + + @Provides + Spec11Pipeline providePipeline( + Spec11PipelineOptions options, EvaluateSafeBrowsingFn safeBrowsingFn) { + return new Spec11Pipeline(options, safeBrowsingFn); + } + } + + @Component(modules = {Spec11PipelineModule.class, UtilsModule.class, ConfigModule.class}) + 
@Singleton + interface Spec11PipelineComponent { + Spec11Pipeline spec11Pipeline(); + } + @AutoValue abstract static class EmailAndThreatMatch implements Serializable { diff --git a/core/src/main/java/google/registry/beam/spec11/Spec11PipelineOptions.java b/core/src/main/java/google/registry/beam/spec11/Spec11PipelineOptions.java new file mode 100644 index 000000000..7e3ab546b --- /dev/null +++ b/core/src/main/java/google/registry/beam/spec11/Spec11PipelineOptions.java @@ -0,0 +1,37 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.spec11; + +import google.registry.beam.common.RegistryPipelineOptions; +import org.apache.beam.sdk.options.Description; + +/** Custom options for running the spec11 pipeline. */ +public interface Spec11PipelineOptions extends RegistryPipelineOptions { + + @Description("The local date we generate the report for, in yyyy-MM-dd format.") + String getDate(); + + void setDate(String value); + + @Description("The API key we use to access the SafeBrowsing API.") + String getSafeBrowsingApiKey(); + + void setSafeBrowsingApiKey(String value); + + @Description("The GCS bucket URL for Spec11 reports to be uploaded.") + String getReportingBucketUrl(); + + void setReportingBucketUrl(String value); +} diff --git a/core/src/main/java/google/registry/beam/spec11/ThreatMatch.java b/core/src/main/java/google/registry/beam/spec11/ThreatMatch.java index e88a3c3df..c82eb7eae 100644 --- a/core/src/main/java/google/registry/beam/spec11/ThreatMatch.java +++ b/core/src/main/java/google/registry/beam/spec11/ThreatMatch.java @@ -15,6 +15,7 @@ package google.registry.beam.spec11; import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; import java.io.Serializable; import org.json.JSONException; import org.json.JSONObject; @@ -31,16 +32,9 @@ public abstract class ThreatMatch implements Serializable { /** Returns the fully qualified domain name [SLD].[TLD] of the matched threat. */ public abstract String fullyQualifiedDomainName(); - /** - * Constructs a {@link ThreatMatch} by parsing a {@code SafeBrowsing API} response {@link - * JSONObject}. - * - * @throws JSONException when encountering parse errors in the response format - */ - static ThreatMatch create(JSONObject threatMatchJSON, String fullyQualifiedDomainName) - throws JSONException { - return new AutoValue_ThreatMatch( - threatMatchJSON.getString(THREAT_TYPE_FIELD), fullyQualifiedDomainName); + @VisibleForTesting + static ThreatMatch create(String threatType, String fullyQualifiedDomainName) { + return new AutoValue_ThreatMatch(threatType, fullyQualifiedDomainName); } /** Returns a {@link JSONObject} representing a subset of this object's data. 
*/ diff --git a/core/src/main/java/google/registry/reporting/spec11/GenerateSpec11ReportAction.java b/core/src/main/java/google/registry/reporting/spec11/GenerateSpec11ReportAction.java index 221590378..a6c601d25 100644 --- a/core/src/main/java/google/registry/reporting/spec11/GenerateSpec11ReportAction.java +++ b/core/src/main/java/google/registry/reporting/spec11/GenerateSpec11ReportAction.java @@ -14,6 +14,7 @@ package google.registry.reporting.spec11; +import static google.registry.beam.BeamUtils.createJobName; import static google.registry.reporting.ReportingModule.PARAM_DATE; import static google.registry.reporting.ReportingUtils.enqueueBeamReportingTask; import static google.registry.request.Action.Method.POST; @@ -21,19 +22,21 @@ import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; import static javax.servlet.http.HttpServletResponse.SC_OK; import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.model.LaunchTemplateParameters; -import com.google.api.services.dataflow.model.LaunchTemplateResponse; -import com.google.api.services.dataflow.model.RuntimeEnvironment; +import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; +import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; +import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; import com.google.common.collect.ImmutableMap; import com.google.common.flogger.FluentLogger; import com.google.common.net.MediaType; import google.registry.config.RegistryConfig.Config; +import google.registry.config.RegistryEnvironment; import google.registry.keyring.api.KeyModule.Key; import google.registry.reporting.ReportingModule; import google.registry.request.Action; import google.registry.request.Parameter; import google.registry.request.Response; import google.registry.request.auth.Auth; +import google.registry.util.Clock; import java.io.IOException; import java.util.Map; import javax.inject.Inject; @@ -55,55 +58,68 @@ public class GenerateSpec11ReportAction implements Runnable { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); static final String PATH = "/_dr/task/generateSpec11"; + static final String PIPELINE_NAME = "spec11_pipeline"; private final String projectId; - private final String beamBucketUrl; - private final String spec11TemplateUrl; - private final String jobZone; + private final String jobRegion; + private final String stagingBucketUrl; + private final String reportingBucketUrl; private final String apiKey; private final LocalDate date; + private final Clock clock; private final Response response; private final Dataflow dataflow; @Inject GenerateSpec11ReportAction( @Config("projectId") String projectId, - @Config("apacheBeamBucketUrl") String beamBucketUrl, - @Config("spec11TemplateUrl") String spec11TemplateUrl, - @Config("defaultJobZone") String jobZone, + @Config("defaultJobRegion") String jobRegion, + @Config("beamStagingBucketUrl") String stagingBucketUrl, + @Config("reportingBucketUrl") String reportingBucketUrl, @Key("safeBrowsingAPIKey") String apiKey, @Parameter(PARAM_DATE) LocalDate date, + Clock clock, Response response, Dataflow dataflow) { this.projectId = projectId; - this.beamBucketUrl = beamBucketUrl; - this.spec11TemplateUrl = spec11TemplateUrl; - this.jobZone = jobZone; + this.jobRegion = jobRegion; + this.stagingBucketUrl = stagingBucketUrl; + this.reportingBucketUrl = reportingBucketUrl; this.apiKey = apiKey; this.date = date; + this.clock = clock; this.response = 
response; this.dataflow = dataflow; } @Override public void run() { + response.setContentType(MediaType.PLAIN_TEXT_UTF_8); try { - LaunchTemplateParameters params = - new LaunchTemplateParameters() - .setJobName(String.format("spec11_%s", date)) - .setEnvironment( - new RuntimeEnvironment() - .setZone(jobZone) - .setTempLocation(beamBucketUrl + "/temporary")) + LaunchFlexTemplateParameter parameter = + new LaunchFlexTemplateParameter() + .setJobName(createJobName("spec11", clock)) + .setContainerSpecGcsPath( + String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME)) .setParameters( ImmutableMap.of( - "safeBrowsingApiKey", apiKey, ReportingModule.PARAM_DATE, date.toString())); - LaunchTemplateResponse launchResponse = + "safeBrowsingApiKey", + apiKey, + ReportingModule.PARAM_DATE, + date.toString(), + "reportingBucketUrl", + reportingBucketUrl, + "registryEnvironment", + RegistryEnvironment.get().name())); + LaunchFlexTemplateResponse launchResponse = dataflow .projects() - .templates() - .launch(projectId, params) - .setGcsPath(spec11TemplateUrl) + .locations() + .flexTemplates() + .launch( + projectId, + jobRegion, + new LaunchFlexTemplateRequest().setLaunchParameter(parameter)) .execute(); Map beamTaskParameters = ImmutableMap.of( @@ -116,12 +132,10 @@ public class GenerateSpec11ReportAction implements Runnable { } catch (IOException e) { logger.atWarning().withCause(e).log("Template Launch failed"); response.setStatus(SC_INTERNAL_SERVER_ERROR); - response.setContentType(MediaType.PLAIN_TEXT_UTF_8); response.setPayload(String.format("Template launch failed: %s", e.getMessage())); return; } response.setStatus(SC_OK); - response.setContentType(MediaType.PLAIN_TEXT_UTF_8); response.setPayload("Launched Spec11 dataflow template."); } } diff --git a/core/src/main/java/google/registry/reporting/spec11/PublishSpec11ReportAction.java b/core/src/main/java/google/registry/reporting/spec11/PublishSpec11ReportAction.java index f7cabe2e3..6cc9eba8c 100644 --- a/core/src/main/java/google/registry/reporting/spec11/PublishSpec11ReportAction.java +++ b/core/src/main/java/google/registry/reporting/spec11/PublishSpec11ReportAction.java @@ -69,6 +69,7 @@ public class PublishSpec11ReportAction implements Runnable { private static final String JOB_FAILED = "JOB_STATE_FAILED"; private final String projectId; + private final String jobRegion; private final String registryName; private final String jobId; private final Spec11EmailUtils emailUtils; @@ -80,6 +81,7 @@ public class PublishSpec11ReportAction implements Runnable { @Inject PublishSpec11ReportAction( @Config("projectId") String projectId, + @Config("defaultJobRegion") String jobRegion, @Config("registryName") String registryName, @Parameter(ReportingModule.PARAM_JOB_ID) String jobId, Spec11EmailUtils emailUtils, @@ -88,6 +90,7 @@ public class PublishSpec11ReportAction implements Runnable { Response response, @Parameter(PARAM_DATE) LocalDate date) { this.projectId = projectId; + this.jobRegion = jobRegion; this.registryName = registryName; this.jobId = jobId; this.emailUtils = emailUtils; @@ -101,7 +104,7 @@ public class PublishSpec11ReportAction implements Runnable { public void run() { try { logger.atInfo().log("Starting publish job."); - Job job = dataflow.projects().jobs().get(projectId, jobId).execute(); + Job job = dataflow.projects().locations().jobs().get(projectId, jobRegion, jobId).execute(); String state = job.getCurrentState(); switch (state) { case JOB_DONE: diff --git 
a/core/src/main/java/google/registry/tools/DeploySpec11PipelineCommand.java b/core/src/main/java/google/registry/tools/DeploySpec11PipelineCommand.java deleted file mode 100644 index d3fc27f8d..000000000 --- a/core/src/main/java/google/registry/tools/DeploySpec11PipelineCommand.java +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2018 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.tools; - -import com.beust.jcommander.Parameter; -import com.beust.jcommander.Parameters; -import google.registry.beam.initsql.BeamJpaModule.JpaTransactionManagerComponent; -import google.registry.beam.initsql.JpaSupplierFactory; -import google.registry.beam.spec11.Spec11Pipeline; -import google.registry.config.CredentialModule.LocalCredential; -import google.registry.config.RegistryConfig.Config; -import google.registry.util.GoogleCredentialsBundle; -import google.registry.util.Retrier; -import javax.annotation.Nullable; -import javax.inject.Inject; - -/** Nomulus command that deploys the {@link Spec11Pipeline} template. */ -@Parameters(commandDescription = "Deploy the Spec11 pipeline to GCS.") -public class DeploySpec11PipelineCommand implements Command { - - @Inject - @Config("projectId") - String projectId; - - @Inject - @Config("defaultJobRegion") - String beamJobRegion; - - @Parameter( - names = {"-p", "--project"}, - description = "Cloud KMS project ID", - required = true) - String cloudKmsProjectId; - - @Inject - @Config("beamStagingUrl") - String beamStagingUrl; - - @Inject - @Config("spec11TemplateUrl") - String spec11TemplateUrl; - - @Inject - @Config("reportingBucketUrl") - String reportingBucketUrl; - - @Inject @LocalCredential GoogleCredentialsBundle googleCredentialsBundle; - @Inject Retrier retrier; - - @Inject - @Nullable - @Config("sqlAccessInfoFile") - String sqlAccessInfoFile; - - @Override - public void run() { - JpaSupplierFactory jpaSupplierFactory = - new JpaSupplierFactory( - sqlAccessInfoFile, - cloudKmsProjectId, - JpaTransactionManagerComponent::cloudSqlJpaTransactionManager); - - Spec11Pipeline pipeline = - new Spec11Pipeline( - projectId, - beamJobRegion, - beamStagingUrl, - spec11TemplateUrl, - reportingBucketUrl, - jpaSupplierFactory, - googleCredentialsBundle, - retrier); - pipeline.deploy(); - } -} diff --git a/core/src/main/java/google/registry/tools/RegistryTool.java b/core/src/main/java/google/registry/tools/RegistryTool.java index 0e2b410d9..f25997a37 100644 --- a/core/src/main/java/google/registry/tools/RegistryTool.java +++ b/core/src/main/java/google/registry/tools/RegistryTool.java @@ -63,7 +63,6 @@ public final class RegistryTool { .put("delete_reserved_list", DeleteReservedListCommand.class) .put("delete_tld", DeleteTldCommand.class) .put("deploy_invoicing_pipeline", DeployInvoicingPipelineCommand.class) - .put("deploy_spec11_pipeline", DeploySpec11PipelineCommand.class) .put("encrypt_escrow_deposit", EncryptEscrowDepositCommand.class) .put("execute_epp", ExecuteEppCommand.class) 
.put("generate_allocation_tokens", GenerateAllocationTokensCommand.class) diff --git a/core/src/main/java/google/registry/tools/RegistryToolComponent.java b/core/src/main/java/google/registry/tools/RegistryToolComponent.java index bb895b3b7..cde6b06ce 100644 --- a/core/src/main/java/google/registry/tools/RegistryToolComponent.java +++ b/core/src/main/java/google/registry/tools/RegistryToolComponent.java @@ -109,8 +109,6 @@ interface RegistryToolComponent { void inject(DeployInvoicingPipelineCommand command); - void inject(DeploySpec11PipelineCommand command); - void inject(EncryptEscrowDepositCommand command); void inject(GenerateAllocationTokensCommand command); diff --git a/core/src/main/java/google/registry/beam/spec11/sql/subdomains.sql b/core/src/main/resources/google/registry/beam/spec11/sql/subdomains.sql similarity index 100% rename from core/src/main/java/google/registry/beam/spec11/sql/subdomains.sql rename to core/src/main/resources/google/registry/beam/spec11/sql/subdomains.sql diff --git a/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json new file mode 100644 index 000000000..f9e17b2c1 --- /dev/null +++ b/core/src/main/resources/google/registry/beam/spec11_pipeline_metadata.json @@ -0,0 +1,66 @@ +{ + "name": "Spec11 Report Generation", + "description": "An Apache Beam batch pipeline that reads from a Datastore export and generate Spec11 report for the month, saving it to both SQL and as a JSON file on GCS.", + "parameters": [ + { + "name": "registryEnvironment", + "label": "The Registry environment.", + "helpText": "The Registry environment, required if environment-specific initialization is needed on worker VMs.", + "is_optional": true, + "regexes": [ + "^[0-9A-Z_]+$" + ] + }, + { + "name": "isolationOverride", + "label": "The desired SQL transaction isolation level.", + "helpText": "The desired SQL transaction isolation level.", + "is_optional": true, + "regexes": [ + "^[0-9A-Z_]+$" + ] + }, + { + "name": "sqlWriteBatchSize", + "label": "SQL write batch size.", + "helpText": "The number of entities to write to the SQL database in one operation.", + "is_optional": true, + "regexes": [ + "^[1-9][0-9]*$" + ] + }, + { + "name": "sqlWriteShards", + "label": "Number of output shards to create when writing to SQL.", + "helpText": "Number of shards to create out of the data before writing to the SQL database. 
Please refer to the Javadoc of RegistryJpaIO.Write.shards() for how to choose this value.", + "is_optional": true, + "regexes": [ + "^[1-9][0-9]*$" + ] + }, + { + "name": "date", + "label": "The date when the pipeline runs", + "helpText": "The date then the threat scan is performed, in yyyy-MM-dd format.", + "regexes": [ + "^2[0-9]{3}-(0[1-9]|1[0-2])-(0[1-9]|[12][0-9]|3[01])$" + ] + }, + { + "name": "safeBrowsingApiKey", + "label": "API Key used to identify the project with Safe Browsing", + "helpText": "The earliest CommitLogs to load, in ISO8601 format.", + "regexes": [ + "^[0-9a-zA-Z_]+[\\n]?$" + ] + }, + { + "name": "reportingBucketUrl", + "label": "Spec11 report upload dir.", + "helpText": "The root directory of the report to upload.", + "regexes": [ + "^gs:\\/\\/[^\\n\\r]+$" + ] + } + ] +} diff --git a/core/src/test/java/google/registry/batch/WipeOutDatastoreActionTest.java b/core/src/test/java/google/registry/batch/WipeOutDatastoreActionTest.java index 3af391c2a..c3b4d9d42 100644 --- a/core/src/test/java/google/registry/batch/WipeOutDatastoreActionTest.java +++ b/core/src/test/java/google/registry/batch/WipeOutDatastoreActionTest.java @@ -31,6 +31,7 @@ import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; +import google.registry.testing.FakeClock; import google.registry.testing.FakeResponse; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -50,7 +51,8 @@ class WipeOutDatastoreActionTest { private LaunchFlexTemplateResponse launchResponse = new LaunchFlexTemplateResponse().setJob(new Job()); - private FakeResponse response = new FakeResponse(); + private final FakeClock clock = new FakeClock(); + private final FakeResponse response = new FakeResponse(); @BeforeEach void beforeEach() throws Exception { @@ -67,7 +69,7 @@ class WipeOutDatastoreActionTest { void run_projectNotAllowed() { WipeoutDatastoreAction action = new WipeoutDatastoreAction( - "domain-registry", "us-central1", "gs://some-bucket", response, dataflow); + "domain-registry", "us-central1", "gs://some-bucket", clock, response, dataflow); action.run(); assertThat(response.getStatus()).isEqualTo(SC_FORBIDDEN); verifyNoInteractions(dataflow); @@ -77,7 +79,7 @@ class WipeOutDatastoreActionTest { void run_projectAllowed() throws Exception { WipeoutDatastoreAction action = new WipeoutDatastoreAction( - "domain-registry-qa", "us-central1", "gs://some-bucket", response, dataflow); + "domain-registry-qa", "us-central1", "gs://some-bucket", clock, response, dataflow); action.run(); assertThat(response.getStatus()).isEqualTo(SC_OK); verify(launch, times(1)).execute(); @@ -89,7 +91,7 @@ class WipeOutDatastoreActionTest { when(launch.execute()).thenThrow(new RuntimeException()); WipeoutDatastoreAction action = new WipeoutDatastoreAction( - "domain-registry-qa", "us-central1", "gs://some-bucket", response, dataflow); + "domain-registry-qa", "us-central1", "gs://some-bucket", clock, response, dataflow); action.run(); assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR); verify(launch, times(1)).execute(); diff --git a/core/src/test/java/google/registry/beam/initsql/WriteToSqlTest.java b/core/src/test/java/google/registry/beam/initsql/WriteToSqlTest.java deleted file mode 100644 index 6a4adb203..000000000 --- a/core/src/test/java/google/registry/beam/initsql/WriteToSqlTest.java +++ /dev/null @@ -1,131 
+0,0 @@ -// Copyright 2020 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.beam.initsql; - -import static com.google.common.truth.Truth.assertThat; -import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence; -import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; - -import com.google.appengine.api.datastore.Entity; -import com.google.common.collect.ImmutableList; -import google.registry.backup.VersionedEntity; -import google.registry.beam.TestPipelineExtension; -import google.registry.model.ImmutableObject; -import google.registry.model.contact.ContactResource; -import google.registry.model.ofy.Ofy; -import google.registry.model.registrar.Registrar; -import google.registry.persistence.transaction.JpaTestRules; -import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension; -import google.registry.testing.AppEngineExtension; -import google.registry.testing.DatabaseHelper; -import google.registry.testing.DatastoreEntityExtension; -import google.registry.testing.FakeClock; -import google.registry.testing.InjectExtension; -import java.io.Serializable; -import java.nio.file.Path; -import java.util.stream.Collectors; -import org.apache.beam.sdk.transforms.Create; -import org.joda.time.DateTime; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Order; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.jupiter.api.io.TempDir; - -/** Unit test for {@link Transforms#writeToSql}. */ -class WriteToSqlTest implements Serializable { - - private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); - - private final FakeClock fakeClock = new FakeClock(START_TIME); - - @RegisterExtension - @Order(Order.DEFAULT - 1) - final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension(); - - @RegisterExtension final transient InjectExtension injectRule = new InjectExtension(); - - @RegisterExtension - final transient JpaIntegrationTestExtension database = - new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule(); - - @SuppressWarnings("WeakerAccess") - @TempDir - transient Path tmpDir; - - @RegisterExtension - final transient TestPipelineExtension testPipeline = - TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); - - // Must not be transient! - @RegisterExtension - @Order(Order.DEFAULT + 1) - final BeamJpaExtension beamJpaExtension = - new BeamJpaExtension(() -> tmpDir.resolve("credential.dat"), database.getDatabase()); - - private ImmutableList contacts; - - @BeforeEach - void beforeEach() throws Exception { - try (BackupTestStore store = new BackupTestStore(fakeClock)) { - injectRule.setStaticField(Ofy.class, "clock", fakeClock); - - // Required for contacts created below. 
- Registrar ofyRegistrar = AppEngineExtension.makeRegistrar2(); - store.insertOrUpdate(ofyRegistrar); - jpaTm().transact(() -> jpaTm().put(store.loadAsOfyEntity(ofyRegistrar))); - - ImmutableList.Builder builder = new ImmutableList.Builder<>(); - - for (int i = 0; i < 3; i++) { - ContactResource contact = DatabaseHelper.newContactResource("contact_" + i); - store.insertOrUpdate(contact); - builder.add(store.loadAsDatastoreEntity(contact)); - } - contacts = builder.build(); - } - } - - @Test - void writeToSql_twoWriters() { - testPipeline - .apply( - Create.of( - contacts.stream() - .map(InitSqlTestUtils::entityToBytes) - .map(bytes -> VersionedEntity.from(0L, bytes)) - .collect(Collectors.toList()))) - .apply( - Transforms.writeToSql( - "ContactResource", - 2, - 4, - () -> - DaggerBeamJpaModule_JpaTransactionManagerComponent.builder() - .beamJpaModule(beamJpaExtension.getBeamJpaModule()) - .build() - .localDbJpaTransactionManager())); - testPipeline.run().waitUntilFinish(); - - ImmutableList sqlContacts = jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class)); - assertThat(sqlContacts) - .comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp")) - .containsExactlyElementsIn( - contacts.stream() - .map(InitSqlTestUtils::datastoreToOfyEntity) - .map(ImmutableObject.class::cast) - .collect(ImmutableList.toImmutableList())); - } -} diff --git a/core/src/test/java/google/registry/beam/spec11/SafeBrowsingTransformsTest.java b/core/src/test/java/google/registry/beam/spec11/SafeBrowsingTransformsTest.java new file mode 100644 index 000000000..04a5dd81e --- /dev/null +++ b/core/src/test/java/google/registry/beam/spec11/SafeBrowsingTransformsTest.java @@ -0,0 +1,245 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package google.registry.beam.spec11; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.CharStreams; +import google.registry.beam.TestPipelineExtension; +import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn; +import google.registry.testing.FakeClock; +import google.registry.testing.FakeSleeper; +import google.registry.util.Retrier; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.http.ProtocolVersion; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.BasicHttpEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.message.BasicStatusLine; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.mockito.stubbing.Answer; + +/** Unit tests for {@link SafeBrowsingTransforms}. 
*/ +@MockitoSettings(strictness = Strictness.STRICT_STUBS) +class SafeBrowsingTransformsTest { + + private static final ImmutableMap THREAT_MAP = + ImmutableMap.of( + "111.com", + "MALWARE", + "party-night.net", + "SOCIAL_ENGINEERING", + "bitcoin.bank", + "POTENTIALLY_HARMFUL_APPLICATION", + "no-email.com", + "THREAT_TYPE_UNSPECIFIED", + "anti-anti-anti-virus.dev", + "UNWANTED_SOFTWARE"); + + private static final String REPO_ID = "repoId"; + private static final String REGISTRAR_ID = "registrarID"; + private static final String REGISTRAR_EMAIL = "email@registrar.net"; + + private static ImmutableMap THREAT_MATCH_MAP; + + private final CloseableHttpClient mockHttpClient = + mock(CloseableHttpClient.class, withSettings().serializable()); + + private final EvaluateSafeBrowsingFn safeBrowsingFn = + new EvaluateSafeBrowsingFn( + "API_KEY", + new Retrier(new FakeSleeper(new FakeClock()), 1), + Suppliers.ofInstance(mockHttpClient)); + + @RegisterExtension + final TestPipelineExtension pipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); + + private static Subdomain createSubdomain(String url) { + return Subdomain.create(url, REPO_ID, REGISTRAR_ID, REGISTRAR_EMAIL); + } + + private KV getKv(String url) { + Subdomain subdomain = createSubdomain(url); + return KV.of(subdomain, THREAT_MATCH_MAP.get(subdomain)); + } + + @BeforeAll + static void beforeAll() { + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); + THREAT_MAP + .entrySet() + .forEach( + kv -> + builder.put( + createSubdomain(kv.getKey()), ThreatMatch.create(kv.getValue(), kv.getKey()))); + THREAT_MATCH_MAP = builder.build(); + } + + @BeforeEach + void beforeEach() throws Exception { + when(mockHttpClient.execute(any(HttpPost.class))).thenAnswer(new HttpResponder()); + } + + @Test + void testSuccess_someBadDomains() throws Exception { + ImmutableList subdomains = + ImmutableList.of( + createSubdomain("111.com"), + createSubdomain("hooli.com"), + createSubdomain("party-night.net"), + createSubdomain("anti-anti-anti-virus.dev"), + createSubdomain("no-email.com")); + PCollection> threats = + pipeline + .apply(Create.of(subdomains).withCoder(SerializableCoder.of(Subdomain.class))) + .apply(ParDo.of(safeBrowsingFn)); + + PAssert.that(threats) + .containsInAnyOrder( + getKv("111.com"), + getKv("party-night.net"), + getKv("anti-anti-anti-virus.dev"), + getKv("no-email.com")); + pipeline.run().waitUntilFinish(); + } + + @Test + void testSuccess_noBadDomains() throws Exception { + ImmutableList subdomains = + ImmutableList.of( + createSubdomain("hello_kitty.dev"), + createSubdomain("555.com"), + createSubdomain("goodboy.net")); + PCollection> threats = + pipeline + .apply(Create.of(subdomains).withCoder(SerializableCoder.of(Subdomain.class))) + .apply(ParDo.of(safeBrowsingFn)); + + PAssert.that(threats).empty(); + pipeline.run().waitUntilFinish(); + } + + /** + * A serializable {@link Answer} that returns a mock HTTP response based on the HTTP request's + * content. + */ + private static class HttpResponder implements Answer, Serializable { + @Override + public CloseableHttpResponse answer(InvocationOnMock invocation) throws Throwable { + return getMockResponse( + CharStreams.toString( + new InputStreamReader( + ((HttpPost) invocation.getArguments()[0]).getEntity().getContent(), UTF_8))); + } + } + + /** + * Returns a {@link CloseableHttpResponse} containing either positive (threat found) or negative + * (no threat) API examples based on the request data. 
+ */ + private static CloseableHttpResponse getMockResponse(String request) throws JSONException { + // Determine which bad URLs are in the request (if any) + ImmutableList badUrls = + THREAT_MAP.keySet().stream() + .filter(request::contains) + .collect(ImmutableList.toImmutableList()); + + CloseableHttpResponse httpResponse = + mock(CloseableHttpResponse.class, withSettings().serializable()); + when(httpResponse.getStatusLine()) + .thenReturn(new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "Done")); + when(httpResponse.getEntity()).thenReturn(new FakeHttpEntity(getAPIResponse(badUrls))); + return httpResponse; + } + + /** + * Returns the expected API response for a list of bad URLs. + * + *
<p>
If there are no badUrls in the list, this returns the empty JSON string "{}". + */ + private static String getAPIResponse(ImmutableList badUrls) throws JSONException { + JSONObject response = new JSONObject(); + if (badUrls.isEmpty()) { + return response.toString(); + } + // Create a threatMatch for each badUrl + JSONArray matches = new JSONArray(); + for (String badUrl : badUrls) { + matches.put( + new JSONObject() + .put("threatType", THREAT_MAP.get(badUrl)) + .put("threat", new JSONObject().put("url", badUrl))); + } + response.put("matches", matches); + return response.toString(); + } + + /** A serializable HttpEntity fake that returns {@link String} content. */ + private static class FakeHttpEntity extends BasicHttpEntity implements Serializable { + + private static final long serialVersionUID = 105738294571L; + + private String content; + + private void writeObject(ObjectOutputStream oos) throws IOException { + oos.defaultWriteObject(); + } + + /** + * Sets the {@link FakeHttpEntity} content upon deserialization. + * + *
<p>
This allows us to use {@link #getContent()} as-is, fully emulating the behavior of {@link + * BasicHttpEntity} regardless of serialization. + */ + private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { + ois.defaultReadObject(); + super.setContent(new ByteArrayInputStream(this.content.getBytes(UTF_8))); + } + + FakeHttpEntity(String content) { + this.content = content; + super.setContent(new ByteArrayInputStream(this.content.getBytes(UTF_8))); + } + } +} diff --git a/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java b/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java index 20db45935..d30416519 100644 --- a/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java +++ b/core/src/test/java/google/registry/beam/spec11/Spec11PipelineTest.java @@ -14,361 +14,183 @@ package google.registry.beam.spec11; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.truth.Truth.assertThat; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; +import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; -import com.google.auth.oauth2.GoogleCredentials; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.io.CharStreams; +import com.google.common.collect.Streams; +import com.google.common.truth.Correspondence; +import com.google.common.truth.Correspondence.BinaryPredicate; import google.registry.beam.TestPipelineExtension; -import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn; import google.registry.model.reporting.Spec11ThreatMatch; import google.registry.model.reporting.Spec11ThreatMatch.ThreatType; -import google.registry.persistence.transaction.JpaTransactionManager; +import google.registry.model.reporting.Spec11ThreatMatchDao; +import google.registry.persistence.transaction.JpaTestRules; +import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension; +import google.registry.testing.DatastoreEntityExtension; import google.registry.testing.FakeClock; -import google.registry.testing.FakeSleeper; -import google.registry.util.GoogleCredentialsBundle; import google.registry.util.ResourceUtils; -import google.registry.util.Retrier; -import java.io.ByteArrayInputStream; import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Comparator; -import java.util.function.Supplier; -import org.apache.beam.runners.direct.DirectRunner; -import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.http.ProtocolVersion; 
-import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.BasicHttpEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.message.BasicStatusLine; -import org.joda.time.DateTime; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.LocalDate; -import org.joda.time.format.ISODateTimeFormat; -import org.json.JSONArray; -import org.json.JSONException; import org.json.JSONObject; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; -import org.mockito.Mock; -import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.mockito.stubbing.Answer; -/** Unit tests for {@link Spec11Pipeline}. */ +/** + * Unit tests for {@link Spec11Pipeline}. + * + *
<p>
Unfortunately there is no emulator for BigQuery like that for Datastore or App Engine.
+ * Therefore we cannot fully test the pipeline, but only test the two separate sink IO functions,
+ * assuming that data is sourced correctly from {@code BigQueryIO}.
+ */
 @ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.LENIENT)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
 class Spec11PipelineTest {
-  private static class SaveNewThreatMatchAnswer implements Answer<Void>, Serializable {
-    @Override
-    public Void answer(InvocationOnMock invocation) {
-      Runnable runnable = invocation.getArgument(0, Runnable.class);
-      runnable.run();
-      return null;
-    }
-  }
+  private static final String DATE = "2020-01-27";
+  private static final String SAFE_BROWSING_API_KEY = "api-key";
+  private static final String REPORTING_BUCKET_URL = "reporting_bucket";
-  private static PipelineOptions pipelineOptions;
+  private static final ImmutableList<Subdomain> SUBDOMAINS =
+      ImmutableList.of(
+          Subdomain.create("111.com", "123456789-COM", "hello-registrar", "email@hello.net"),
+          Subdomain.create("party-night.net", "2244AABBC-NET", "kitty-registrar", "contact@kit.ty"),
+          Subdomain.create("bitcoin.bank", "1C3D5E7F9-BANK", "hello-registrar", "email@hello.net"),
+          Subdomain.create("no-email.com", "2A4BA9BBC-COM", "kitty-registrar", "contact@kit.ty"),
+          Subdomain.create(
+              "anti-anti-anti-virus.dev", "555666888-DEV", "cool-registrar", "cool@aid.net"));
-  @Mock(serializable = true)
-  private static JpaTransactionManager mockJpaTm;
+  private static final ImmutableList<ThreatMatch> THREAT_MATCHES =
+      ImmutableList.of(
+          ThreatMatch.create("MALWARE", "111.com"),
+          ThreatMatch.create("SOCIAL_ENGINEERING", "party-night.net"),
+          ThreatMatch.create("POTENTIALLY_HARMFUL_APPLICATION", "bitcoin.bank"),
+          ThreatMatch.create("THREAT_TYPE_UNSPECIFIED", "no-eamil.com"),
+          ThreatMatch.create("UNWANTED_SOFTWARE", "anti-anti-anti-virus.dev"));
-  @BeforeAll
-  static void beforeAll() {
-    pipelineOptions = PipelineOptionsFactory.create();
-    pipelineOptions.setRunner(DirectRunner.class);
-  }
+  // This extension is only needed because Spec11ThreatMatch uses Ofy to generate the ID. Can be
+  // removed after the SQL migration.
+ @RegisterExtension + @Order(Order.DEFAULT - 1) + final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension(); + + @TempDir Path tmpDir; @RegisterExtension - final transient TestPipelineExtension testPipeline = - TestPipelineExtension.fromOptions(pipelineOptions); + final TestPipelineExtension pipeline = + TestPipelineExtension.create().enableAbandonedNodeEnforcement(true); - @SuppressWarnings("WeakerAccess") - @TempDir - Path tmpDir; + @RegisterExtension + final JpaIntegrationTestExtension database = + new JpaTestRules.Builder().withClock(new FakeClock()).buildIntegrationTestRule(); - private final Retrier retrier = - new Retrier(new FakeSleeper(new FakeClock(DateTime.parse("2019-07-15TZ"))), 1); - private Spec11Pipeline spec11Pipeline; + private final Spec11PipelineOptions options = + PipelineOptionsFactory.create().as(Spec11PipelineOptions.class); + + private File reportingBucketUrl; + private PCollection> threatMatches; @BeforeEach - void beforeEach() throws IOException { - String beamTempFolder = - Files.createDirectory(tmpDir.resolve("beam_temp")).toAbsolutePath().toString(); - - spec11Pipeline = - new Spec11Pipeline( - "test-project", - "region", - beamTempFolder + "/staging", - beamTempFolder + "/templates/invoicing", - tmpDir.toAbsolutePath().toString(), - () -> mockJpaTm, - GoogleCredentialsBundle.create(GoogleCredentials.create(null)), - retrier); - } - - private static final ImmutableList BAD_DOMAINS = - ImmutableList.of( - "111.com", "222.com", "444.com", "no-email.com", "testThreatMatchToSqlBad.com"); - - private ImmutableList getInputDomainsJson() { - ImmutableList.Builder subdomainsBuilder = new ImmutableList.Builder<>(); - // Put in at least 2 batches worth (x > 490) to guarantee multiple executions. - // Put in half for theRegistrar and half for someRegistrar - for (int i = 0; i < 255; i++) { - subdomainsBuilder.add( - Subdomain.create( - String.format("%s.com", i), "theDomain", "theRegistrar", "fake@theRegistrar.com")); - } - for (int i = 255; i < 510; i++) { - subdomainsBuilder.add( - Subdomain.create( - String.format("%s.com", i), "someDomain", "someRegistrar", "fake@someRegistrar.com")); - } - subdomainsBuilder.add(Subdomain.create("no-email.com", "fakeDomain", "noEmailRegistrar", "")); - return subdomainsBuilder.build(); - } - - /** - * Tests the end-to-end Spec11 pipeline with mocked out API calls. - * - *
<p>
We suppress the (Serializable & Supplier) dual-casted lambda warnings because the supplier - * produces an explicitly serializable mock, which is safe to cast. - */ - @Test - @SuppressWarnings("unchecked") - void testEndToEndPipeline_generatesExpectedFiles() throws Exception { - // Establish mocks for testing - ImmutableList inputRows = getInputDomainsJson(); - CloseableHttpClient mockHttpClient = - mock(CloseableHttpClient.class, withSettings().serializable()); - - // Return a mock HttpResponse that returns a JSON response based on the request. - when(mockHttpClient.execute(any(HttpPost.class))).thenAnswer(new HttpResponder()); - - EvaluateSafeBrowsingFn evalFn = - new EvaluateSafeBrowsingFn( - StaticValueProvider.of("apikey"), - new Retrier(new FakeSleeper(new FakeClock()), 3), - (Serializable & Supplier) () -> mockHttpClient); - - // Apply input and evaluation transforms - PCollection input = testPipeline.apply(Create.of(inputRows)); - spec11Pipeline.evaluateUrlHealth(input, evalFn, StaticValueProvider.of("2018-06-01")); - testPipeline.run(); - - // Verify header and 4 threat matches for 3 registrars are found - ImmutableList generatedReport = resultFileContents(); - assertThat(generatedReport).hasSize(4); - assertThat(generatedReport.get(0)) - .isEqualTo("Map from registrar email / name to detected subdomain threats:"); - - // The output file can put the registrar emails and bad URLs in any order. - // We cannot rely on the JSON toString to sort because the keys are not always in the same - // order, so we must rely on length even though that's not ideal. - ImmutableList sortedLines = - ImmutableList.sortedCopyOf( - Comparator.comparingInt(String::length), generatedReport.subList(1, 4)); - - JSONObject noEmailRegistrarJSON = new JSONObject(sortedLines.get(0)); - assertThat(noEmailRegistrarJSON.get("registrarEmailAddress")).isEqualTo(""); - assertThat(noEmailRegistrarJSON.get("registrarClientId")).isEqualTo("noEmailRegistrar"); - assertThat(noEmailRegistrarJSON.has("threatMatches")).isTrue(); - JSONArray noEmailThreatMatch = noEmailRegistrarJSON.getJSONArray("threatMatches"); - assertThat(noEmailThreatMatch.length()).isEqualTo(1); - assertThat(noEmailThreatMatch.getJSONObject(0).get("fullyQualifiedDomainName")) - .isEqualTo("no-email.com"); - assertThat(noEmailThreatMatch.getJSONObject(0).get("threatType")).isEqualTo("MALWARE"); - - JSONObject someRegistrarJSON = new JSONObject(sortedLines.get(1)); - assertThat(someRegistrarJSON.get("registrarEmailAddress")).isEqualTo("fake@someRegistrar.com"); - assertThat(someRegistrarJSON.get("registrarClientId")).isEqualTo("someRegistrar"); - assertThat(someRegistrarJSON.has("threatMatches")).isTrue(); - JSONArray someThreatMatch = someRegistrarJSON.getJSONArray("threatMatches"); - assertThat(someThreatMatch.length()).isEqualTo(1); - assertThat(someThreatMatch.getJSONObject(0).get("fullyQualifiedDomainName")) - .isEqualTo("444.com"); - assertThat(someThreatMatch.getJSONObject(0).get("threatType")).isEqualTo("MALWARE"); - - // theRegistrar has two ThreatMatches, we have to parse it explicitly - JSONObject theRegistrarJSON = new JSONObject(sortedLines.get(2)); - assertThat(theRegistrarJSON.get("registrarEmailAddress")).isEqualTo("fake@theRegistrar.com"); - assertThat(theRegistrarJSON.get("registrarClientId")).isEqualTo("theRegistrar"); - assertThat(theRegistrarJSON.has("threatMatches")).isTrue(); - JSONArray theThreatMatches = theRegistrarJSON.getJSONArray("threatMatches"); - assertThat(theThreatMatches.length()).isEqualTo(2); - ImmutableList 
threatMatchStrings = - ImmutableList.of( - theThreatMatches.getJSONObject(0).toString(), - theThreatMatches.getJSONObject(1).toString()); - assertThat(threatMatchStrings) - .containsExactly( - new JSONObject() - .put("fullyQualifiedDomainName", "111.com") - .put("threatType", "MALWARE") - .toString(), - new JSONObject() - .put("fullyQualifiedDomainName", "222.com") - .put("threatType", "MALWARE") - .toString()); + void beforeEach() throws Exception { + reportingBucketUrl = Files.createDirectory(tmpDir.resolve(REPORTING_BUCKET_URL)).toFile(); + options.setDate(DATE); + options.setSafeBrowsingApiKey(SAFE_BROWSING_API_KEY); + options.setReportingBucketUrl(reportingBucketUrl.getAbsolutePath()); + threatMatches = + pipeline.apply( + Create.of( + Streams.zip(SUBDOMAINS.stream(), THREAT_MATCHES.stream(), KV::of) + .collect(toImmutableList())) + .withCoder( + KvCoder.of( + SerializableCoder.of(Subdomain.class), + SerializableCoder.of(ThreatMatch.class)))); } @Test - @SuppressWarnings("unchecked") - public void testSpec11ThreatMatchToSql() throws Exception { - doAnswer(new SaveNewThreatMatchAnswer()).when(mockJpaTm).transact(any(Runnable.class)); - - // Create one bad and one good Subdomain to test with evaluateUrlHealth. Only the bad one should - // be detected and persisted. - Subdomain badDomain = - Subdomain.create( - "testThreatMatchToSqlBad.com", "theDomain", "theRegistrar", "fake@theRegistrar.com"); - Subdomain goodDomain = - Subdomain.create( - "testThreatMatchToSqlGood.com", - "someDomain", - "someRegistrar", - "fake@someRegistrar.com"); - - // Establish a mock HttpResponse that returns a JSON response based on the request. - CloseableHttpClient mockHttpClient = - mock(CloseableHttpClient.class, withSettings().serializable()); - when(mockHttpClient.execute(any(HttpPost.class))).thenAnswer(new HttpResponder()); - - EvaluateSafeBrowsingFn evalFn = - new EvaluateSafeBrowsingFn( - StaticValueProvider.of("apikey"), - new Retrier(new FakeSleeper(new FakeClock()), 3), - (Serializable & Supplier) () -> mockHttpClient); - - // Apply input and evaluation transforms - PCollection input = testPipeline.apply(Create.of(badDomain, goodDomain)); - spec11Pipeline.evaluateUrlHealth(input, evalFn, StaticValueProvider.of("2020-06-10")); - testPipeline.run(); - - // Verify that the expected threat created from the bad Subdomain and the persisted - // Spec11TThreatMatch are equal. 
- Spec11ThreatMatch expected = - new Spec11ThreatMatch() - .asBuilder() - .setThreatTypes(ImmutableSet.of(ThreatType.MALWARE)) - .setCheckDate(LocalDate.parse("2020-06-10", ISODateTimeFormat.date())) - .setDomainName(badDomain.domainName()) - .setDomainRepoId(badDomain.domainRepoId()) - .setRegistrarId(badDomain.registrarId()) - .build(); - - verify(mockJpaTm).transact(any(Runnable.class)); - verify(mockJpaTm).putAll(ImmutableList.of(expected)); + void testSuccess_saveToSql() { + ImmutableSet sqlThreatMatches = + ImmutableSet.of( + new Spec11ThreatMatch.Builder() + .setDomainName("111.com") + .setDomainRepoId("123456789-COM") + .setRegistrarId("hello-registrar") + .setCheckDate(new LocalDate(2020, 1, 27)) + .setThreatTypes(ImmutableSet.of(ThreatType.MALWARE)) + .build(), + new Spec11ThreatMatch.Builder() + .setDomainName("party-night.net") + .setDomainRepoId("2244AABBC-NET") + .setRegistrarId("kitty-registrar") + .setCheckDate(new LocalDate(2020, 1, 27)) + .setThreatTypes(ImmutableSet.of(ThreatType.SOCIAL_ENGINEERING)) + .build(), + new Spec11ThreatMatch.Builder() + .setDomainName("bitcoin.bank") + .setDomainRepoId("1C3D5E7F9-BANK") + .setRegistrarId("hello-registrar") + .setCheckDate(new LocalDate(2020, 1, 27)) + .setThreatTypes(ImmutableSet.of(ThreatType.POTENTIALLY_HARMFUL_APPLICATION)) + .build(), + new Spec11ThreatMatch.Builder() + .setDomainName("no-email.com") + .setDomainRepoId("2A4BA9BBC-COM") + .setRegistrarId("kitty-registrar") + .setCheckDate(new LocalDate(2020, 1, 27)) + .setThreatTypes(ImmutableSet.of(ThreatType.THREAT_TYPE_UNSPECIFIED)) + .build(), + new Spec11ThreatMatch.Builder() + .setDomainName("anti-anti-anti-virus.dev") + .setDomainRepoId("555666888-DEV") + .setRegistrarId("cool-registrar") + .setCheckDate(new LocalDate(2020, 1, 27)) + .setThreatTypes(ImmutableSet.of(ThreatType.UNWANTED_SOFTWARE)) + .build()); + Spec11Pipeline.saveToSql(threatMatches, options); + pipeline.run().waitUntilFinish(); + assertThat( + jpaTm() + .transact( + () -> + Spec11ThreatMatchDao.loadEntriesByDate( + jpaTm(), new LocalDate(2020, 1, 27)))) + .comparingElementsUsing(immutableObjectCorrespondence("id")) + .containsExactlyElementsIn(sqlThreatMatches); } - /** - * A serializable {@link Answer} that returns a mock HTTP response based on the HTTP request's - * content. - */ - private static class HttpResponder implements Answer, Serializable { - @Override - public CloseableHttpResponse answer(InvocationOnMock invocation) throws Throwable { - return getMockResponse( - CharStreams.toString( - new InputStreamReader( - ((HttpPost) invocation.getArguments()[0]).getEntity().getContent(), UTF_8))); - } - } - - /** - * Returns a {@link CloseableHttpResponse} containing either positive (threat found) or negative - * (no threat) API examples based on the request data. - */ - private static CloseableHttpResponse getMockResponse(String request) throws JSONException { - // Determine which bad URLs are in the request (if any) - ImmutableList badUrls = - BAD_DOMAINS.stream().filter(request::contains).collect(ImmutableList.toImmutableList()); - - CloseableHttpResponse httpResponse = - mock(CloseableHttpResponse.class, withSettings().serializable()); - when(httpResponse.getStatusLine()) - .thenReturn(new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "Done")); - when(httpResponse.getEntity()).thenReturn(new FakeHttpEntity(getAPIResponse(badUrls))); - return httpResponse; - } - - /** - * Returns the expected API response for a list of bad URLs. - * - *
<p>
If there are no badUrls in the list, this returns the empty JSON string "{}". - */ - private static String getAPIResponse(ImmutableList badUrls) throws JSONException { - JSONObject response = new JSONObject(); - if (badUrls.isEmpty()) { - return response.toString(); - } - // Create a threatMatch for each badUrl - JSONArray matches = new JSONArray(); - for (String badUrl : badUrls) { - matches.put( - new JSONObject() - .put("threatType", "MALWARE") - .put("platformType", "WINDOWS") - .put("threatEntryType", "URL") - .put("threat", new JSONObject().put("url", badUrl)) - .put("cacheDuration", "300.000s")); - } - response.put("matches", matches); - return response.toString(); - } - - /** A serializable HttpEntity fake that returns {@link String} content. */ - private static class FakeHttpEntity extends BasicHttpEntity implements Serializable { - - private static final long serialVersionUID = 105738294571L; - - private String content; - - private void writeObject(ObjectOutputStream oos) throws IOException { - oos.defaultWriteObject(); - } - - /** - * Sets the {@link FakeHttpEntity} content upon deserialization. - * - *
<p>
This allows us to use {@link #getContent()} as-is, fully emulating the behavior of {@link - * BasicHttpEntity} regardless of serialization. - */ - private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException { - ois.defaultReadObject(); - super.setContent(new ByteArrayInputStream(this.content.getBytes(UTF_8))); - } - - FakeHttpEntity(String content) { - this.content = content; - super.setContent(new ByteArrayInputStream(this.content.getBytes(UTF_8))); - } + @Test + void testSuccess_saveToGcs() throws Exception { + ImmutableList expectedFileContents = + ImmutableList.copyOf( + ResourceUtils.readResourceUtf8(this.getClass(), "test_output.txt").split("\n")); + Spec11Pipeline.saveToGcs(threatMatches, options); + pipeline.run().waitUntilFinish(); + ImmutableList resultFileContents = resultFileContents(); + assertThat(resultFileContents.size()).isEqualTo(expectedFileContents.size()); + assertThat(resultFileContents.get(0)).isEqualTo(expectedFileContents.get(0)); + assertThat(resultFileContents.subList(1, resultFileContents.size())) + .comparingElementsUsing( + Correspondence.from( + new ThreatMatchJsonPredicate(), "has fields with unordered threatTypes equal to")) + .containsExactlyElementsIn(expectedFileContents.subList(1, expectedFileContents.size())); } /** Returns the text contents of a file under the beamBucket/results directory. */ @@ -376,9 +198,34 @@ class Spec11PipelineTest { File resultFile = new File( String.format( - "%s/icann/spec11/2018-06/SPEC11_MONTHLY_REPORT_2018-06-01", - tmpDir.toAbsolutePath().toString())); + "%s/icann/spec11/2020-01/SPEC11_MONTHLY_REPORT_2020-01-27", + reportingBucketUrl.getAbsolutePath().toString())); return ImmutableList.copyOf( ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n")); } + + private static class ThreatMatchJsonPredicate implements BinaryPredicate { + private static final String THREAT_MATCHES = "threatMatches"; + + @Override + public boolean apply(@Nullable String actual, @Nullable String expected) { + JSONObject actualJson = new JSONObject(actual); + JSONObject expectedJson = new JSONObject(expected); + if (!actualJson.keySet().equals(expectedJson.keySet())) { + return false; + } + for (String key : actualJson.keySet()) { + if (key.equals(THREAT_MATCHES)) { + if (ImmutableSet.copyOf(actualJson.getJSONArray(key)) + .equals(ImmutableSet.copyOf(expectedJson.getJSONArray(key)))) { + return false; + } + } else if (!actualJson.get(key).equals(expectedJson.get(key))) { + return false; + } + } + ; + return true; + } + } } diff --git a/core/src/test/java/google/registry/reporting/spec11/GenerateSpec11ReportActionTest.java b/core/src/test/java/google/registry/reporting/spec11/GenerateSpec11ReportActionTest.java index d2ff9226c..06760fbac 100644 --- a/core/src/test/java/google/registry/reporting/spec11/GenerateSpec11ReportActionTest.java +++ b/core/src/test/java/google/registry/reporting/spec11/GenerateSpec11ReportActionTest.java @@ -15,95 +15,102 @@ package google.registry.reporting.spec11; import static com.google.common.truth.Truth.assertThat; +import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued; import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; +import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; +import static org.apache.http.HttpStatus.SC_OK; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; 
import static org.mockito.Mockito.when; import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.Dataflow.Projects; -import com.google.api.services.dataflow.Dataflow.Projects.Templates; -import com.google.api.services.dataflow.Dataflow.Projects.Templates.Launch; +import com.google.api.services.dataflow.Dataflow.Projects.Locations; +import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates; +import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates.Launch; import com.google.api.services.dataflow.model.Job; -import com.google.api.services.dataflow.model.LaunchTemplateParameters; -import com.google.api.services.dataflow.model.LaunchTemplateResponse; -import com.google.api.services.dataflow.model.RuntimeEnvironment; -import com.google.common.collect.ImmutableMap; +import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; +import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; import com.google.common.net.MediaType; import google.registry.testing.AppEngineExtension; +import google.registry.testing.FakeClock; import google.registry.testing.FakeResponse; import google.registry.testing.TaskQueueHelper.TaskMatcher; import java.io.IOException; -import org.joda.time.LocalDate; +import org.joda.time.DateTime; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; -/** Unit tests for {@link google.registry.reporting.spec11.GenerateSpec11ReportAction}. */ +/** Unit tests for {@link GenerateSpec11ReportAction}. */ +@MockitoSettings(strictness = Strictness.STRICT_STUBS) class GenerateSpec11ReportActionTest { @RegisterExtension final AppEngineExtension appEngine = AppEngineExtension.builder().withTaskQueue().build(); - private FakeResponse response; - private Dataflow dataflow; - private Projects dataflowProjects; - private Templates dataflowTemplates; - private Launch dataflowLaunch; + private FakeResponse response = new FakeResponse(); + private Dataflow dataflow = mock(Dataflow.class); + private Projects projects = mock(Projects.class); + private Locations locations = mock(Locations.class); + private FlexTemplates templates = mock(FlexTemplates.class); + private Launch launch = mock(Launch.class); + private LaunchFlexTemplateResponse launchResponse = + new LaunchFlexTemplateResponse().setJob(new Job().setId("jobid")); + private final FakeClock clock = new FakeClock(DateTime.parse("2018-06-11T12:23:56Z")); private GenerateSpec11ReportAction action; @BeforeEach void beforeEach() throws IOException { - response = new FakeResponse(); - dataflow = mock(Dataflow.class); - - // Establish the Dataflow API call chain - dataflow = mock(Dataflow.class); - dataflowProjects = mock(Dataflow.Projects.class); - dataflowTemplates = mock(Templates.class); - dataflowLaunch = mock(Launch.class); - LaunchTemplateResponse launchTemplateResponse = new LaunchTemplateResponse(); - // Ultimately we get back this job response with a given id. 
- launchTemplateResponse.setJob(new Job().setId("jobid")); - when(dataflow.projects()).thenReturn(dataflowProjects); - when(dataflowProjects.templates()).thenReturn(dataflowTemplates); - when(dataflowTemplates.launch(any(String.class), any(LaunchTemplateParameters.class))) - .thenReturn(dataflowLaunch); - when(dataflowLaunch.setGcsPath(any(String.class))).thenReturn(dataflowLaunch); - when(dataflowLaunch.execute()).thenReturn(launchTemplateResponse); + when(dataflow.projects()).thenReturn(projects); + when(projects.locations()).thenReturn(locations); + when(locations.flexTemplates()).thenReturn(templates); + when(templates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class))) + .thenReturn(launch); + when(launch.execute()).thenReturn(launchResponse); } @Test - void testLaunch_success() throws IOException { + void testFailure_dataflowFailure() throws IOException { action = new GenerateSpec11ReportAction( - "test", - "gs://my-bucket-beam", - "gs://template", + "test-project", "us-east1-c", + "gs://staging-project/staging-bucket/", + "gs://reporting-project/reporting-bucket/", "api_key/a", - new LocalDate(2018, 6, 11), + clock.nowUtc().toLocalDate(), + clock, + response, + dataflow); + when(launch.execute()).thenThrow(new IOException("Dataflow failure")); + action.run(); + assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR); + assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); + assertThat(response.getPayload()).contains("Dataflow failure"); + assertNoTasksEnqueued("beam-reporting"); + } + + @Test + void testSuccess() throws IOException { + action = + new GenerateSpec11ReportAction( + "test-project", + "us-east1-c", + "gs://staging-project/staging-bucket/", + "gs://reporting-project/reporting-bucket/", + "api_key/a", + clock.nowUtc().toLocalDate(), + clock, response, dataflow); action.run(); - - LaunchTemplateParameters expectedLaunchTemplateParameters = - new LaunchTemplateParameters() - .setJobName("spec11_2018-06-11") - .setEnvironment( - new RuntimeEnvironment() - .setZone("us-east1-c") - .setTempLocation("gs://my-bucket-beam/temporary")) - .setParameters( - ImmutableMap.of("safeBrowsingApiKey", "api_key/a", "date", "2018-06-11")); - verify(dataflowTemplates).launch("test", expectedLaunchTemplateParameters); - verify(dataflowLaunch).setGcsPath("gs://template"); - assertThat(response.getStatus()).isEqualTo(200); + assertThat(response.getStatus()).isEqualTo(SC_OK); assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); - assertThat(response.getPayload()).isEqualTo("Launched Spec11 dataflow template."); - TaskMatcher matcher = new TaskMatcher() .url("/_dr/task/publishSpec11") diff --git a/core/src/test/java/google/registry/reporting/spec11/PublishSpec11ReportActionTest.java b/core/src/test/java/google/registry/reporting/spec11/PublishSpec11ReportActionTest.java index aba11bdf7..5db85d41b 100644 --- a/core/src/test/java/google/registry/reporting/spec11/PublishSpec11ReportActionTest.java +++ b/core/src/test/java/google/registry/reporting/spec11/PublishSpec11ReportActionTest.java @@ -29,8 +29,9 @@ import static org.mockito.Mockito.when; import com.google.api.services.dataflow.Dataflow; import com.google.api.services.dataflow.Dataflow.Projects; -import com.google.api.services.dataflow.Dataflow.Projects.Jobs; -import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get; +import com.google.api.services.dataflow.Dataflow.Projects.Locations; +import 
com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs; +import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Get; import com.google.api.services.dataflow.model.Job; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -51,6 +52,7 @@ class PublishSpec11ReportActionTest { private Dataflow dataflow; private Projects projects; + private Locations locations; private Jobs jobs; private Get get; private Spec11EmailUtils emailUtils; @@ -64,11 +66,13 @@ class PublishSpec11ReportActionTest { void beforeEach() throws Exception { dataflow = mock(Dataflow.class); projects = mock(Projects.class); + locations = mock(Locations.class); jobs = mock(Jobs.class); get = mock(Get.class); when(dataflow.projects()).thenReturn(projects); - when(projects.jobs()).thenReturn(jobs); - when(jobs.get("test-project", "12345")).thenReturn(get); + when(projects.locations()).thenReturn(locations); + when(locations.jobs()).thenReturn(jobs); + when(jobs.get("test-project", "test-region", "12345")).thenReturn(get); expectedJob = new Job(); when(get.execute()).thenReturn(expectedJob); emailUtils = mock(Spec11EmailUtils.class); @@ -78,6 +82,7 @@ class PublishSpec11ReportActionTest { publishAction = new PublishSpec11ReportAction( "test-project", + "test-region", "Super Cool Registry", "12345", emailUtils, @@ -95,6 +100,7 @@ class PublishSpec11ReportActionTest { publishAction = new PublishSpec11ReportAction( "test-project", + "test-region", "Super Cool Registry", "12345", emailUtils, diff --git a/core/src/test/resources/google/registry/beam/spec11/test_output.txt b/core/src/test/resources/google/registry/beam/spec11/test_output.txt new file mode 100644 index 000000000..f572d694b --- /dev/null +++ b/core/src/test/resources/google/registry/beam/spec11/test_output.txt @@ -0,0 +1,4 @@ +Map from registrar email / name to detected subdomain threats: +{"threatMatches":[{"threatType":"UNWANTED_SOFTWARE","fullyQualifiedDomainName":"anti-anti-anti-virus.dev"}],"registrarClientId":"cool-registrar","registrarEmailAddress":"cool@aid.net"} +{"threatMatches":[{"threatType":"MALWARE","fullyQualifiedDomainName":"111.com"},{"threatType":"POTENTIALLY_HARMFUL_APPLICATION","fullyQualifiedDomainName":"bitcoin.bank"}],"registrarClientId":"hello-registrar","registrarEmailAddress":"email@hello.net"} +{"threatMatches":[{"threatType":"THREAT_TYPE_UNSPECIFIED","fullyQualifiedDomainName":"no-eamil.com"},{"threatType":"SOCIAL_ENGINEERING","fullyQualifiedDomainName":"party-night.net"}],"registrarClientId":"kitty-registrar","registrarEmailAddress":"contact@kit.ty"} \ No newline at end of file diff --git a/release/cloudbuild-deploy.yaml b/release/cloudbuild-deploy.yaml index cc93f89d6..e5cc57634 100644 --- a/release/cloudbuild-deploy.yaml +++ b/release/cloudbuild-deploy.yaml @@ -37,24 +37,6 @@ steps: cat tool-credential.json.enc | base64 -d | gcloud kms decrypt \ --ciphertext-file=- --plaintext-file=tool-credential.json \ --location=global --keyring=nomulus-tool-keyring --key=nomulus-tool-key -# Deploy the Spec11 pipeline to GCS. -#- name: 'gcr.io/$PROJECT_ID/builder:latest' -# entrypoint: /bin/bash -# args: -# - -c -# - | -# set -e -# if [ ${_ENV} == production ]; then -# project_id="domain-registry" -# else -# project_id="domain-registry-${_ENV}" -# fi -# echo "gs://$${project_id}-beam/cloudsql/admin_credential.enc" -# gsutil cp gs://$PROJECT_ID-deploy/${TAG_NAME}/nomulus.jar . 
-# java -jar nomulus.jar -e ${_ENV} --credential tool-credential.json \ -# --sql_access_info \ -# "gs://$${project_id}-beam/cloudsql/admin_credential.enc" \ -# deploy_spec11_pipeline --project $${project_id} # Deploy the invoicing pipeline to GCS. - name: 'gcr.io/$PROJECT_ID/nomulus-tool:latest' args: diff --git a/release/cloudbuild-nomulus.yaml b/release/cloudbuild-nomulus.yaml index 3546cf61a..6ab672023 100644 --- a/release/cloudbuild-nomulus.yaml +++ b/release/cloudbuild-nomulus.yaml @@ -73,7 +73,7 @@ steps: # Build and package the deployment files for production. - name: 'gcr.io/${PROJECT_ID}/builder:latest' args: ['release/build_nomulus_for_env.sh', 'production', 'output'] -# Build and stage init_sql_pipeline +# Build and stage Dataflow Flex templates. - name: 'gcr.io/${PROJECT_ID}/builder:latest' entrypoint: /bin/bash # Set home for Gradle caches. Must be consistent with previous steps above @@ -89,7 +89,9 @@ steps: google.registry.beam.initsql.InitSqlPipeline \ google/registry/beam/init_sql_pipeline_metadata.json \ google.registry.beam.datastore.BulkDeleteDatastorePipeline \ - google/registry/beam/bulk_delete_datastore_pipeline_metadata.json + google/registry/beam/bulk_delete_datastore_pipeline_metadata.json \ + google.registry.beam.spec11.Spec11Pipeline \ + google/registry/beam/spec11_pipeline_metadata.json # Tentatively build and publish Cloud SQL schema jar here, before schema release # process is finalized. Also publish nomulus:core jars that are needed for # server/schema compatibility tests.
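
Editor's note: the updated GenerateSpec11ReportActionTest and the cloudbuild-nomulus.yaml change above move Spec11 onto staged Dataflow Flex Templates, and the test stubs the client chain dataflow.projects().locations().flexTemplates().launch(projectId, region, request).execute(). The sketch below shows roughly what a caller of that chain looks like; it is an illustration only, and the class name, method name, job name, and parameter-map keys are assumptions rather than the repository's actual implementation.

// Illustrative sketch only; names and parameter keys are assumptions, not Nomulus code.
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;

final class FlexTemplateLaunchSketch {

  /** Launches a staged Flex Template and returns the Dataflow job id it creates. */
  static String launchSpec11(
      Dataflow dataflow, String projectId, String jobRegion, String metadataGcsPath)
      throws IOException {
    LaunchFlexTemplateParameter parameter =
        new LaunchFlexTemplateParameter()
            // A unique, lowercase job name for this launch (value here is made up).
            .setJobName("spec11-sketch-2020-01-27")
            // Points at the staged *_pipeline_metadata.json container spec in GCS.
            .setContainerSpecGcsPath(metadataGcsPath)
            // Pipeline options handed to the template; these keys are assumed for illustration.
            .setParameters(ImmutableMap.of("date", "2020-01-27", "safeBrowsingApiKey", "api-key"));
    LaunchFlexTemplateResponse response =
        dataflow
            .projects()
            .locations()
            .flexTemplates()
            .launch(
                projectId, jobRegion, new LaunchFlexTemplateRequest().setLaunchParameter(parameter))
            .execute();
    return response.getJob().getId();
  }
}

This mirrors the call chain the test mocks (projects, then locations, then flexTemplates, then launch and execute), which is why the unit test needs no real Dataflow backend.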
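
Separately, the ThreatMatchJsonPredicate added to Spec11PipelineTest compares the threatMatches arrays via ImmutableSet.copyOf over JSONArray elements. Since org.json's JSONObject does not override equals(), sets built that way compare elements by object identity rather than content. Below is a hedged sketch of one content-based, order-insensitive way to compare two report lines; the helper names are invented for illustration, and the toMap() call assumes a reasonably recent org.json release. This is not the repository's code.

// Sketch only: content-based comparison of two Spec11 report lines, ignoring the order of
// entries inside the "threatMatches" array.
import com.google.common.collect.ImmutableMultiset;
import java.util.TreeMap;
import org.json.JSONArray;
import org.json.JSONObject;

final class ReportLineComparisonSketch {

  static boolean linesMatch(String actualLine, String expectedLine) {
    JSONObject actual = new JSONObject(actualLine);
    JSONObject expected = new JSONObject(expectedLine);
    if (!actual.keySet().equals(expected.keySet())) {
      return false;
    }
    for (String key : actual.keySet()) {
      if (key.equals("threatMatches")) {
        if (!normalize(actual.getJSONArray(key)).equals(normalize(expected.getJSONArray(key)))) {
          return false;
        }
      } else if (!actual.get(key).equals(expected.get(key))) {
        return false;
      }
    }
    return true;
  }

  // Re-serializes each threat match through a TreeMap so key order inside an element does not
  // affect equality, then collects the strings into a multiset for order-insensitive matching.
  private static ImmutableMultiset<String> normalize(JSONArray matches) {
    ImmutableMultiset.Builder<String> builder = ImmutableMultiset.builder();
    for (int i = 0; i < matches.length(); i++) {
      builder.add(new TreeMap<>(matches.getJSONObject(i).toMap()).toString());
    }
    return builder.build();
  }
}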