Refactor contact history PII wipeout logic into a Beam pipeline (#1994)

Because we need to check if a contact history is the most recent for its
underlying contact resource, the query-wipe out-repeat loop no longer works
ideally due to the added overhead with the query.

Instead, we refactor the logic into a Beam pipeline where the query only
needs to be performed once and history entries eligible for wipe out are
handled individually in their own transforms. Because history entries
are otherwise immutable, we can run the pipeline in relatively relaxed
repeatable read isolation level. We also do not worry about batching for
performance, as we do not anticipate this operation putting much strain
on this particular table.
This commit is contained in:
Lai Jiang 2023-04-19 13:04:45 -04:00 committed by GitHub
parent ad7d6f4873
commit 2981c1c10c
13 changed files with 625 additions and 409 deletions

View file

@ -693,10 +693,6 @@ createToolTask(
'google.registry.tools.DevTool',
sourceSets.nonprod)
createToolTask(
'createSyntheticDomainHistories',
'google.registry.tools.javascrap.CreateSyntheticDomainHistoriesPipeline')
project.tasks.create('generateSqlSchema', JavaExec) {
classpath = sourceSets.nonprod.runtimeClasspath
main = 'google.registry.tools.DevTool'
@ -766,6 +762,11 @@ if (environment == 'alpha') {
mainClass: 'google.registry.beam.resave.ResaveAllEppResourcesPipeline',
metaData: 'google/registry/beam/resave_all_epp_resources_pipeline_metadata.json'
],
wipeOutContactHistoryPii:
[
mainClass: 'google.registry.beam.wipeout.WipeOutContactHistoryPiiPipeline',
metaData: 'google/registry/beam/wipe_out_contact_history_pii_pipeline_metadata.json'
],
]
project.tasks.create("stageBeamPipelines") {
doLast {

View file

@ -117,6 +117,12 @@ public class BatchModule {
return extractOptionalDatetimeParameter(req, ExpandRecurringBillingEventsAction.PARAM_END_TIME);
}
/**
 * Provides the optional cutoff time request parameter consumed by {@link
 * WipeOutContactHistoryPiiAction}, keyed by {@link
 * WipeOutContactHistoryPiiAction#PARAM_CUTOFF_TIME}.
 */
@Provides
@Parameter(WipeOutContactHistoryPiiAction.PARAM_CUTOFF_TIME)
static Optional<DateTime> provideCutoffTime(HttpServletRequest req) {
  String cutoffParamName = WipeOutContactHistoryPiiAction.PARAM_CUTOFF_TIME;
  return extractOptionalDatetimeParameter(req, cutoffParamName);
}
@Provides
@Parameter(ExpandRecurringBillingEventsAction.PARAM_ADVANCE_CURSOR)
static boolean provideAdvanceCursor(HttpServletRequest req) {

View file

@ -14,31 +14,39 @@
package google.registry.batch;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static org.apache.http.HttpStatus.SC_INTERNAL_SERVER_ERROR;
import static org.apache.http.HttpStatus.SC_OK;
import static google.registry.batch.BatchModule.PARAM_DRY_RUN;
import static google.registry.beam.BeamUtils.createJobName;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import com.google.common.annotations.VisibleForTesting;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.collect.ImmutableMap;
import com.google.common.flogger.FluentLogger;
import com.google.common.net.MediaType;
import google.registry.beam.wipeout.WipeOutContactHistoryPiiPipeline;
import google.registry.config.RegistryConfig.Config;
import google.registry.config.RegistryEnvironment;
import google.registry.model.contact.ContactHistory;
import google.registry.request.Action;
import google.registry.request.Action.Service;
import google.registry.request.Parameter;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import java.io.IOException;
import java.util.Optional;
import javax.inject.Inject;
import org.joda.time.DateTime;
/**
* An action that wipes out Personal Identifiable Information (PII) fields of {@link ContactHistory}
* entities.
* An action that launches {@link WipeOutContactHistoryPiiPipeline} to wipe out Personal
* Identifiable Information (PII) fields of {@link ContactHistory} entities.
*
* <p>ContactHistory entities should be retained in the database for only certain amount of time.
* This periodic wipe out action only applies to SQL.
* <p>{@link ContactHistory} entities should be retained in the database for only certain amount of
* time.
*/
@Action(
service = Service.BACKEND,
@ -47,90 +55,89 @@ import org.joda.time.DateTime;
public class WipeOutContactHistoryPiiAction implements Runnable {
public static final String PATH = "/_dr/task/wipeOutContactHistoryPii";
public static final String PARAM_CUTOFF_TIME = "wipeoutTime";
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final String PIPELINE_NAME = "wipe_out_contact_history_pii_pipeline";
private final Clock clock;
private final Response response;
private final boolean isDryRun;
private final Optional<DateTime> maybeCutoffTime;
private final int minMonthsBeforeWipeOut;
private final int wipeOutQueryBatchSize;
private final String stagingBucketUrl;
private final String projectId;
private final String jobRegion;
private final Dataflow dataflow;
private final Response response;
@Inject
public WipeOutContactHistoryPiiAction(
Clock clock,
@Parameter(PARAM_DRY_RUN) boolean isDryRun,
@Parameter(PARAM_CUTOFF_TIME) Optional<DateTime> maybeCutoffTime,
@Config("minMonthsBeforeWipeOut") int minMonthsBeforeWipeOut,
@Config("wipeOutQueryBatchSize") int wipeOutQueryBatchSize,
@Config("beamStagingBucketUrl") String stagingBucketUrl,
@Config("projectId") String projectId,
@Config("defaultJobRegion") String jobRegion,
Dataflow dataflow,
Response response) {
this.clock = clock;
this.response = response;
this.isDryRun = isDryRun;
this.maybeCutoffTime = maybeCutoffTime;
this.minMonthsBeforeWipeOut = minMonthsBeforeWipeOut;
this.wipeOutQueryBatchSize = wipeOutQueryBatchSize;
this.stagingBucketUrl = stagingBucketUrl;
this.projectId = projectId;
this.jobRegion = jobRegion;
this.dataflow = dataflow;
this.response = response;
}
@Override
public void run() {
response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
DateTime cutoffTime =
maybeCutoffTime.orElse(clock.nowUtc().minusMonths(minMonthsBeforeWipeOut));
LaunchFlexTemplateParameter launchParameter =
new LaunchFlexTemplateParameter()
.setJobName(
createJobName(
String.format(
"contact-history-pii-wipeout-%s",
cutoffTime.toString("yyyy-MM-dd't'HH-mm-ss'z'")),
clock))
.setContainerSpecGcsPath(
String.format("%s/%s_metadata.json", stagingBucketUrl, PIPELINE_NAME))
.setParameters(
ImmutableMap.of(
"registryEnvironment",
RegistryEnvironment.get().name(),
"cutoffTime",
cutoffTime.toString("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),
"isDryRun",
Boolean.toString(isDryRun)));
// Only append the dry-run suffix when the action was actually invoked in dry run mode; the
// second %s is intentionally empty otherwise.
logger.atInfo().log(
    "Launching Beam pipeline to wipe out all PII of contact history entities prior to %s%s.",
    cutoffTime, isDryRun ? " in dry run mode" : "");
try {
int totalNumOfWipedEntities = 0;
DateTime wipeOutTime = clock.nowUtc().minusMonths(minMonthsBeforeWipeOut);
logger.atInfo().log(
"About to wipe out all PII of contact history entities prior to %s.", wipeOutTime);
int numOfWipedEntities = 0;
do {
numOfWipedEntities =
tm().transact(
() ->
wipeOutContactHistoryData(
getNextContactHistoryEntitiesWithPiiBatch(wipeOutTime)));
totalNumOfWipedEntities += numOfWipedEntities;
} while (numOfWipedEntities > 0);
String msg =
String.format(
"Done. Wiped out PII of %d ContactHistory entities in total.",
totalNumOfWipedEntities);
logger.atInfo().log(msg);
response.setPayload(msg);
LaunchFlexTemplateResponse launchResponse =
dataflow
.projects()
.locations()
.flexTemplates()
.launch(
projectId,
jobRegion,
new LaunchFlexTemplateRequest().setLaunchParameter(launchParameter))
.execute();
logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString());
response.setStatus(SC_OK);
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"Exception thrown during the process of wiping out contact history PII.");
response.setStatus(SC_INTERNAL_SERVER_ERROR);
response.setPayload(
String.format(
"Exception thrown during the process of wiping out contact history PII with cause"
+ ": %s",
e));
"Launched contact history PII wipeout pipeline: %s",
launchResponse.getJob().getId()));
} catch (IOException e) {
logger.atWarning().withCause(e).log("Pipeline Launch failed");
response.setStatus(SC_INTERNAL_SERVER_ERROR);
response.setPayload(String.format("Pipeline launch failed: %s", e.getMessage()));
}
}
/**
* Returns a stream of up to {@link #wipeOutQueryBatchSize} {@link ContactHistory} entities
* containing PII that are prior to @param wipeOutTime.
*/
@VisibleForTesting
Stream<ContactHistory> getNextContactHistoryEntitiesWithPiiBatch(DateTime wipeOutTime) {
// email is one of the required fields in EPP, meaning it's initially not null.
// Therefore, checking if it's null is one way to avoid processing contact history entities
// that have been processed previously. Refer to RFC 5733 for more information.
return tm().query(
"FROM ContactHistory WHERE modificationTime < :wipeOutTime " + "AND email IS NOT NULL",
ContactHistory.class)
.setParameter("wipeOutTime", wipeOutTime)
.setMaxResults(wipeOutQueryBatchSize)
.getResultStream();
}
/** Wipes out the PII of each of the {@link ContactHistory} entities in the stream. */
@VisibleForTesting
int wipeOutContactHistoryData(Stream<ContactHistory> contactHistoryEntities) {
AtomicInteger numOfEntities = new AtomicInteger(0);
contactHistoryEntities.forEach(
contactHistoryEntity -> {
tm().update(contactHistoryEntity.asBuilder().wipeOutPii().build());
numOfEntities.incrementAndGet();
});
logger.atInfo().log(
"Wiped out all PII fields of %d ContactHistory entities.", numOfEntities.get());
return numOfEntities.get();
}
}

View file

@ -0,0 +1,166 @@
// Copyright 2023 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.wipeout;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static org.apache.beam.sdk.values.TypeDescriptors.voids;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import google.registry.beam.common.RegistryJpaIO;
import google.registry.model.contact.ContactHistory;
import google.registry.model.reporting.HistoryEntry.HistoryEntryId;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import google.registry.persistence.VKey;
import java.io.Serializable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.DateTime;
/**
 * Definition of a Dataflow Flex pipeline template, which finds out {@link ContactHistory} entries
 * that are older than a given age (excluding the most recent one, even if it falls within the
 * range) and wipes out PII information in them.
 *
 * <p>To stage this template locally, run {@code ./nom_build :core:sBP --environment=alpha \
 * --pipeline=wipeOutContactHistoryPii}.
 *
 * <p>Then, you can run the staged template via the API client library, gCloud or a raw REST call.
 */
public class WipeOutContactHistoryPiiPipeline implements Serializable {

  private static final long serialVersionUID = -4111052675715913820L;

  /** Tags the revision IDs of history entries that match the wipe-out query. */
  private static final TupleTag<Long> REVISIONS_TO_WIPE = new TupleTag<>();

  /** Tags the revision ID of the most recent history entry of each contact. */
  private static final TupleTag<Long> MOST_RECENT_REVISION = new TupleTag<>();

  /** History entries modified strictly before this time are candidates for wipe out. */
  private final DateTime cutoffTime;

  /** When true, nothing is written to the database; only the scope counters are updated. */
  private final boolean dryRun;

  /** Number of contacts that have at least one history entry to wipe. */
  private final Counter contactsInScope =
      Metrics.counter("WipeOutContactHistoryPii", "contacts in scope");

  /** Number of history entries selected for wipe out (incremented in dry runs as well). */
  private final Counter historiesToWipe =
      Metrics.counter("WipeOutContactHistoryPii", "contact histories to wipe PII from");

  /** Number of history entries that were actually updated (never incremented in dry runs). */
  private final Counter historiesWiped =
      Metrics.counter("WipeOutContactHistoryPii", "contact histories actually updated");

  /**
   * Creates the pipeline definition from its options.
   *
   * @param options supplies the dry-run flag and the cutoff time; the cutoff time string is parsed
   *     with {@link DateTime#parse(String)}
   */
  WipeOutContactHistoryPiiPipeline(WipeOutContactHistoryPiiPipelineOptions options) {
    dryRun = options.getIsDryRun();
    cutoffTime = DateTime.parse(options.getCutoffTime());
  }

  /**
   * Builds the pipeline graph.
   *
   * <p>The wipe-out candidates and the most recent revisions are co-grouped by contact repo ID.
   * For each contact, every candidate revision other than the most recent one is loaded and, when
   * not in dry run mode and still carrying an email address, has its PII wiped. All revisions of
   * one contact are handled in a single transaction.
   */
  void setup(Pipeline pipeline) {
    KeyedPCollectionTuple.of(REVISIONS_TO_WIPE, getHistoryEntriesToWipe(pipeline))
        .and(MOST_RECENT_REVISION, getMostRecentHistoryEntries(pipeline))
        .apply("Group by contact", CoGroupByKey.create())
        .apply(
            "Wipe out PII",
            MapElements.into(voids())
                .via(
                    kv -> {
                      String repoId = kv.getKey();
                      // NOTE(review): getOnly presumably requires exactly one most recent
                      // revision per contact; two entries sharing the maximum modification time
                      // would likely make this throw -- confirm this cannot happen.
                      long mostRecentRevision = kv.getValue().getOnly(MOST_RECENT_REVISION);
                      // The most recent revision is never wiped, even if it predates the cutoff.
                      ImmutableList<Long> revisionsToWipe =
                          Streams.stream(kv.getValue().getAll(REVISIONS_TO_WIPE))
                              .filter(e -> e != mostRecentRevision)
                              .collect(toImmutableList());
                      if (revisionsToWipe.isEmpty()) {
                        return null;
                      }
                      contactsInScope.inc();
                      tm().transact(
                              () -> {
                                for (long revisionId : revisionsToWipe) {
                                  historiesToWipe.inc();
                                  ContactHistory history =
                                      tm().loadByKey(
                                              VKey.create(
                                                  ContactHistory.class,
                                                  new HistoryEntryId(repoId, revisionId)));
                                  // In the unlikely case where multiple pipelines run at the
                                  // same time, or where the runner decides to rerun a particular
                                  // transform, we might have a history entry that has already been
                                  // wiped at this point. There's no need to wipe it again.
                                  if (!dryRun
                                      && history.getContactBase().isPresent()
                                      && history.getContactBase().get().getEmailAddress() != null) {
                                    historiesWiped.inc();
                                    tm().update(history.asBuilder().wipeOutPii().build());
                                  }
                                }
                              });
                      return null;
                    }));
  }

  /**
   * Reads the (repo ID, revision ID) pairs of all contact history entries that still have an email
   * address and whose modification time is before the cutoff time.
   */
  PCollection<KV<String, Long>> getHistoryEntriesToWipe(Pipeline pipeline) {
    return pipeline.apply(
        "Find contact histories to wipe",
        // Email is one of the required fields in EPP, meaning it's initially not null when it
        // is set by EPP flows (even though it is nullable in the SQL schema). Therefore,
        // checking if it's null is one way to avoid processing contact history entities that
        // have been processed previously. Refer to RFC 5733 for more information.
        RegistryJpaIO.read(
                "SELECT repoId, revisionId FROM ContactHistory WHERE email IS NOT NULL AND"
                    + " modificationTime < :cutoffTime",
                ImmutableMap.of("cutoffTime", cutoffTime),
                Object[].class,
                row -> KV.of((String) row[0], (long) row[1]))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));
  }

  /**
   * Reads the (repo ID, revision ID) pairs of the contact history entries whose modification time
   * is the maximum among all history entries sharing the same repo ID.
   */
  PCollection<KV<String, Long>> getMostRecentHistoryEntries(Pipeline pipeline) {
    return pipeline.apply(
        "Find the most recent history entry for each contact",
        RegistryJpaIO.read(
                "SELECT repoId, revisionId FROM ContactHistory"
                    + " WHERE (repoId, modificationTime) IN"
                    + " (SELECT repoId, MAX(modificationTime) FROM ContactHistory GROUP BY repoId)",
                ImmutableMap.of(),
                Object[].class,
                row -> KV.of((String) row[0], (long) row[1]))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));
  }

  /** Builds the pipeline graph on the given pipeline and runs it. */
  PipelineResult run(Pipeline pipeline) {
    setup(pipeline);
    return pipeline.run();
  }

  /** Entry point: parses the pipeline options from {@code args}, then creates and runs the job. */
  public static void main(String[] args) {
    PipelineOptionsFactory.register(WipeOutContactHistoryPiiPipelineOptions.class);
    WipeOutContactHistoryPiiPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(WipeOutContactHistoryPiiPipelineOptions.class);
    // Repeatable read should be more than enough since we are dealing with old history entries that
    // are otherwise immutable.
    options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ);
    Pipeline pipeline = Pipeline.create(options);
    new WipeOutContactHistoryPiiPipeline(options).run(pipeline);
  }
}

