mirror of
https://github.com/google/nomulus.git
synced 2025-07-08 20:23:24 +02:00
Migrate Spec11 pipeline to flex template (#1073)
* Migrate Spec11 pipeline to flex template Unfortunately this PR has turned out to be much bigger than I initially conceived. However this is no good way to separate it out because the changes are intertwined. This PR includes 3 main changes: 1. Change the spec11 pipline to use Dataflow Flex Template. 2. Retire the use of the old JPA layer that relies on credential saved in KMS. 3. Some extensive refactoring to streamline the logic and improve test isolation. * Fix job name and remove projectId from options * Add parameter logs * Set RegistryEnvironment * Remove logging and modify safe browsing API key regex * Rename a test method and rebase * Remove unused Junit extension * Specify job region
This commit is contained in:
parent
a0995fa0eb
commit
464f9aed1f
24 changed files with 789 additions and 981 deletions
|
@ -31,6 +31,7 @@ import com.google.api.services.dataflow.Dataflow;
|
|||
import com.google.api.services.dataflow.model.Job;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
|
||||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.FakeResponse;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -50,7 +51,8 @@ class WipeOutDatastoreActionTest {
|
|||
private LaunchFlexTemplateResponse launchResponse =
|
||||
new LaunchFlexTemplateResponse().setJob(new Job());
|
||||
|
||||
private FakeResponse response = new FakeResponse();
|
||||
private final FakeClock clock = new FakeClock();
|
||||
private final FakeResponse response = new FakeResponse();
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws Exception {
|
||||
|
@ -67,7 +69,7 @@ class WipeOutDatastoreActionTest {
|
|||
void run_projectNotAllowed() {
|
||||
WipeoutDatastoreAction action =
|
||||
new WipeoutDatastoreAction(
|
||||
"domain-registry", "us-central1", "gs://some-bucket", response, dataflow);
|
||||
"domain-registry", "us-central1", "gs://some-bucket", clock, response, dataflow);
|
||||
action.run();
|
||||
assertThat(response.getStatus()).isEqualTo(SC_FORBIDDEN);
|
||||
verifyNoInteractions(dataflow);
|
||||
|
@ -77,7 +79,7 @@ class WipeOutDatastoreActionTest {
|
|||
void run_projectAllowed() throws Exception {
|
||||
WipeoutDatastoreAction action =
|
||||
new WipeoutDatastoreAction(
|
||||
"domain-registry-qa", "us-central1", "gs://some-bucket", response, dataflow);
|
||||
"domain-registry-qa", "us-central1", "gs://some-bucket", clock, response, dataflow);
|
||||
action.run();
|
||||
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||
verify(launch, times(1)).execute();
|
||||
|
@ -89,7 +91,7 @@ class WipeOutDatastoreActionTest {
|
|||
when(launch.execute()).thenThrow(new RuntimeException());
|
||||
WipeoutDatastoreAction action =
|
||||
new WipeoutDatastoreAction(
|
||||
"domain-registry-qa", "us-central1", "gs://some-bucket", response, dataflow);
|
||||
"domain-registry-qa", "us-central1", "gs://some-bucket", clock, response, dataflow);
|
||||
action.run();
|
||||
assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
|
||||
verify(launch, times(1)).execute();
|
||||
|
|
|
@ -1,131 +0,0 @@
|
|||
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.initsql;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
|
||||
|
||||
import com.google.appengine.api.datastore.Entity;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import google.registry.backup.VersionedEntity;
|
||||
import google.registry.beam.TestPipelineExtension;
|
||||
import google.registry.model.ImmutableObject;
|
||||
import google.registry.model.contact.ContactResource;
|
||||
import google.registry.model.ofy.Ofy;
|
||||
import google.registry.model.registrar.Registrar;
|
||||
import google.registry.persistence.transaction.JpaTestRules;
|
||||
import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension;
|
||||
import google.registry.testing.AppEngineExtension;
|
||||
import google.registry.testing.DatabaseHelper;
|
||||
import google.registry.testing.DatastoreEntityExtension;
|
||||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.InjectExtension;
|
||||
import java.io.Serializable;
|
||||
import java.nio.file.Path;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.beam.sdk.transforms.Create;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Order;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
|
||||
/** Unit test for {@link Transforms#writeToSql}. */
|
||||
class WriteToSqlTest implements Serializable {
|
||||
|
||||
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
|
||||
|
||||
private final FakeClock fakeClock = new FakeClock(START_TIME);
|
||||
|
||||
@RegisterExtension
|
||||
@Order(Order.DEFAULT - 1)
|
||||
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();
|
||||
|
||||
@RegisterExtension final transient InjectExtension injectRule = new InjectExtension();
|
||||
|
||||
@RegisterExtension
|
||||
final transient JpaIntegrationTestExtension database =
|
||||
new JpaTestRules.Builder().withClock(fakeClock).buildIntegrationTestRule();
|
||||
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
@TempDir
|
||||
transient Path tmpDir;
|
||||
|
||||
@RegisterExtension
|
||||
final transient TestPipelineExtension testPipeline =
|
||||
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
|
||||
|
||||
// Must not be transient!
|
||||
@RegisterExtension
|
||||
@Order(Order.DEFAULT + 1)
|
||||
final BeamJpaExtension beamJpaExtension =
|
||||
new BeamJpaExtension(() -> tmpDir.resolve("credential.dat"), database.getDatabase());
|
||||
|
||||
private ImmutableList<Entity> contacts;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws Exception {
|
||||
try (BackupTestStore store = new BackupTestStore(fakeClock)) {
|
||||
injectRule.setStaticField(Ofy.class, "clock", fakeClock);
|
||||
|
||||
// Required for contacts created below.
|
||||
Registrar ofyRegistrar = AppEngineExtension.makeRegistrar2();
|
||||
store.insertOrUpdate(ofyRegistrar);
|
||||
jpaTm().transact(() -> jpaTm().put(store.loadAsOfyEntity(ofyRegistrar)));
|
||||
|
||||
ImmutableList.Builder<Entity> builder = new ImmutableList.Builder<>();
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
ContactResource contact = DatabaseHelper.newContactResource("contact_" + i);
|
||||
store.insertOrUpdate(contact);
|
||||
builder.add(store.loadAsDatastoreEntity(contact));
|
||||
}
|
||||
contacts = builder.build();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void writeToSql_twoWriters() {
|
||||
testPipeline
|
||||
.apply(
|
||||
Create.of(
|
||||
contacts.stream()
|
||||
.map(InitSqlTestUtils::entityToBytes)
|
||||
.map(bytes -> VersionedEntity.from(0L, bytes))
|
||||
.collect(Collectors.toList())))
|
||||
.apply(
|
||||
Transforms.writeToSql(
|
||||
"ContactResource",
|
||||
2,
|
||||
4,
|
||||
() ->
|
||||
DaggerBeamJpaModule_JpaTransactionManagerComponent.builder()
|
||||
.beamJpaModule(beamJpaExtension.getBeamJpaModule())
|
||||
.build()
|
||||
.localDbJpaTransactionManager()));
|
||||
testPipeline.run().waitUntilFinish();
|
||||
|
||||
ImmutableList<?> sqlContacts = jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class));
|
||||
assertThat(sqlContacts)
|
||||
.comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp"))
|
||||
.containsExactlyElementsIn(
|
||||
contacts.stream()
|
||||
.map(InitSqlTestUtils::datastoreToOfyEntity)
|
||||
.map(ImmutableObject.class::cast)
|
||||
.collect(ImmutableList.toImmutableList()));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,245 @@
|
|||
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.beam.spec11;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.withSettings;
|
||||
|
||||
import com.google.common.base.Suppliers;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.io.CharStreams;
|
||||
import google.registry.beam.TestPipelineExtension;
|
||||
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
|
||||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.FakeSleeper;
|
||||
import google.registry.util.Retrier;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.Serializable;
|
||||
import org.apache.beam.sdk.coders.SerializableCoder;
|
||||
import org.apache.beam.sdk.testing.PAssert;
|
||||
import org.apache.beam.sdk.transforms.Create;
|
||||
import org.apache.beam.sdk.transforms.ParDo;
|
||||
import org.apache.beam.sdk.values.KV;
|
||||
import org.apache.beam.sdk.values.PCollection;
|
||||
import org.apache.http.ProtocolVersion;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.BasicHttpEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.message.BasicStatusLine;
|
||||
import org.json.JSONArray;
|
||||
import org.json.JSONException;
|
||||
import org.json.JSONObject;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
/** Unit tests for {@link SafeBrowsingTransforms}. */
|
||||
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
|
||||
class SafeBrowsingTransformsTest {
|
||||
|
||||
private static final ImmutableMap<String, String> THREAT_MAP =
|
||||
ImmutableMap.of(
|
||||
"111.com",
|
||||
"MALWARE",
|
||||
"party-night.net",
|
||||
"SOCIAL_ENGINEERING",
|
||||
"bitcoin.bank",
|
||||
"POTENTIALLY_HARMFUL_APPLICATION",
|
||||
"no-email.com",
|
||||
"THREAT_TYPE_UNSPECIFIED",
|
||||
"anti-anti-anti-virus.dev",
|
||||
"UNWANTED_SOFTWARE");
|
||||
|
||||
private static final String REPO_ID = "repoId";
|
||||
private static final String REGISTRAR_ID = "registrarID";
|
||||
private static final String REGISTRAR_EMAIL = "email@registrar.net";
|
||||
|
||||
private static ImmutableMap<Subdomain, ThreatMatch> THREAT_MATCH_MAP;
|
||||
|
||||
private final CloseableHttpClient mockHttpClient =
|
||||
mock(CloseableHttpClient.class, withSettings().serializable());
|
||||
|
||||
private final EvaluateSafeBrowsingFn safeBrowsingFn =
|
||||
new EvaluateSafeBrowsingFn(
|
||||
"API_KEY",
|
||||
new Retrier(new FakeSleeper(new FakeClock()), 1),
|
||||
Suppliers.ofInstance(mockHttpClient));
|
||||
|
||||
@RegisterExtension
|
||||
final TestPipelineExtension pipeline =
|
||||
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
|
||||
|
||||
private static Subdomain createSubdomain(String url) {
|
||||
return Subdomain.create(url, REPO_ID, REGISTRAR_ID, REGISTRAR_EMAIL);
|
||||
}
|
||||
|
||||
private KV<Subdomain, ThreatMatch> getKv(String url) {
|
||||
Subdomain subdomain = createSubdomain(url);
|
||||
return KV.of(subdomain, THREAT_MATCH_MAP.get(subdomain));
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
static void beforeAll() {
|
||||
ImmutableMap.Builder<Subdomain, ThreatMatch> builder = new ImmutableMap.Builder<>();
|
||||
THREAT_MAP
|
||||
.entrySet()
|
||||
.forEach(
|
||||
kv ->
|
||||
builder.put(
|
||||
createSubdomain(kv.getKey()), ThreatMatch.create(kv.getValue(), kv.getKey())));
|
||||
THREAT_MATCH_MAP = builder.build();
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws Exception {
|
||||
when(mockHttpClient.execute(any(HttpPost.class))).thenAnswer(new HttpResponder());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_someBadDomains() throws Exception {
|
||||
ImmutableList<Subdomain> subdomains =
|
||||
ImmutableList.of(
|
||||
createSubdomain("111.com"),
|
||||
createSubdomain("hooli.com"),
|
||||
createSubdomain("party-night.net"),
|
||||
createSubdomain("anti-anti-anti-virus.dev"),
|
||||
createSubdomain("no-email.com"));
|
||||
PCollection<KV<Subdomain, ThreatMatch>> threats =
|
||||
pipeline
|
||||
.apply(Create.of(subdomains).withCoder(SerializableCoder.of(Subdomain.class)))
|
||||
.apply(ParDo.of(safeBrowsingFn));
|
||||
|
||||
PAssert.that(threats)
|
||||
.containsInAnyOrder(
|
||||
getKv("111.com"),
|
||||
getKv("party-night.net"),
|
||||
getKv("anti-anti-anti-virus.dev"),
|
||||
getKv("no-email.com"));
|
||||
pipeline.run().waitUntilFinish();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess_noBadDomains() throws Exception {
|
||||
ImmutableList<Subdomain> subdomains =
|
||||
ImmutableList.of(
|
||||
createSubdomain("hello_kitty.dev"),
|
||||
createSubdomain("555.com"),
|
||||
createSubdomain("goodboy.net"));
|
||||
PCollection<KV<Subdomain, ThreatMatch>> threats =
|
||||
pipeline
|
||||
.apply(Create.of(subdomains).withCoder(SerializableCoder.of(Subdomain.class)))
|
||||
.apply(ParDo.of(safeBrowsingFn));
|
||||
|
||||
PAssert.that(threats).empty();
|
||||
pipeline.run().waitUntilFinish();
|
||||
}
|
||||
|
||||
/**
|
||||
* A serializable {@link Answer} that returns a mock HTTP response based on the HTTP request's
|
||||
* content.
|
||||
*/
|
||||
private static class HttpResponder implements Answer<CloseableHttpResponse>, Serializable {
|
||||
@Override
|
||||
public CloseableHttpResponse answer(InvocationOnMock invocation) throws Throwable {
|
||||
return getMockResponse(
|
||||
CharStreams.toString(
|
||||
new InputStreamReader(
|
||||
((HttpPost) invocation.getArguments()[0]).getEntity().getContent(), UTF_8)));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link CloseableHttpResponse} containing either positive (threat found) or negative
|
||||
* (no threat) API examples based on the request data.
|
||||
*/
|
||||
private static CloseableHttpResponse getMockResponse(String request) throws JSONException {
|
||||
// Determine which bad URLs are in the request (if any)
|
||||
ImmutableList<String> badUrls =
|
||||
THREAT_MAP.keySet().stream()
|
||||
.filter(request::contains)
|
||||
.collect(ImmutableList.toImmutableList());
|
||||
|
||||
CloseableHttpResponse httpResponse =
|
||||
mock(CloseableHttpResponse.class, withSettings().serializable());
|
||||
when(httpResponse.getStatusLine())
|
||||
.thenReturn(new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "Done"));
|
||||
when(httpResponse.getEntity()).thenReturn(new FakeHttpEntity(getAPIResponse(badUrls)));
|
||||
return httpResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the expected API response for a list of bad URLs.
|
||||
*
|
||||
* <p>If there are no badUrls in the list, this returns the empty JSON string "{}".
|
||||
*/
|
||||
private static String getAPIResponse(ImmutableList<String> badUrls) throws JSONException {
|
||||
JSONObject response = new JSONObject();
|
||||
if (badUrls.isEmpty()) {
|
||||
return response.toString();
|
||||
}
|
||||
// Create a threatMatch for each badUrl
|
||||
JSONArray matches = new JSONArray();
|
||||
for (String badUrl : badUrls) {
|
||||
matches.put(
|
||||
new JSONObject()
|
||||
.put("threatType", THREAT_MAP.get(badUrl))
|
||||
.put("threat", new JSONObject().put("url", badUrl)));
|
||||
}
|
||||
response.put("matches", matches);
|
||||
return response.toString();
|
||||
}
|
||||
|
||||
/** A serializable HttpEntity fake that returns {@link String} content. */
|
||||
private static class FakeHttpEntity extends BasicHttpEntity implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 105738294571L;
|
||||
|
||||
private String content;
|
||||
|
||||
private void writeObject(ObjectOutputStream oos) throws IOException {
|
||||
oos.defaultWriteObject();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the {@link FakeHttpEntity} content upon deserialization.
|
||||
*
|
||||
* <p>This allows us to use {@link #getContent()} as-is, fully emulating the behavior of {@link
|
||||
* BasicHttpEntity} regardless of serialization.
|
||||
*/
|
||||
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
|
||||
ois.defaultReadObject();
|
||||
super.setContent(new ByteArrayInputStream(this.content.getBytes(UTF_8)));
|
||||
}
|
||||
|
||||
FakeHttpEntity(String content) {
|
||||
this.content = content;
|
||||
super.setContent(new ByteArrayInputStream(this.content.getBytes(UTF_8)));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -14,361 +14,183 @@
|
|||
|
||||
package google.registry.beam.spec11;
|
||||
|
||||
import static com.google.common.collect.ImmutableList.toImmutableList;
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.withSettings;
|
||||
import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
|
||||
|
||||
import com.google.auth.oauth2.GoogleCredentials;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.io.CharStreams;
|
||||
import com.google.common.collect.Streams;
|
||||
import com.google.common.truth.Correspondence;
|
||||
import com.google.common.truth.Correspondence.BinaryPredicate;
|
||||
import google.registry.beam.TestPipelineExtension;
|
||||
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
|
||||
import google.registry.model.reporting.Spec11ThreatMatch;
|
||||
import google.registry.model.reporting.Spec11ThreatMatch.ThreatType;
|
||||
import google.registry.persistence.transaction.JpaTransactionManager;
|
||||
import google.registry.model.reporting.Spec11ThreatMatchDao;
|
||||
import google.registry.persistence.transaction.JpaTestRules;
|
||||
import google.registry.persistence.transaction.JpaTestRules.JpaIntegrationTestExtension;
|
||||
import google.registry.testing.DatastoreEntityExtension;
|
||||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.FakeSleeper;
|
||||
import google.registry.util.GoogleCredentialsBundle;
|
||||
import google.registry.util.ResourceUtils;
|
||||
import google.registry.util.Retrier;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.Serializable;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Comparator;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.beam.runners.direct.DirectRunner;
|
||||
import org.apache.beam.sdk.options.PipelineOptions;
|
||||
import org.apache.beam.sdk.coders.KvCoder;
|
||||
import org.apache.beam.sdk.coders.SerializableCoder;
|
||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
|
||||
import org.apache.beam.sdk.transforms.Create;
|
||||
import org.apache.beam.sdk.values.KV;
|
||||
import org.apache.beam.sdk.values.PCollection;
|
||||
import org.apache.http.ProtocolVersion;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.BasicHttpEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.message.BasicStatusLine;
|
||||
import org.joda.time.DateTime;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.joda.time.LocalDate;
|
||||
import org.joda.time.format.ISODateTimeFormat;
|
||||
import org.json.JSONArray;
|
||||
import org.json.JSONException;
|
||||
import org.json.JSONObject;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Order;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
/** Unit tests for {@link Spec11Pipeline}. */
|
||||
/**
|
||||
* Unit tests for {@link Spec11Pipeline}.
|
||||
*
|
||||
* <p>Unfortunately there is no emulator for BigQuery like that for Datastore or App Engine.
|
||||
* Therefore we cannot fully test the pipeline but only test the two separate sink IO functions,
|
||||
* assuming that date is sourcede correctly the {@code BigQueryIO}.
|
||||
*/
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
@MockitoSettings(strictness = Strictness.LENIENT)
|
||||
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
|
||||
class Spec11PipelineTest {
|
||||
private static class SaveNewThreatMatchAnswer implements Answer<Void>, Serializable {
|
||||
@Override
|
||||
public Void answer(InvocationOnMock invocation) {
|
||||
Runnable runnable = invocation.getArgument(0, Runnable.class);
|
||||
runnable.run();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
private static final String DATE = "2020-01-27";
|
||||
private static final String SAFE_BROWSING_API_KEY = "api-key";
|
||||
private static final String REPORTING_BUCKET_URL = "reporting_bucket";
|
||||
|
||||
private static PipelineOptions pipelineOptions;
|
||||
private static final ImmutableList<Subdomain> SUBDOMAINS =
|
||||
ImmutableList.of(
|
||||
Subdomain.create("111.com", "123456789-COM", "hello-registrar", "email@hello.net"),
|
||||
Subdomain.create("party-night.net", "2244AABBC-NET", "kitty-registrar", "contact@kit.ty"),
|
||||
Subdomain.create("bitcoin.bank", "1C3D5E7F9-BANK", "hello-registrar", "email@hello.net"),
|
||||
Subdomain.create("no-email.com", "2A4BA9BBC-COM", "kitty-registrar", "contact@kit.ty"),
|
||||
Subdomain.create(
|
||||
"anti-anti-anti-virus.dev", "555666888-DEV", "cool-registrar", "cool@aid.net"));
|
||||
|
||||
@Mock(serializable = true)
|
||||
private static JpaTransactionManager mockJpaTm;
|
||||
private static final ImmutableList<ThreatMatch> THREAT_MATCHES =
|
||||
ImmutableList.of(
|
||||
ThreatMatch.create("MALWARE", "111.com"),
|
||||
ThreatMatch.create("SOCIAL_ENGINEERING", "party-night.net"),
|
||||
ThreatMatch.create("POTENTIALLY_HARMFUL_APPLICATION", "bitcoin.bank"),
|
||||
ThreatMatch.create("THREAT_TYPE_UNSPECIFIED", "no-eamil.com"),
|
||||
ThreatMatch.create("UNWANTED_SOFTWARE", "anti-anti-anti-virus.dev"));
|
||||
|
||||
@BeforeAll
|
||||
static void beforeAll() {
|
||||
pipelineOptions = PipelineOptionsFactory.create();
|
||||
pipelineOptions.setRunner(DirectRunner.class);
|
||||
}
|
||||
// This extension is only needed because Spec11ThreatMatch uses Ofy to generate the ID. Can be
|
||||
// removed after the SQL migration.
|
||||
@RegisterExtension
|
||||
@Order(Order.DEFAULT - 1)
|
||||
final transient DatastoreEntityExtension datastore = new DatastoreEntityExtension();
|
||||
|
||||
@TempDir Path tmpDir;
|
||||
|
||||
@RegisterExtension
|
||||
final transient TestPipelineExtension testPipeline =
|
||||
TestPipelineExtension.fromOptions(pipelineOptions);
|
||||
final TestPipelineExtension pipeline =
|
||||
TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);
|
||||
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
@TempDir
|
||||
Path tmpDir;
|
||||
@RegisterExtension
|
||||
final JpaIntegrationTestExtension database =
|
||||
new JpaTestRules.Builder().withClock(new FakeClock()).buildIntegrationTestRule();
|
||||
|
||||
private final Retrier retrier =
|
||||
new Retrier(new FakeSleeper(new FakeClock(DateTime.parse("2019-07-15TZ"))), 1);
|
||||
private Spec11Pipeline spec11Pipeline;
|
||||
private final Spec11PipelineOptions options =
|
||||
PipelineOptionsFactory.create().as(Spec11PipelineOptions.class);
|
||||
|
||||
private File reportingBucketUrl;
|
||||
private PCollection<KV<Subdomain, ThreatMatch>> threatMatches;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws IOException {
|
||||
String beamTempFolder =
|
||||
Files.createDirectory(tmpDir.resolve("beam_temp")).toAbsolutePath().toString();
|
||||
|
||||
spec11Pipeline =
|
||||
new Spec11Pipeline(
|
||||
"test-project",
|
||||
"region",
|
||||
beamTempFolder + "/staging",
|
||||
beamTempFolder + "/templates/invoicing",
|
||||
tmpDir.toAbsolutePath().toString(),
|
||||
() -> mockJpaTm,
|
||||
GoogleCredentialsBundle.create(GoogleCredentials.create(null)),
|
||||
retrier);
|
||||
}
|
||||
|
||||
private static final ImmutableList<String> BAD_DOMAINS =
|
||||
ImmutableList.of(
|
||||
"111.com", "222.com", "444.com", "no-email.com", "testThreatMatchToSqlBad.com");
|
||||
|
||||
private ImmutableList<Subdomain> getInputDomainsJson() {
|
||||
ImmutableList.Builder<Subdomain> subdomainsBuilder = new ImmutableList.Builder<>();
|
||||
// Put in at least 2 batches worth (x > 490) to guarantee multiple executions.
|
||||
// Put in half for theRegistrar and half for someRegistrar
|
||||
for (int i = 0; i < 255; i++) {
|
||||
subdomainsBuilder.add(
|
||||
Subdomain.create(
|
||||
String.format("%s.com", i), "theDomain", "theRegistrar", "fake@theRegistrar.com"));
|
||||
}
|
||||
for (int i = 255; i < 510; i++) {
|
||||
subdomainsBuilder.add(
|
||||
Subdomain.create(
|
||||
String.format("%s.com", i), "someDomain", "someRegistrar", "fake@someRegistrar.com"));
|
||||
}
|
||||
subdomainsBuilder.add(Subdomain.create("no-email.com", "fakeDomain", "noEmailRegistrar", ""));
|
||||
return subdomainsBuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the end-to-end Spec11 pipeline with mocked out API calls.
|
||||
*
|
||||
* <p>We suppress the (Serializable & Supplier) dual-casted lambda warnings because the supplier
|
||||
* produces an explicitly serializable mock, which is safe to cast.
|
||||
*/
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
void testEndToEndPipeline_generatesExpectedFiles() throws Exception {
|
||||
// Establish mocks for testing
|
||||
ImmutableList<Subdomain> inputRows = getInputDomainsJson();
|
||||
CloseableHttpClient mockHttpClient =
|
||||
mock(CloseableHttpClient.class, withSettings().serializable());
|
||||
|
||||
// Return a mock HttpResponse that returns a JSON response based on the request.
|
||||
when(mockHttpClient.execute(any(HttpPost.class))).thenAnswer(new HttpResponder());
|
||||
|
||||
EvaluateSafeBrowsingFn evalFn =
|
||||
new EvaluateSafeBrowsingFn(
|
||||
StaticValueProvider.of("apikey"),
|
||||
new Retrier(new FakeSleeper(new FakeClock()), 3),
|
||||
(Serializable & Supplier) () -> mockHttpClient);
|
||||
|
||||
// Apply input and evaluation transforms
|
||||
PCollection<Subdomain> input = testPipeline.apply(Create.of(inputRows));
|
||||
spec11Pipeline.evaluateUrlHealth(input, evalFn, StaticValueProvider.of("2018-06-01"));
|
||||
testPipeline.run();
|
||||
|
||||
// Verify header and 4 threat matches for 3 registrars are found
|
||||
ImmutableList<String> generatedReport = resultFileContents();
|
||||
assertThat(generatedReport).hasSize(4);
|
||||
assertThat(generatedReport.get(0))
|
||||
.isEqualTo("Map from registrar email / name to detected subdomain threats:");
|
||||
|
||||
// The output file can put the registrar emails and bad URLs in any order.
|
||||
// We cannot rely on the JSON toString to sort because the keys are not always in the same
|
||||
// order, so we must rely on length even though that's not ideal.
|
||||
ImmutableList<String> sortedLines =
|
||||
ImmutableList.sortedCopyOf(
|
||||
Comparator.comparingInt(String::length), generatedReport.subList(1, 4));
|
||||
|
||||
JSONObject noEmailRegistrarJSON = new JSONObject(sortedLines.get(0));
|
||||
assertThat(noEmailRegistrarJSON.get("registrarEmailAddress")).isEqualTo("");
|
||||
assertThat(noEmailRegistrarJSON.get("registrarClientId")).isEqualTo("noEmailRegistrar");
|
||||
assertThat(noEmailRegistrarJSON.has("threatMatches")).isTrue();
|
||||
JSONArray noEmailThreatMatch = noEmailRegistrarJSON.getJSONArray("threatMatches");
|
||||
assertThat(noEmailThreatMatch.length()).isEqualTo(1);
|
||||
assertThat(noEmailThreatMatch.getJSONObject(0).get("fullyQualifiedDomainName"))
|
||||
.isEqualTo("no-email.com");
|
||||
assertThat(noEmailThreatMatch.getJSONObject(0).get("threatType")).isEqualTo("MALWARE");
|
||||
|
||||
JSONObject someRegistrarJSON = new JSONObject(sortedLines.get(1));
|
||||
assertThat(someRegistrarJSON.get("registrarEmailAddress")).isEqualTo("fake@someRegistrar.com");
|
||||
assertThat(someRegistrarJSON.get("registrarClientId")).isEqualTo("someRegistrar");
|
||||
assertThat(someRegistrarJSON.has("threatMatches")).isTrue();
|
||||
JSONArray someThreatMatch = someRegistrarJSON.getJSONArray("threatMatches");
|
||||
assertThat(someThreatMatch.length()).isEqualTo(1);
|
||||
assertThat(someThreatMatch.getJSONObject(0).get("fullyQualifiedDomainName"))
|
||||
.isEqualTo("444.com");
|
||||
assertThat(someThreatMatch.getJSONObject(0).get("threatType")).isEqualTo("MALWARE");
|
||||
|
||||
// theRegistrar has two ThreatMatches, we have to parse it explicitly
|
||||
JSONObject theRegistrarJSON = new JSONObject(sortedLines.get(2));
|
||||
assertThat(theRegistrarJSON.get("registrarEmailAddress")).isEqualTo("fake@theRegistrar.com");
|
||||
assertThat(theRegistrarJSON.get("registrarClientId")).isEqualTo("theRegistrar");
|
||||
assertThat(theRegistrarJSON.has("threatMatches")).isTrue();
|
||||
JSONArray theThreatMatches = theRegistrarJSON.getJSONArray("threatMatches");
|
||||
assertThat(theThreatMatches.length()).isEqualTo(2);
|
||||
ImmutableList<String> threatMatchStrings =
|
||||
ImmutableList.of(
|
||||
theThreatMatches.getJSONObject(0).toString(),
|
||||
theThreatMatches.getJSONObject(1).toString());
|
||||
assertThat(threatMatchStrings)
|
||||
.containsExactly(
|
||||
new JSONObject()
|
||||
.put("fullyQualifiedDomainName", "111.com")
|
||||
.put("threatType", "MALWARE")
|
||||
.toString(),
|
||||
new JSONObject()
|
||||
.put("fullyQualifiedDomainName", "222.com")
|
||||
.put("threatType", "MALWARE")
|
||||
.toString());
|
||||
void beforeEach() throws Exception {
|
||||
reportingBucketUrl = Files.createDirectory(tmpDir.resolve(REPORTING_BUCKET_URL)).toFile();
|
||||
options.setDate(DATE);
|
||||
options.setSafeBrowsingApiKey(SAFE_BROWSING_API_KEY);
|
||||
options.setReportingBucketUrl(reportingBucketUrl.getAbsolutePath());
|
||||
threatMatches =
|
||||
pipeline.apply(
|
||||
Create.of(
|
||||
Streams.zip(SUBDOMAINS.stream(), THREAT_MATCHES.stream(), KV::of)
|
||||
.collect(toImmutableList()))
|
||||
.withCoder(
|
||||
KvCoder.of(
|
||||
SerializableCoder.of(Subdomain.class),
|
||||
SerializableCoder.of(ThreatMatch.class))));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSpec11ThreatMatchToSql() throws Exception {
|
||||
doAnswer(new SaveNewThreatMatchAnswer()).when(mockJpaTm).transact(any(Runnable.class));
|
||||
|
||||
// Create one bad and one good Subdomain to test with evaluateUrlHealth. Only the bad one should
|
||||
// be detected and persisted.
|
||||
Subdomain badDomain =
|
||||
Subdomain.create(
|
||||
"testThreatMatchToSqlBad.com", "theDomain", "theRegistrar", "fake@theRegistrar.com");
|
||||
Subdomain goodDomain =
|
||||
Subdomain.create(
|
||||
"testThreatMatchToSqlGood.com",
|
||||
"someDomain",
|
||||
"someRegistrar",
|
||||
"fake@someRegistrar.com");
|
||||
|
||||
// Establish a mock HttpResponse that returns a JSON response based on the request.
|
||||
CloseableHttpClient mockHttpClient =
|
||||
mock(CloseableHttpClient.class, withSettings().serializable());
|
||||
when(mockHttpClient.execute(any(HttpPost.class))).thenAnswer(new HttpResponder());
|
||||
|
||||
EvaluateSafeBrowsingFn evalFn =
|
||||
new EvaluateSafeBrowsingFn(
|
||||
StaticValueProvider.of("apikey"),
|
||||
new Retrier(new FakeSleeper(new FakeClock()), 3),
|
||||
(Serializable & Supplier) () -> mockHttpClient);
|
||||
|
||||
// Apply input and evaluation transforms
|
||||
PCollection<Subdomain> input = testPipeline.apply(Create.of(badDomain, goodDomain));
|
||||
spec11Pipeline.evaluateUrlHealth(input, evalFn, StaticValueProvider.of("2020-06-10"));
|
||||
testPipeline.run();
|
||||
|
||||
// Verify that the expected threat created from the bad Subdomain and the persisted
|
||||
// Spec11TThreatMatch are equal.
|
||||
Spec11ThreatMatch expected =
|
||||
new Spec11ThreatMatch()
|
||||
.asBuilder()
|
||||
.setThreatTypes(ImmutableSet.of(ThreatType.MALWARE))
|
||||
.setCheckDate(LocalDate.parse("2020-06-10", ISODateTimeFormat.date()))
|
||||
.setDomainName(badDomain.domainName())
|
||||
.setDomainRepoId(badDomain.domainRepoId())
|
||||
.setRegistrarId(badDomain.registrarId())
|
||||
.build();
|
||||
|
||||
verify(mockJpaTm).transact(any(Runnable.class));
|
||||
verify(mockJpaTm).putAll(ImmutableList.of(expected));
|
||||
void testSuccess_saveToSql() {
|
||||
ImmutableSet<Spec11ThreatMatch> sqlThreatMatches =
|
||||
ImmutableSet.of(
|
||||
new Spec11ThreatMatch.Builder()
|
||||
.setDomainName("111.com")
|
||||
.setDomainRepoId("123456789-COM")
|
||||
.setRegistrarId("hello-registrar")
|
||||
.setCheckDate(new LocalDate(2020, 1, 27))
|
||||
.setThreatTypes(ImmutableSet.of(ThreatType.MALWARE))
|
||||
.build(),
|
||||
new Spec11ThreatMatch.Builder()
|
||||
.setDomainName("party-night.net")
|
||||
.setDomainRepoId("2244AABBC-NET")
|
||||
.setRegistrarId("kitty-registrar")
|
||||
.setCheckDate(new LocalDate(2020, 1, 27))
|
||||
.setThreatTypes(ImmutableSet.of(ThreatType.SOCIAL_ENGINEERING))
|
||||
.build(),
|
||||
new Spec11ThreatMatch.Builder()
|
||||
.setDomainName("bitcoin.bank")
|
||||
.setDomainRepoId("1C3D5E7F9-BANK")
|
||||
.setRegistrarId("hello-registrar")
|
||||
.setCheckDate(new LocalDate(2020, 1, 27))
|
||||
.setThreatTypes(ImmutableSet.of(ThreatType.POTENTIALLY_HARMFUL_APPLICATION))
|
||||
.build(),
|
||||
new Spec11ThreatMatch.Builder()
|
||||
.setDomainName("no-email.com")
|
||||
.setDomainRepoId("2A4BA9BBC-COM")
|
||||
.setRegistrarId("kitty-registrar")
|
||||
.setCheckDate(new LocalDate(2020, 1, 27))
|
||||
.setThreatTypes(ImmutableSet.of(ThreatType.THREAT_TYPE_UNSPECIFIED))
|
||||
.build(),
|
||||
new Spec11ThreatMatch.Builder()
|
||||
.setDomainName("anti-anti-anti-virus.dev")
|
||||
.setDomainRepoId("555666888-DEV")
|
||||
.setRegistrarId("cool-registrar")
|
||||
.setCheckDate(new LocalDate(2020, 1, 27))
|
||||
.setThreatTypes(ImmutableSet.of(ThreatType.UNWANTED_SOFTWARE))
|
||||
.build());
|
||||
Spec11Pipeline.saveToSql(threatMatches, options);
|
||||
pipeline.run().waitUntilFinish();
|
||||
assertThat(
|
||||
jpaTm()
|
||||
.transact(
|
||||
() ->
|
||||
Spec11ThreatMatchDao.loadEntriesByDate(
|
||||
jpaTm(), new LocalDate(2020, 1, 27))))
|
||||
.comparingElementsUsing(immutableObjectCorrespondence("id"))
|
||||
.containsExactlyElementsIn(sqlThreatMatches);
|
||||
}
|
||||
|
||||
/**
|
||||
* A serializable {@link Answer} that returns a mock HTTP response based on the HTTP request's
|
||||
* content.
|
||||
*/
|
||||
private static class HttpResponder implements Answer<CloseableHttpResponse>, Serializable {
|
||||
@Override
|
||||
public CloseableHttpResponse answer(InvocationOnMock invocation) throws Throwable {
|
||||
return getMockResponse(
|
||||
CharStreams.toString(
|
||||
new InputStreamReader(
|
||||
((HttpPost) invocation.getArguments()[0]).getEntity().getContent(), UTF_8)));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link CloseableHttpResponse} containing either positive (threat found) or negative
|
||||
* (no threat) API examples based on the request data.
|
||||
*/
|
||||
private static CloseableHttpResponse getMockResponse(String request) throws JSONException {
|
||||
// Determine which bad URLs are in the request (if any)
|
||||
ImmutableList<String> badUrls =
|
||||
BAD_DOMAINS.stream().filter(request::contains).collect(ImmutableList.toImmutableList());
|
||||
|
||||
CloseableHttpResponse httpResponse =
|
||||
mock(CloseableHttpResponse.class, withSettings().serializable());
|
||||
when(httpResponse.getStatusLine())
|
||||
.thenReturn(new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 200, "Done"));
|
||||
when(httpResponse.getEntity()).thenReturn(new FakeHttpEntity(getAPIResponse(badUrls)));
|
||||
return httpResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the expected API response for a list of bad URLs.
|
||||
*
|
||||
* <p>If there are no badUrls in the list, this returns the empty JSON string "{}".
|
||||
*/
|
||||
private static String getAPIResponse(ImmutableList<String> badUrls) throws JSONException {
|
||||
JSONObject response = new JSONObject();
|
||||
if (badUrls.isEmpty()) {
|
||||
return response.toString();
|
||||
}
|
||||
// Create a threatMatch for each badUrl
|
||||
JSONArray matches = new JSONArray();
|
||||
for (String badUrl : badUrls) {
|
||||
matches.put(
|
||||
new JSONObject()
|
||||
.put("threatType", "MALWARE")
|
||||
.put("platformType", "WINDOWS")
|
||||
.put("threatEntryType", "URL")
|
||||
.put("threat", new JSONObject().put("url", badUrl))
|
||||
.put("cacheDuration", "300.000s"));
|
||||
}
|
||||
response.put("matches", matches);
|
||||
return response.toString();
|
||||
}
|
||||
|
||||
/** A serializable HttpEntity fake that returns {@link String} content. */
|
||||
private static class FakeHttpEntity extends BasicHttpEntity implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 105738294571L;
|
||||
|
||||
private String content;
|
||||
|
||||
private void writeObject(ObjectOutputStream oos) throws IOException {
|
||||
oos.defaultWriteObject();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the {@link FakeHttpEntity} content upon deserialization.
|
||||
*
|
||||
* <p>This allows us to use {@link #getContent()} as-is, fully emulating the behavior of {@link
|
||||
* BasicHttpEntity} regardless of serialization.
|
||||
*/
|
||||
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
|
||||
ois.defaultReadObject();
|
||||
super.setContent(new ByteArrayInputStream(this.content.getBytes(UTF_8)));
|
||||
}
|
||||
|
||||
FakeHttpEntity(String content) {
|
||||
this.content = content;
|
||||
super.setContent(new ByteArrayInputStream(this.content.getBytes(UTF_8)));
|
||||
}
|
||||
@Test
|
||||
void testSuccess_saveToGcs() throws Exception {
|
||||
ImmutableList<String> expectedFileContents =
|
||||
ImmutableList.copyOf(
|
||||
ResourceUtils.readResourceUtf8(this.getClass(), "test_output.txt").split("\n"));
|
||||
Spec11Pipeline.saveToGcs(threatMatches, options);
|
||||
pipeline.run().waitUntilFinish();
|
||||
ImmutableList<String> resultFileContents = resultFileContents();
|
||||
assertThat(resultFileContents.size()).isEqualTo(expectedFileContents.size());
|
||||
assertThat(resultFileContents.get(0)).isEqualTo(expectedFileContents.get(0));
|
||||
assertThat(resultFileContents.subList(1, resultFileContents.size()))
|
||||
.comparingElementsUsing(
|
||||
Correspondence.from(
|
||||
new ThreatMatchJsonPredicate(), "has fields with unordered threatTypes equal to"))
|
||||
.containsExactlyElementsIn(expectedFileContents.subList(1, expectedFileContents.size()));
|
||||
}
|
||||
|
||||
/** Returns the text contents of a file under the beamBucket/results directory. */
|
||||
|
@ -376,9 +198,34 @@ class Spec11PipelineTest {
|
|||
File resultFile =
|
||||
new File(
|
||||
String.format(
|
||||
"%s/icann/spec11/2018-06/SPEC11_MONTHLY_REPORT_2018-06-01",
|
||||
tmpDir.toAbsolutePath().toString()));
|
||||
"%s/icann/spec11/2020-01/SPEC11_MONTHLY_REPORT_2020-01-27",
|
||||
reportingBucketUrl.getAbsolutePath().toString()));
|
||||
return ImmutableList.copyOf(
|
||||
ResourceUtils.readResourceUtf8(resultFile.toURI().toURL()).split("\n"));
|
||||
}
|
||||
|
||||
private static class ThreatMatchJsonPredicate implements BinaryPredicate<String, String> {
|
||||
private static final String THREAT_MATCHES = "threatMatches";
|
||||
|
||||
@Override
|
||||
public boolean apply(@Nullable String actual, @Nullable String expected) {
|
||||
JSONObject actualJson = new JSONObject(actual);
|
||||
JSONObject expectedJson = new JSONObject(expected);
|
||||
if (!actualJson.keySet().equals(expectedJson.keySet())) {
|
||||
return false;
|
||||
}
|
||||
for (String key : actualJson.keySet()) {
|
||||
if (key.equals(THREAT_MATCHES)) {
|
||||
if (ImmutableSet.copyOf(actualJson.getJSONArray(key))
|
||||
.equals(ImmutableSet.copyOf(expectedJson.getJSONArray(key)))) {
|
||||
return false;
|
||||
}
|
||||
} else if (!actualJson.get(key).equals(expectedJson.get(key))) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,95 +15,102 @@
|
|||
package google.registry.reporting.spec11;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued;
|
||||
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
|
||||
import static org.apache.http.HttpStatus.SC_OK;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.api.services.dataflow.Dataflow;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Templates;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Templates.Launch;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations.FlexTemplates.Launch;
|
||||
import com.google.api.services.dataflow.model.Job;
|
||||
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
|
||||
import com.google.api.services.dataflow.model.LaunchTemplateResponse;
|
||||
import com.google.api.services.dataflow.model.RuntimeEnvironment;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
|
||||
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
|
||||
import com.google.common.net.MediaType;
|
||||
import google.registry.testing.AppEngineExtension;
|
||||
import google.registry.testing.FakeClock;
|
||||
import google.registry.testing.FakeResponse;
|
||||
import google.registry.testing.TaskQueueHelper.TaskMatcher;
|
||||
import java.io.IOException;
|
||||
import org.joda.time.LocalDate;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
|
||||
/** Unit tests for {@link google.registry.reporting.spec11.GenerateSpec11ReportAction}. */
|
||||
/** Unit tests for {@link GenerateSpec11ReportAction}. */
|
||||
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
|
||||
class GenerateSpec11ReportActionTest {
|
||||
|
||||
@RegisterExtension
|
||||
final AppEngineExtension appEngine = AppEngineExtension.builder().withTaskQueue().build();
|
||||
|
||||
private FakeResponse response;
|
||||
private Dataflow dataflow;
|
||||
private Projects dataflowProjects;
|
||||
private Templates dataflowTemplates;
|
||||
private Launch dataflowLaunch;
|
||||
private FakeResponse response = new FakeResponse();
|
||||
private Dataflow dataflow = mock(Dataflow.class);
|
||||
private Projects projects = mock(Projects.class);
|
||||
private Locations locations = mock(Locations.class);
|
||||
private FlexTemplates templates = mock(FlexTemplates.class);
|
||||
private Launch launch = mock(Launch.class);
|
||||
private LaunchFlexTemplateResponse launchResponse =
|
||||
new LaunchFlexTemplateResponse().setJob(new Job().setId("jobid"));
|
||||
|
||||
private final FakeClock clock = new FakeClock(DateTime.parse("2018-06-11T12:23:56Z"));
|
||||
private GenerateSpec11ReportAction action;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws IOException {
|
||||
response = new FakeResponse();
|
||||
dataflow = mock(Dataflow.class);
|
||||
|
||||
// Establish the Dataflow API call chain
|
||||
dataflow = mock(Dataflow.class);
|
||||
dataflowProjects = mock(Dataflow.Projects.class);
|
||||
dataflowTemplates = mock(Templates.class);
|
||||
dataflowLaunch = mock(Launch.class);
|
||||
LaunchTemplateResponse launchTemplateResponse = new LaunchTemplateResponse();
|
||||
// Ultimately we get back this job response with a given id.
|
||||
launchTemplateResponse.setJob(new Job().setId("jobid"));
|
||||
when(dataflow.projects()).thenReturn(dataflowProjects);
|
||||
when(dataflowProjects.templates()).thenReturn(dataflowTemplates);
|
||||
when(dataflowTemplates.launch(any(String.class), any(LaunchTemplateParameters.class)))
|
||||
.thenReturn(dataflowLaunch);
|
||||
when(dataflowLaunch.setGcsPath(any(String.class))).thenReturn(dataflowLaunch);
|
||||
when(dataflowLaunch.execute()).thenReturn(launchTemplateResponse);
|
||||
when(dataflow.projects()).thenReturn(projects);
|
||||
when(projects.locations()).thenReturn(locations);
|
||||
when(locations.flexTemplates()).thenReturn(templates);
|
||||
when(templates.launch(anyString(), anyString(), any(LaunchFlexTemplateRequest.class)))
|
||||
.thenReturn(launch);
|
||||
when(launch.execute()).thenReturn(launchResponse);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testLaunch_success() throws IOException {
|
||||
void testFailure_dataflowFailure() throws IOException {
|
||||
action =
|
||||
new GenerateSpec11ReportAction(
|
||||
"test",
|
||||
"gs://my-bucket-beam",
|
||||
"gs://template",
|
||||
"test-project",
|
||||
"us-east1-c",
|
||||
"gs://staging-project/staging-bucket/",
|
||||
"gs://reporting-project/reporting-bucket/",
|
||||
"api_key/a",
|
||||
new LocalDate(2018, 6, 11),
|
||||
clock.nowUtc().toLocalDate(),
|
||||
clock,
|
||||
response,
|
||||
dataflow);
|
||||
when(launch.execute()).thenThrow(new IOException("Dataflow failure"));
|
||||
action.run();
|
||||
assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
|
||||
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||
assertThat(response.getPayload()).contains("Dataflow failure");
|
||||
assertNoTasksEnqueued("beam-reporting");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess() throws IOException {
|
||||
action =
|
||||
new GenerateSpec11ReportAction(
|
||||
"test-project",
|
||||
"us-east1-c",
|
||||
"gs://staging-project/staging-bucket/",
|
||||
"gs://reporting-project/reporting-bucket/",
|
||||
"api_key/a",
|
||||
clock.nowUtc().toLocalDate(),
|
||||
clock,
|
||||
response,
|
||||
dataflow);
|
||||
action.run();
|
||||
|
||||
LaunchTemplateParameters expectedLaunchTemplateParameters =
|
||||
new LaunchTemplateParameters()
|
||||
.setJobName("spec11_2018-06-11")
|
||||
.setEnvironment(
|
||||
new RuntimeEnvironment()
|
||||
.setZone("us-east1-c")
|
||||
.setTempLocation("gs://my-bucket-beam/temporary"))
|
||||
.setParameters(
|
||||
ImmutableMap.of("safeBrowsingApiKey", "api_key/a", "date", "2018-06-11"));
|
||||
verify(dataflowTemplates).launch("test", expectedLaunchTemplateParameters);
|
||||
verify(dataflowLaunch).setGcsPath("gs://template");
|
||||
assertThat(response.getStatus()).isEqualTo(200);
|
||||
assertThat(response.getStatus()).isEqualTo(SC_OK);
|
||||
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
|
||||
assertThat(response.getPayload()).isEqualTo("Launched Spec11 dataflow template.");
|
||||
|
||||
TaskMatcher matcher =
|
||||
new TaskMatcher()
|
||||
.url("/_dr/task/publishSpec11")
|
||||
|
|
|
@ -29,8 +29,9 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import com.google.api.services.dataflow.Dataflow;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Jobs;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs;
|
||||
import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Get;
|
||||
import com.google.api.services.dataflow.model.Job;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
@ -51,6 +52,7 @@ class PublishSpec11ReportActionTest {
|
|||
|
||||
private Dataflow dataflow;
|
||||
private Projects projects;
|
||||
private Locations locations;
|
||||
private Jobs jobs;
|
||||
private Get get;
|
||||
private Spec11EmailUtils emailUtils;
|
||||
|
@ -64,11 +66,13 @@ class PublishSpec11ReportActionTest {
|
|||
void beforeEach() throws Exception {
|
||||
dataflow = mock(Dataflow.class);
|
||||
projects = mock(Projects.class);
|
||||
locations = mock(Locations.class);
|
||||
jobs = mock(Jobs.class);
|
||||
get = mock(Get.class);
|
||||
when(dataflow.projects()).thenReturn(projects);
|
||||
when(projects.jobs()).thenReturn(jobs);
|
||||
when(jobs.get("test-project", "12345")).thenReturn(get);
|
||||
when(projects.locations()).thenReturn(locations);
|
||||
when(locations.jobs()).thenReturn(jobs);
|
||||
when(jobs.get("test-project", "test-region", "12345")).thenReturn(get);
|
||||
expectedJob = new Job();
|
||||
when(get.execute()).thenReturn(expectedJob);
|
||||
emailUtils = mock(Spec11EmailUtils.class);
|
||||
|
@ -78,6 +82,7 @@ class PublishSpec11ReportActionTest {
|
|||
publishAction =
|
||||
new PublishSpec11ReportAction(
|
||||
"test-project",
|
||||
"test-region",
|
||||
"Super Cool Registry",
|
||||
"12345",
|
||||
emailUtils,
|
||||
|
@ -95,6 +100,7 @@ class PublishSpec11ReportActionTest {
|
|||
publishAction =
|
||||
new PublishSpec11ReportAction(
|
||||
"test-project",
|
||||
"test-region",
|
||||
"Super Cool Registry",
|
||||
"12345",
|
||||
emailUtils,
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
Map from registrar email / name to detected subdomain threats:
|
||||
{"threatMatches":[{"threatType":"UNWANTED_SOFTWARE","fullyQualifiedDomainName":"anti-anti-anti-virus.dev"}],"registrarClientId":"cool-registrar","registrarEmailAddress":"cool@aid.net"}
|
||||
{"threatMatches":[{"threatType":"MALWARE","fullyQualifiedDomainName":"111.com"},{"threatType":"POTENTIALLY_HARMFUL_APPLICATION","fullyQualifiedDomainName":"bitcoin.bank"}],"registrarClientId":"hello-registrar","registrarEmailAddress":"email@hello.net"}
|
||||
{"threatMatches":[{"threatType":"THREAT_TYPE_UNSPECIFIED","fullyQualifiedDomainName":"no-eamil.com"},{"threatType":"SOCIAL_ENGINEERING","fullyQualifiedDomainName":"party-night.net"}],"registrarClientId":"kitty-registrar","registrarEmailAddress":"contact@kit.ty"}
|
Loading…
Add table
Add a link
Reference in a new issue