Add batching to BSA unavailable domains list generation (#2282)

This also moves it back to the replica transaction manager now that it shouldn't be timing
out its queries.

And this adds a test as well (more to come!).
This commit is contained in:
Ben McIlwain 2024-01-19 14:58:09 -05:00 committed by GitHub
parent 2cf2d7e7b1
commit c414e38a98
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 165 additions and 25 deletions

View file

@ -14,13 +14,13 @@
package google.registry.bsa;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getLast;
import static google.registry.model.tld.Tld.isEnrolledWithBsa;
import static google.registry.model.tld.Tlds.getTldEntitiesOfType;
import static google.registry.model.tld.label.ReservedList.loadReservedLists;
import static google.registry.persistence.PersistenceModule.TransactionIsolationLevel.TRANSACTION_REPEATABLE_READ;
import static google.registry.persistence.transaction.TransactionManagerFactory.replicaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.request.Action.Method.GET;
import static google.registry.request.Action.Method.POST;
import static java.nio.charset.StandardCharsets.US_ASCII;
@ -28,6 +28,7 @@ import static java.nio.charset.StandardCharsets.US_ASCII;
import com.google.api.client.http.HttpStatusCodes;
import com.google.cloud.storage.BlobId;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Ordering;
@ -49,8 +50,10 @@ import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Optional;
import java.util.zip.GZIPOutputStream;
import javax.inject.Inject;
import javax.persistence.TypedQuery;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.OkHttpClient;
@ -77,6 +80,8 @@ public class UploadBsaUnavailableDomainsAction implements Runnable {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final int BATCH_SIZE = 50000;
Clock clock;
BsaCredential bsaCredential;
@ -110,10 +115,7 @@ public class UploadBsaUnavailableDomainsAction implements Runnable {
// TODO(mcilwain): Implement a date Cursor, have the cronjob run frequently, and short-circuit
// the run if the daily upload is already completed.
DateTime runTime = clock.nowUtc();
// TODO(mcilwain): Batch this.
String unavailableDomains =
Joiner.on("\n")
.join(tm().transact(() -> getUnavailableDomains(runTime), TRANSACTION_REPEATABLE_READ));
String unavailableDomains = Joiner.on("\n").join(getUnavailableDomains(runTime));
if (unavailableDomains.isEmpty()) {
logger.atWarning().log("No unavailable domains found; terminating.");
} else {
@ -193,6 +195,7 @@ public class UploadBsaUnavailableDomainsAction implements Runnable {
}
private ImmutableSortedSet<String> getUnavailableDomains(DateTime runTime) {
// Get list of TLDs to process.
ImmutableSet<Tld> bsaEnabledTlds =
getTldEntitiesOfType(TldType.REAL).stream()
.filter(tld -> isEnrolledWithBsa(tld, runTime))
@ -204,26 +207,56 @@ public class UploadBsaUnavailableDomainsAction implements Runnable {
ImmutableSortedSet.Builder<String> unavailableDomains =
new ImmutableSortedSet.Builder<>(Ordering.natural());
for (Tld tld : bsaEnabledTlds) {
for (ReservedList reservedList : loadReservedLists(tld.getReservedListNames())) {
unavailableDomains.addAll(
reservedList.getReservedListEntries().keySet().stream()
.map(label -> toDomain(label, tld))
.collect(toImmutableSet()));
}
}
unavailableDomains.addAll(
replicaTm()
.query(
"SELECT domainName FROM Domain "
+ "WHERE tld IN :tlds "
+ "AND deletionTime > :now ",
String.class)
.setParameter(
"tlds", bsaEnabledTlds.stream().map(Tld::getTldStr).collect(toImmutableSet()))
.setParameter("now", runTime)
.getResultList());
// Add domains on reserved lists to unavailable names list.
replicaTm()
.transact(
() -> {
for (Tld tld : bsaEnabledTlds) {
for (ReservedList reservedList : loadReservedLists(tld.getReservedListNames())) {
unavailableDomains.addAll(
reservedList.getReservedListEntries().keySet().stream()
.map(label -> toDomain(label, tld))
.collect(toImmutableSet()));
}
}
});
// Add existing domains to unavailable names list, in batches so as to not time out on replica.
ImmutableSet<String> tldNames =
bsaEnabledTlds.stream().map(Tld::getTldStr).collect(toImmutableSet());
ImmutableList<String> domainsBatch;
Optional<String> lastDomain = Optional.empty();
do {
final Optional<String> lastDomainCopy = lastDomain;
domainsBatch =
replicaTm()
.transact(
() -> {
String sql =
String.format(
"SELECT domainName FROM Domain "
+ "WHERE tld IN :tlds "
+ "AND deletionTime > :now "
+ "%s ORDER BY domainName ASC",
lastDomainCopy.isPresent()
? "AND domainName > :lastInPreviousBatch"
: "");
TypedQuery<String> query =
replicaTm()
.query(sql, String.class)
.setParameter("tlds", tldNames)
.setParameter("now", runTime);
lastDomainCopy.ifPresent(l -> query.setParameter("lastInPreviousBatch", l));
return query
.setMaxResults(BATCH_SIZE)
.getResultStream()
.collect(toImmutableList());
});
unavailableDomains.addAll(domainsBatch);
lastDomain = Optional.ofNullable(domainsBatch.isEmpty() ? null : getLast(domainsBatch));
} while (domainsBatch.size() == BATCH_SIZE);
ImmutableSortedSet<String> result = unavailableDomains.build();
logger.atInfo().log("Found %d total unavailable domains.", result.size());
return result;

View file

@ -0,0 +1,107 @@
// Copyright 2024 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.bsa;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.testing.DatabaseHelper.createTld;
import static google.registry.testing.DatabaseHelper.persistActiveDomain;
import static google.registry.testing.DatabaseHelper.persistDeletedDomain;
import static google.registry.testing.DatabaseHelper.persistReservedList;
import static google.registry.testing.DatabaseHelper.persistResource;
import static google.registry.util.DateTimeUtils.START_OF_TIME;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.contrib.nio.testing.LocalStorageHelper;
import google.registry.bsa.api.BsaCredential;
import google.registry.gcs.GcsUtils;
import google.registry.model.tld.Tld;
import google.registry.model.tld.Tld.TldType;
import google.registry.model.tld.label.ReservedList;
import google.registry.persistence.transaction.JpaTestExtensions;
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
import google.registry.request.UrlConnectionService;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeResponse;
import java.util.Optional;
import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
/** Unit tests for {@link UploadBsaUnavailableDomainsAction}. */
@ExtendWith(MockitoExtension.class)
public class UploadBsaUnavailableDomainsActionTest {
private final FakeClock clock = new FakeClock(DateTime.parse("2024-02-02T02:02:02Z"));
@RegisterExtension
final JpaIntegrationTestExtension jpa =
new JpaTestExtensions.Builder().withClock(clock).buildIntegrationTestExtension();
private UploadBsaUnavailableDomainsAction action;
@Mock UrlConnectionService connectionService;
@Mock BsaCredential bsaCredential;
private final GcsUtils gcsUtils = new GcsUtils(LocalStorageHelper.getOptions());
private final String BUCKET = "domain-registry-bsa";
private final String API_URL = "https://upload.test/bsa";
private final FakeResponse response = new FakeResponse();
@BeforeEach
void beforeEach() {
ReservedList reservedList =
persistReservedList(
"tld-reserved_list",
true,
"tine,FULLY_BLOCKED",
"flagrant,NAME_COLLISION",
"jimmy,RESERVED_FOR_SPECIFIC_USE");
createTld("tld");
persistResource(
Tld.get("tld")
.asBuilder()
.setReservedLists(reservedList)
.setBsaEnrollStartTime(Optional.of(START_OF_TIME))
.setTldType(TldType.REAL)
.build());
action =
new UploadBsaUnavailableDomainsAction(
clock, bsaCredential, gcsUtils, BUCKET, API_URL, response);
}
@Test
void calculatesEntriesCorrectly() throws Exception {
persistActiveDomain("foobar.tld");
persistActiveDomain("ace.tld");
persistDeletedDomain("not-blocked.tld", clock.nowUtc().minusDays(1));
action.run();
BlobId existingFile =
BlobId.of(BUCKET, String.format("unavailable_domains_%s.txt", clock.nowUtc()));
String blockList = new String(gcsUtils.readBytesFrom(existingFile), UTF_8);
assertThat(blockList).isEqualTo("ace.tld\nflagrant.tld\nfoobar.tld\njimmy.tld\ntine.tld");
assertThat(blockList).doesNotContain("not-blocked.tld");
// TODO(mcilwain): Add test of BSA API upload as well.
}
}