View file

@ -0,0 +1,37 @@
// Copyright 2023 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.wipeout;
import google.registry.beam.common.RegistryPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
/** Custom pipeline options for {@link WipeOutContactHistoryPiiPipeline}. */
public interface WipeOutContactHistoryPiiPipelineOptions extends RegistryPipelineOptions {

  /**
   * Returns the cutoff time as a string; the pipeline parses it with {@code DateTime.parse} when
   * it is constructed.
   */
  @Description(
      "A contact history entry with a history modification time before this time will have its PII"
          + " wiped, unless it is the most recent entry for the contact.")
  String getCutoffTime();

  void setCutoffTime(String value);

  /** Returns whether the pipeline should skip database writes; defaults to {@code false}. */
  @Description(
      "If true, the wiped out contact history entries will not be saved but the pipeline metrics"
          + " counter will still be updated.")
  @Default.Boolean(false)
  boolean getIsDryRun();

  void setIsDryRun(boolean value);
}

View file

@ -1325,12 +1325,6 @@ public final class RegistryConfig {
return config.contactHistory.minMonthsBeforeWipeOut;
}
@Provides
@Config("wipeOutQueryBatchSize")
public static int provideWipeOutQueryBatchSize(RegistryConfigSettings config) {
return config.contactHistory.wipeOutQueryBatchSize;
}
@Provides
@Config("jdbcBatchSize")
public static int provideHibernateJdbcBatchSize(RegistryConfigSettings config) {

View file

@ -243,7 +243,6 @@ public class RegistryConfigSettings {
/** Configuration for contact history. */
public static class ContactHistory {
public int minMonthsBeforeWipeOut;
public int wipeOutQueryBatchSize;
}
/** Configuration for dns update. */

View file

@ -474,8 +474,6 @@ registryTool:
contactHistory:
# The number of months that a ContactHistory entity should be stored in the database.
minMonthsBeforeWipeOut: 18
# The batch size for querying ContactHistory table in the database.
wipeOutQueryBatchSize: 500
# Configuration options relevant to the DNS update functionality.
dnsUpdate:

View file

@ -280,6 +280,6 @@
This job runs weekly to wipe out PII fields of ContactHistory entities
that have been in the database for a certain period of time.
</description>
<schedule>0 15 * 12 1</schedule>
<schedule>0 15 * * 1</schedule>
</task>
</taskentries>

View file

@ -0,0 +1,39 @@
{
"name": "Wipe Out PII From Old Contact History Entries",
"description": "An Apache Beam batch pipeline that finds old contact history entries and removes PII information from them.",
"parameters": [
{
"name": "registryEnvironment",
"label": "The Registry environment.",
"helpText": "The Registry environment.",
"is_optional": false,
"regexes": [
"^PRODUCTION|SANDBOX|CRASH|QA|ALPHA$"
]
},
{
"name": "isolationOverride",
"label": "The desired SQL transaction isolation level.",
"helpText": "The desired SQL transaction isolation level.",
"is_optional": true,
"regexes": [
"^[0-9A-Z_]+$"
]
},
{
"name": "cutoffTime",
"label": "The maximum history modification time of a contact history entry eligible for wipe out.",
"helpText": "If the history modification time of a contact history entry is older than this, and it is not the most recent entry of a contact, it will have its PII wiped out.",
"is_optional": true
},
{
"name": "isDryRun",
"label": "Whether this job is a dry run.",
"helpText": "If true, no changes will be saved to the database.",
"is_optional": true,
"regexes": [
"^true|false$"
]
}
]
}

View file

@ -15,348 +15,119 @@
package google.registry.batch;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static com.google.common.truth.Truth8.assertThat;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.testing.DatabaseHelper.persistResource;
import static org.apache.http.HttpStatus.SC_OK;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import google.registry.model.contact.Contact;
import google.registry.model.contact.ContactAddress;
import google.registry.model.contact.ContactAuthInfo;
import google.registry.model.contact.ContactBase;
import google.registry.model.contact.ContactHistory;
import google.registry.model.contact.ContactPhoneNumber;
import google.registry.model.contact.Disclose;
import google.registry.model.contact.PostalInfo;
import google.registry.model.eppcommon.AuthInfo.PasswordAuth;
import google.registry.model.eppcommon.PresenceMarker;
import google.registry.model.eppcommon.StatusValue;
import google.registry.persistence.transaction.JpaTestExtensions;
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
import google.registry.testing.DatabaseHelper;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import google.registry.beam.BeamActionTestBase;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeResponse;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
/** Unit tests for {@link WipeOutContactHistoryPiiAction}. */
class WipeOutContactHistoryPiiActionTest {
class WipeOutContactHistoryPiiActionTest extends BeamActionTestBase {
private static final int TEST_BATCH_SIZE = 20;
private static final int MIN_MONTHS_BEFORE_WIPE_OUT = 18;
private static final Contact DEFAULT_CONTACT =
new Contact.Builder()
.setContactId("sh8013")
.setRepoId("2FF-ROID")
.setStatusValues(ImmutableSet.of(StatusValue.CLIENT_DELETE_PROHIBITED))
.setLocalizedPostalInfo(
new PostalInfo.Builder()
.setType(PostalInfo.Type.LOCALIZED)
.setAddress(
new ContactAddress.Builder()
.setStreet(ImmutableList.of("123 Grand Ave"))
.build())
.build())
.setInternationalizedPostalInfo(
new PostalInfo.Builder()
.setType(PostalInfo.Type.INTERNATIONALIZED)
.setName("John Doe")
.setOrg("Example Inc.")
.setAddress(
new ContactAddress.Builder()
.setStreet(ImmutableList.of("123 Example Dr.", "Suite 100"))
.setCity("Dulles")
.setState("VA")
.setZip("20166-6503")
.setCountryCode("US")
.build())
.build())
.setVoiceNumber(
new ContactPhoneNumber.Builder()
.setPhoneNumber("+1.7035555555")
.setExtension("1234")
.build())
.setFaxNumber(new ContactPhoneNumber.Builder().setPhoneNumber("+1.7035555556").build())
.setEmailAddress("jdoe@example.com")
.setPersistedCurrentSponsorRegistrarId("TheRegistrar")
.setCreationRegistrarId("NewRegistrar")
.setLastEppUpdateRegistrarId("NewRegistrar")
.setCreationTimeForTest(DateTime.parse("1999-04-03T22:00:00.0Z"))
.setLastEppUpdateTime(DateTime.parse("1999-12-03T09:00:00.0Z"))
.setLastTransferTime(DateTime.parse("2000-04-08T09:00:00.0Z"))
.setAuthInfo(ContactAuthInfo.create(PasswordAuth.create("2fooBAR")))
.setDisclose(
new Disclose.Builder()
.setFlag(true)
.setVoice(new PresenceMarker())
.setEmail(new PresenceMarker())
.build())
.build();
@RegisterExtension
final JpaIntegrationTestExtension jpa =
new JpaTestExtensions.Builder().buildIntegrationTestExtension();
private final FakeClock clock = new FakeClock(DateTime.parse("2021-08-26T20:21:22Z"));
private FakeResponse response;
private WipeOutContactHistoryPiiAction action;
private final DateTime now = DateTime.parse("2019-01-19T01:02:03Z");
private final FakeClock clock = new FakeClock(now);
private final Map<String, String> expectedParameters = new HashMap<>();
private final ArgumentCaptor<LaunchFlexTemplateRequest> launchRequest =
ArgumentCaptor.forClass(LaunchFlexTemplateRequest.class);
private WipeOutContactHistoryPiiAction action =
new WipeOutContactHistoryPiiAction(
clock,
false,
Optional.empty(),
8,
"tucketBucket",
"testProject",
"testRegion",
dataflow,
response);
@BeforeEach
void beforeEach() {
response = new FakeResponse();
void before() {
expectedParameters.put("registryEnvironment", "UNITTEST");
expectedParameters.put("isDryRun", "false");
expectedParameters.put("cutoffTime", "2018-05-19T01:02:03.000Z");
}
@Test
void testSuccess() throws Exception {
action.run();
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getPayload())
.isEqualTo("Launched contact history PII wipeout pipeline: jobid");
verify(templates, times(1))
.launch(eq("testProject"), eq("testRegion"), launchRequest.capture());
assertThat(launchRequest.getValue().getLaunchParameter().getParameters())
.containsExactlyEntriesIn(expectedParameters);
}
@Test
void testSuccess_providedCutoffTime() throws Exception {
action =
new WipeOutContactHistoryPiiAction(
clock, MIN_MONTHS_BEFORE_WIPE_OUT, TEST_BATCH_SIZE, response);
}
@Test
void getAllHistoryEntitiesOlderThan_returnsAllPersistedEntities() {
ImmutableList<ContactHistory> expectedToBeWipedOut =
persistLotsOfContactHistoryEntities(20, MIN_MONTHS_BEFORE_WIPE_OUT + 1, 0, DEFAULT_CONTACT);
tm().transact(
() ->
assertThat(
action.getNextContactHistoryEntitiesWithPiiBatch(
clock.nowUtc().minusMonths(MIN_MONTHS_BEFORE_WIPE_OUT)))
.containsExactlyElementsIn(expectedToBeWipedOut));
}
@Test
void getAllHistoryEntitiesOlderThan_returnsOnlyOldEnoughPersistedEntities() {
ImmutableList<ContactHistory> expectedToBeWipedOut =
persistLotsOfContactHistoryEntities(19, MIN_MONTHS_BEFORE_WIPE_OUT + 2, 0, DEFAULT_CONTACT);
// persisted entities that should not be part of the actual result
persistLotsOfContactHistoryEntities(15, 17, MIN_MONTHS_BEFORE_WIPE_OUT - 1, DEFAULT_CONTACT);
tm().transact(
() ->
assertThat(
action.getNextContactHistoryEntitiesWithPiiBatch(
clock.nowUtc().minusMonths(MIN_MONTHS_BEFORE_WIPE_OUT)))
.containsExactlyElementsIn(expectedToBeWipedOut));
}
@Test
void run_withNoEntitiesToWipeOut_success() {
assertThat(
tm().transact(
() ->
action
.getNextContactHistoryEntitiesWithPiiBatch(
clock.nowUtc().minusMonths(MIN_MONTHS_BEFORE_WIPE_OUT))
.count()))
.isEqualTo(0);
clock,
false,
Optional.of(now.minusYears(1)),
8,
"tucketBucket",
"testProject",
"testRegion",
dataflow,
response);
action.run();
assertThat(
tm().transact(
() ->
action
.getNextContactHistoryEntitiesWithPiiBatch(
clock.nowUtc().minusMonths(MIN_MONTHS_BEFORE_WIPE_OUT))
.count()))
.isEqualTo(0);
assertThat(response.getStatus()).isEqualTo(SC_OK);
expectedParameters.put("cutoffTime", "2018-01-19T01:02:03.000Z");
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getPayload())
.isEqualTo("Done. Wiped out PII of 0 ContactHistory entities in total.");
.isEqualTo("Launched contact history PII wipeout pipeline: jobid");
verify(templates, times(1))
.launch(eq("testProject"), eq("testRegion"), launchRequest.capture());
assertThat(launchRequest.getValue().getLaunchParameter().getParameters())
.containsExactlyEntriesIn(expectedParameters);
}
@Test
void run_withOneBatchOfEntities_success() {
int numOfMonthsFromNow = MIN_MONTHS_BEFORE_WIPE_OUT + 2;
ImmutableList<ContactHistory> expectedToBeWipedOut =
persistLotsOfContactHistoryEntities(20, numOfMonthsFromNow, 0, DEFAULT_CONTACT);
// The query should return a stream of all persisted entities.
assertThat(
tm().transact(
() ->
action
.getNextContactHistoryEntitiesWithPiiBatch(
clock.nowUtc().minusMonths(MIN_MONTHS_BEFORE_WIPE_OUT))
.count()))
.isEqualTo(expectedToBeWipedOut.size());
assertAllEntitiesContainPii(DatabaseHelper.loadByEntitiesIfPresent(expectedToBeWipedOut));
void testSuccess_dryRun() throws Exception {
action =
new WipeOutContactHistoryPiiAction(
clock,
true,
Optional.empty(),
8,
"tucketBucket",
"testProject",
"testRegion",
dataflow,
response);
action.run();
expectedParameters.put("isDryRun", "true");
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getPayload())
.isEqualTo("Done. Wiped out PII of 20 ContactHistory entities in total.");
// The query should return an empty stream after the wipe out action.
assertThat(
tm().transact(
() ->
action
.getNextContactHistoryEntitiesWithPiiBatch(
clock.nowUtc().minusMonths(MIN_MONTHS_BEFORE_WIPE_OUT))
.count()))
.isEqualTo(0);
assertAllPiiFieldsAreWipedOut(DatabaseHelper.loadByEntitiesIfPresent(expectedToBeWipedOut));
.isEqualTo("Launched contact history PII wipeout pipeline: jobid");
verify(templates, times(1))
.launch(eq("testProject"), eq("testRegion"), launchRequest.capture());
assertThat(launchRequest.getValue().getLaunchParameter().getParameters())
.containsExactlyEntriesIn(expectedParameters);
}
@Test
void run_withMultipleBatches_numOfEntitiesAsNonMultipleOfBatchSize_success() {
int numOfMonthsFromNow = MIN_MONTHS_BEFORE_WIPE_OUT + 2;
ImmutableList<ContactHistory> expectedToBeWipedOut =
persistLotsOfContactHistoryEntities(56, numOfMonthsFromNow, 0, DEFAULT_CONTACT);
// The query should return a subset of all persisted data.
assertThat(
tm().transact(
() ->
action
.getNextContactHistoryEntitiesWithPiiBatch(
clock.nowUtc().minusMonths(MIN_MONTHS_BEFORE_WIPE_OUT))
.count()))
.isEqualTo(TEST_BATCH_SIZE);
assertAllEntitiesContainPii(DatabaseHelper.loadByEntitiesIfPresent(expectedToBeWipedOut));
void testFailure_launchError() throws Exception {
when(launch.execute()).thenThrow(new IOException("cannot launch"));
action.run();
assertThat(response.getPayload())
.isEqualTo("Done. Wiped out PII of 56 ContactHistory entities in total.");
// The query should return an empty stream after the wipe out action.
assertThat(
tm().transact(
() ->
action
.getNextContactHistoryEntitiesWithPiiBatch(
clock.nowUtc().minusMonths(MIN_MONTHS_BEFORE_WIPE_OUT))
.count()))
.isEqualTo(0);
assertAllPiiFieldsAreWipedOut(DatabaseHelper.loadByEntitiesIfPresent(expectedToBeWipedOut));
}
@Test
void run_withMultipleBatches_numOfEntitiesAsMultiplesOfBatchSize_success() {
int numOfMonthsFromNow = MIN_MONTHS_BEFORE_WIPE_OUT + 2;
ImmutableList<ContactHistory> expectedToBeWipedOut =
persistLotsOfContactHistoryEntities(
TEST_BATCH_SIZE * 2, numOfMonthsFromNow, 0, DEFAULT_CONTACT);
// The query should return a subset of all persisted data.
assertThat(
tm().transact(
() ->
action
.getNextContactHistoryEntitiesWithPiiBatch(
clock.nowUtc().minusMonths(MIN_MONTHS_BEFORE_WIPE_OUT))
.count()))
.isEqualTo(TEST_BATCH_SIZE);
assertAllEntitiesContainPii(DatabaseHelper.loadByEntitiesIfPresent(expectedToBeWipedOut));
action.run();
assertThat(response.getPayload())
.isEqualTo("Done. Wiped out PII of 40 ContactHistory entities in total.");
// The query should return an empty stream after the wipe out action.
assertThat(
tm().transact(
() ->
action
.getNextContactHistoryEntitiesWithPiiBatch(
clock.nowUtc().minusMonths(MIN_MONTHS_BEFORE_WIPE_OUT))
.count()))
.isEqualTo(0);
assertAllPiiFieldsAreWipedOut(DatabaseHelper.loadByEntitiesIfPresent(expectedToBeWipedOut));
}
@Test
void wipeOutContactHistoryData_wipesOutNoEntity() {
tm().transact(
() ->
assertThat(
action.wipeOutContactHistoryData(
action.getNextContactHistoryEntitiesWithPiiBatch(
clock.nowUtc().minusMonths(MIN_MONTHS_BEFORE_WIPE_OUT))))
.isEqualTo(0));
}
@Test
void wipeOutContactHistoryData_wipesOutMultipleEntities() {
  // Twenty history entries dated three months past the minimum wipe-out age.
  ImmutableList<ContactHistory> oldHistories =
      persistLotsOfContactHistoryEntities(
          20, MIN_MONTHS_BEFORE_WIPE_OUT + 3, 0, DEFAULT_CONTACT);
  assertAllEntitiesContainPii(DatabaseHelper.loadByEntitiesIfPresent(oldHistories));

  // Query and wipe within a single transaction.
  tm().transact(
          () -> {
            action.wipeOutContactHistoryData(
                action.getNextContactHistoryEntitiesWithPiiBatch(
                    clock.nowUtc().minusMonths(MIN_MONTHS_BEFORE_WIPE_OUT)));
          });

  assertAllPiiFieldsAreWipedOut(DatabaseHelper.loadByEntitiesIfPresent(oldHistories));
}
/**
 * Persists {@code numOfEntities} {@link ContactHistory} entities for load and query testing.
 *
 * <p>Each entity is a {@code CONTACT_DELETE} history entry for registrar "NewRegistrar" whose
 * modification time is the current clock time minus {@code minusMonths} months and {@code
 * minusDays} days. The given contact is persisted again for every entity created.
 *
 * @return the persisted entities, in creation order
 */
ImmutableList<ContactHistory> persistLotsOfContactHistoryEntities(
    int numOfEntities, int minusMonths, int minusDays, Contact contact) {
  ImmutableList.Builder<ContactHistory> persisted = ImmutableList.builder();
  for (int remaining = numOfEntities; remaining > 0; remaining--) {
    ContactHistory history =
        new ContactHistory()
            .asBuilder()
            .setRegistrarId("NewRegistrar")
            .setModificationTime(clock.nowUtc().minusMonths(minusMonths).minusDays(minusDays))
            .setType(ContactHistory.Type.CONTACT_DELETE)
            .setContact(persistResource(contact))
            .build();
    persisted.add(persistResource(history));
  }
  return persisted.build();
}
/**
 * Returns true only if the email address, fax number, internationalized and localized postal
 * info, and voice number of the given contact are all null.
 */
boolean areAllPiiFieldsWiped(ContactBase contactBase) {
  if (contactBase.getEmailAddress() != null) {
    return false;
  }
  if (contactBase.getFaxNumber() != null) {
    return false;
  }
  if (contactBase.getInternationalizedPostalInfo() != null) {
    return false;
  }
  if (contactBase.getLocalizedPostalInfo() != null) {
    return false;
  }
  return contactBase.getVoiceNumber() == null;
}
/**
 * Returns true if at least one of the email address, fax number, internationalized or localized
 * postal info, or voice number of the given contact is non-null.
 */
boolean containsPii(ContactBase contactBase) {
  // "Some PII field is set" is exactly the negation of "every PII field is null".
  return !areAllPiiFieldsWiped(contactBase);
}
/** Fails, listing the offending entities, unless every given entity has all PII fields wiped. */
void assertAllPiiFieldsAreWipedOut(ImmutableList<ContactHistory> entities) {
  ImmutableList<ContactHistory> stillContainingPii =
      entities.stream()
          .filter(history -> !areAllPiiFieldsWiped(history.getContactBase().get()))
          .collect(ImmutableList.toImmutableList());
  assertWithMessage("Not all PII fields of the contact history entities were wiped.")
      .that(stillContainingPii)
      .isEmpty();
}
// Fails, listing the offending entities, unless every given entity contains some PII.
//
// NOTE(review): the statements after the PII assertion check a 500 response, a launch-failure
// payload and the captured launch parameters. They look like the tail of a launch-error test
// interleaved here by a diff rendering; confirm against the real file.
void assertAllEntitiesContainPii(ImmutableList<ContactHistory> entities) {
// Collect every entity for which containsPii() is false so the failure message can list them.
ImmutableList.Builder<ContactHistory> entitiesWithNoPii = new ImmutableList.Builder<>();
for (ContactHistory entity : entities) {
if (!containsPii(entity.getContactBase().get())) {
entitiesWithNoPii.add(entity);
}
}
assertWithMessage("Not all contact history entities contain PII.")
.that(entitiesWithNoPii.build())
.isEmpty();
assertThat(response.getStatus()).isEqualTo(500);
assertThat(response.getPayload()).isEqualTo("Pipeline launch failed: cannot launch");
// Exactly one launch is expected, with the captured request carrying the expected parameters.
verify(templates, times(1))
.launch(eq("testProject"), eq("testRegion"), launchRequest.capture());
assertThat(launchRequest.getValue().getLaunchParameter().getParameters())
.containsExactlyEntriesIn(expectedParameters);
}
}

View file

@ -0,0 +1,196 @@
// Copyright 2023 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.beam.wipeout;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.model.reporting.HistoryEntry.Type.CONTACT_CREATE;
import static google.registry.persistence.PersistenceModule.TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ;
import static google.registry.testing.DatabaseHelper.loadAllOf;
import static google.registry.testing.DatabaseHelper.newContact;
import static google.registry.testing.DatabaseHelper.persistResource;
import static org.hibernate.cfg.AvailableSettings.ISOLATION;
import com.google.common.collect.ImmutableList;
import google.registry.beam.TestPipelineExtension;
import google.registry.model.contact.Contact;
import google.registry.model.contact.ContactHistory;
import google.registry.model.contact.ContactPhoneNumber;
import google.registry.model.reporting.HistoryEntryDao;
import google.registry.persistence.transaction.JpaTestExtensions;
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
import google.registry.testing.FakeClock;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/**
 * Unit tests for {@link WipeOutContactHistoryPiiPipeline}.
 *
 * <p>The fixture built in {@link #beforeEach} persists two contacts and five history entries on a
 * fake clock. With T being the number of months elapsed since the clock's start time:
 *
 * <ul>
 *   <li>contact1 has history entries at T = 0 and T = 10;
 *   <li>contact2 has history entries at T = 5, T = 10 and T = 20.
 * </ul>
 *
 * <p>The clock is then advanced to T = 30 and the pipeline's cutoff time is set {@code
 * MIN_AGE_IN_MONTHS} months before that, i.e. at T = 12.
 */
public class WipeOutContactHistoryPiiPipelineTest {

  private static final int MIN_AGE_IN_MONTHS = 18;
  private static final DateTimeFormatter DATE_TIME_FORMATTER =
      DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

  private final FakeClock clock = new FakeClock(DateTime.parse("2020-02-02T12:34:56Z"));
  private final WipeOutContactHistoryPiiPipelineOptions options =
      PipelineOptionsFactory.create().as(WipeOutContactHistoryPiiPipelineOptions.class);

  private Contact contact1;
  private Contact contact2;

  @RegisterExtension
  final JpaIntegrationTestExtension jpa =
      new JpaTestExtensions.Builder()
          .withClock(clock)
          .withProperty(ISOLATION, TRANSACTION_REPEATABLE_READ.name())
          .buildIntegrationTestExtension();

  @RegisterExtension
  final TestPipelineExtension pipeline =
      TestPipelineExtension.create().enableAbandonedNodeEnforcement(true);

  @BeforeEach
  void beforeEach() {
    // contact1 carries an email address and a fax number.
    contact1 =
        persistResource(
            newContact("my-contact1")
                .asBuilder()
                .setEmailAddress("test@example.com")
                .setFaxNumber(
                    new ContactPhoneNumber.Builder().setPhoneNumber("+12122122122").build())
                .build());
    // contact2 carries an email address and a voice number.
    contact2 =
        persistResource(
            newContact("my-contact2")
                .asBuilder()
                .setEmailAddress("test@example.tld")
                .setVoiceNumber(
                    new ContactPhoneNumber.Builder().setPhoneNumber("+19177199177").build())
                .build());
    // T = 0 month;
    persistResource(createHistory(contact1));
    // T = 5 months;
    advanceMonths(5);
    persistResource(createHistory(contact2));
    // T = 10 months;
    advanceMonths(5);
    persistResource(createHistory(contact1));
    persistResource(createHistory(contact2));
    // T = 20 months;
    advanceMonths(10);
    persistResource(createHistory(contact2));
    // T = 30 months;
    advanceMonths(10);
    options.setCutoffTime(
        DATE_TIME_FORMATTER.print(clock.nowUtc().minusMonths(MIN_AGE_IN_MONTHS)));
  }

  @Test
  void testSuccess() {
    // Before the pipeline runs, every history entry should have an email address.
    assertThat(
            loadAllOf(ContactHistory.class).stream()
                .filter(e -> e.getContactBase().get().getEmailAddress() != null)
                .count())
        .isEqualTo(5);
    // Before the pipeline runs, contact history for contact1 should have fax numbers.
    ImmutableList<ContactHistory> histories = loadHistories(contact1);
    assertThat(
            histories.stream().filter(e -> e.getContactBase().get().getFaxNumber() != null).count())
        .isEqualTo(2);
    // Before the pipeline runs, contact history for contact2 should have voice numbers.
    histories = loadHistories(contact2);
    assertThat(
            histories.stream()
                .filter(e -> e.getContactBase().get().getVoiceNumber() != null)
                .count())
        .isEqualTo(3);

    WipeOutContactHistoryPiiPipeline wipeOutContactHistoryPiiPipeline =
        new WipeOutContactHistoryPiiPipeline(options);
    wipeOutContactHistoryPiiPipeline.run(pipeline).waitUntilFinish();

    histories = loadHistories(contact1);
    assertThat(histories).hasSize(2);
    ImmutableList<ContactHistory> wipedEntries = wipedEntriesOf(histories);
    // Only the history entry at T = 0 is wiped. The one at T = 10 is over 18 months old, but it
    // is the most recent entry, so it is kept.
    assertThat(wipedEntries).hasSize(1);
    assertThat(wipedEntries.get(0).getContactBase().get().getFaxNumber()).isNull();

    // With a new history entry at T = 30, the one at T = 10 is eligible for wipe out. Note the
    // current time itself (therefore the cutoff time) has not changed.
    persistResource(createHistory(contact1));
    wipeOutContactHistoryPiiPipeline.run(pipeline).waitUntilFinish();

    histories = loadHistories(contact1);
    assertThat(histories).hasSize(3);
    assertThat(wipedEntriesOf(histories)).hasSize(2);

    // Check that the pipeline deals with multiple contacts correctly.
    histories = loadHistories(contact2);
    assertThat(histories).hasSize(3);
    wipedEntries = wipedEntriesOf(histories);
    // The history entries at T = 5 and T = 10 are wiped. The one at T = 20 is both the most
    // recent entry and less than 18 months old, so it is kept.
    assertThat(wipedEntries).hasSize(2);
    assertThat(wipedEntries.get(0).getContactBase().get().getVoiceNumber()).isNull();
    assertThat(wipedEntries.get(1).getContactBase().get().getVoiceNumber()).isNull();
  }

  @Test
  void testSuccess_dryRun() {
    options.setIsDryRun(true);
    WipeOutContactHistoryPiiPipeline wipeOutContactHistoryPiiPipeline =
        new WipeOutContactHistoryPiiPipeline(options);
    wipeOutContactHistoryPiiPipeline.run(pipeline).waitUntilFinish();

    ImmutableList<ContactHistory> histories = loadHistories(contact1);
    assertThat(histories).hasSize(2);
    assertThat(wipedEntriesOf(histories)).isEmpty();
    // A dry run must not wipe any history entry, whichever contact it belongs to.
    assertThat(
            loadAllOf(ContactHistory.class).stream()
                .filter(WipeOutContactHistoryPiiPipelineTest::isWiped)
                .count())
        .isEqualTo(0);
  }

  /** Loads all history entries persisted for the given contact. */
  private static ImmutableList<ContactHistory> loadHistories(Contact contact) {
    return HistoryEntryDao.loadHistoryObjectsForResource(
        contact.createVKey(), ContactHistory.class);
  }

  /** Returns the entries that count as wiped, preserving the order of {@code histories}. */
  private static ImmutableList<ContactHistory> wipedEntriesOf(
      ImmutableList<ContactHistory> histories) {
    return histories.stream()
        .filter(WipeOutContactHistoryPiiPipelineTest::isWiped)
        .collect(toImmutableList());
  }

  /**
   * Tells whether a history entry has been wiped, using a null email address as the marker (every
   * history entry in this fixture starts out with an email address).
   */
  private static boolean isWiped(ContactHistory history) {
    return history.getContactBase().get().getEmailAddress() == null;
  }

  /** Builds (without persisting) a history entry for the contact at the current clock time. */
  private ContactHistory createHistory(Contact contact) {
    return new ContactHistory.Builder()
        .setContact(contact)
        .setType(CONTACT_CREATE)
        .setRegistrarId("TheRegistrar")
        .setModificationTime(clock.nowUtc())
        .build();
  }

  /** Advances the fake clock by the given number of calendar months. */
  private void advanceMonths(int months) {
    DateTime now = clock.nowUtc();
    DateTime next = now.plusMonths(months);
    // Months vary in length, so the duration is computed from the two concrete instants.
    clock.advanceBy(new Duration(now, next));
  }
}

View file

@ -97,7 +97,9 @@ steps:
google.registry.beam.rde.RdePipeline \
google/registry/beam/rde_pipeline_metadata.json \
google.registry.beam.resave.ResaveAllEppResourcesPipeline \
google/registry/beam/resave_all_epp_resources_pipeline_metadata.json
google/registry/beam/resave_all_epp_resources_pipeline_metadata.json \
google.registry.beam.wipeout.WipeOutContactHistoryPiiPipeline \
google/registry/beam/wipe_out_contact_history_pii_pipeline_metadata.json
# Tentatively build and publish Cloud SQL schema jar here, before schema release
# process is finalized. Also publish nomulus:core jars that are needed for
# server/schema compatibility tests.