Download scheduler for BSA (#2209)

* Add BSA download scheduler
This commit is contained in:
Weimin Yu 2023-11-17 16:15:14 -05:00 committed by GitHub
parent 85b588b51f
commit e42c11051e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 449 additions and 9 deletions

View file

@ -16,5 +16,31 @@ package google.registry.bsa;
/**
 * The processing stages of a download.
 *
 * <p>{@code DONE}, {@code NOP} and {@code CHECKSUMS_NOT_MATCH} are terminal stages; every other
 * constant names a unit of work that is still to be performed.
 */
public enum DownloadStage {
// NOTE(review): this line looks like the pre-change constant list carried over from the diff view;
// it repeats DOWNLOAD and ends the constant list early -- confirm it is not part of the new file.
DOWNLOAD;
/** Downloads BSA block list files. */
DOWNLOAD,
/** Generates block list diffs with the previous download. */
MAKE_DIFF,
/** Applies the label diffs to the database tables. */
APPLY_DIFF,
/**
* Makes a REST API call to BSA endpoint, declaring that processing starts for new orders in the
* diffs.
*/
START_UPLOADING,
/** Makes a REST API call to BSA endpoint, sending the domains that cannot be blocked. */
UPLOAD_DOMAINS_IN_USE,
/** Makes a REST API call to BSA endpoint, declaring the completion of order processing. */
FINISH_UPLOADING,
/** The terminal stage after processing succeeds. */
DONE,
/**
* The terminal stage indicating that the downloads are discarded because their checksums are the
* same as those of the previous download.
*/
NOP,
/**
* The terminal stage indicating that the downloads are not processed because their BSA-generated
* checksums do not match those calculated by us.
*/
CHECKSUMS_NOT_MATCH;
}

View file

@ -65,16 +65,20 @@ public class BsaDownload {
BsaDownload() {}
public long getJobId() {
long getJobId() {
return jobId;
}
/** Returns the timestamp held by this job's {@code creationTime} field. */
DateTime getCreationTime() {
return creationTime.getTimestamp();
}
/**
* Returns the starting time of this job as a string, which can be used as folder name on GCS when
* storing download data.
*/
public String getJobName() {
return creationTime.getTimestamp().toString();
return getCreationTime().toString();
}
public DownloadStage getStage() {
@ -86,11 +90,7 @@ public class BsaDownload {
return this;
}
DateTime getCreationTime() {
return creationTime.getTimestamp();
}
BsaDownload setBlockListChecksums(ImmutableMap<BlockList, String> checksums) {
BsaDownload setChecksums(ImmutableMap<BlockList, String> checksums) {
blockListChecksums =
CSV_JOINER.withKeyValueSeparator("=").join(ImmutableSortedMap.copyOf(checksums));
return this;

View file

@ -0,0 +1,73 @@
// Copyright 2023 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.bsa.persistence;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import google.registry.bsa.BlockList;
import google.registry.bsa.DownloadStage;
import java.util.Optional;
/**
 * Describes the work handed to one invocation of the BSA download action: which job to work on,
 * the stage it is currently at, and the previously completed job (if any) to compare against.
 */
@AutoValue
public abstract class DownloadSchedule {

  // NOTE: AutoValue derives the generated constructor's parameter order from the declaration order
  // of the abstract accessors below; keep them in this order.

  /** Database id of the job to be worked on. */
  abstract long jobId();

  /** Name of the job to be worked on, as reported by {@link BsaDownload#getJobName}. */
  public abstract String jobName();

  /** The stage recorded for the job at the time this schedule was created. */
  public abstract DownloadStage stage();

  /** The most recent job that ended in the {@code DONE} stage. */
  public abstract Optional<CompletedJob> latestCompleted();

  /**
   * Returns true if download should be processed even if the checksums show that it has not changed
   * from the previous one.
   */
  abstract boolean alwaysDownload();

  /**
   * Creates a schedule for a job that has no completed predecessor. Such a schedule always asks for
   * the download to be processed.
   */
  static DownloadSchedule of(BsaDownload currentJob) {
    return create(currentJob, Optional.empty(), true);
  }

  /** Creates a schedule for a job that follows the completed job {@code latestCompleted}. */
  static DownloadSchedule of(
      BsaDownload currentJob, CompletedJob latestCompleted, boolean alwaysDownload) {
    return create(currentJob, Optional.of(latestCompleted), alwaysDownload);
  }

  /** Shared construction path for both {@code of} factories. */
  private static DownloadSchedule create(
      BsaDownload job, Optional<CompletedJob> previous, boolean alwaysDownload) {
    long id = job.getJobId();
    String name = job.getJobName();
    DownloadStage currentStage = job.getStage();
    return new AutoValue_DownloadSchedule(id, name, currentStage, previous, alwaysDownload);
  }

  /** Information about a completed BSA download job. */
  @AutoValue
  public abstract static class CompletedJob {

    /** Name of the completed job, as reported by {@link BsaDownload#getJobName}. */
    public abstract String jobName();

    /** Block list checksums recorded on the completed job. */
    public abstract ImmutableMap<BlockList, String> checksums();

    /** Captures the name and checksums of {@code completedJob}. */
    static CompletedJob of(BsaDownload completedJob) {
      String name = completedJob.getJobName();
      ImmutableMap<BlockList, String> recordedChecksums = completedJob.getChecksums();
      return new AutoValue_DownloadSchedule_CompletedJob(name, recordedChecksums);
    }
  }
}

View file

@ -0,0 +1,131 @@
// Copyright 2023 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.bsa.persistence;
import static com.google.common.base.Verify.verify;
import static google.registry.bsa.DownloadStage.CHECKSUMS_NOT_MATCH;
import static google.registry.bsa.DownloadStage.DONE;
import static google.registry.bsa.DownloadStage.NOP;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static org.joda.time.Duration.standardSeconds;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import google.registry.bsa.persistence.DownloadSchedule.CompletedJob;
import google.registry.util.Clock;
import java.util.Optional;
import javax.inject.Inject;
import org.joda.time.Duration;
/**
 * Assigns work for each cron invocation of the BSA Download job.
 *
 * <p>The download job is invoked at a divisible fraction of the desired data freshness to
 * accommodate potential retries. E.g., for 30-minute data freshness with up to two retries on
 * error, the cron schedule for the job should be set to 10 minutes.
 *
 * <p>The processing of each BSA download progresses through multiple stages as described in {@code
 * DownloadStage} until it reaches one of the terminal stages. Each stage is check-pointed on
 * completion, therefore if an invocation fails mid-process, the next invocation will skip the
 * completed stages. No new downloads will start as long as the most recent one is still being
 * processed.
 *
 * <p>When a new download is scheduled, the block list checksums from the most recent completed job
 * are included. If the new checksums match the previous ones, the download may be skipped and the
 * job should terminate in the {@code NOP} stage. However, if the checksums have stayed unchanged
 * for longer than the user-provided {@code maxNopInterval}, the download will be processed.
 *
 * <p>The BSA downloads contain server-provided checksums. If they do not match the checksums
 * generated on Nomulus' side, the download is skipped and the job should terminate in the {@code
 * CHECKSUMS_NOT_MATCH} stage.
 */
public final class DownloadScheduler {

  /** Allows a new download to proceed if the cron job fires a little early due to NTP drift. */
  private static final Duration CRON_JITTER = standardSeconds(5);

  private final Duration downloadInterval;
  private final Duration maxNopInterval;
  private final Clock clock;

  /**
   * @param downloadInterval time that must pass (less {@code CRON_JITTER}) after the creation of
   *     the most recent {@code DONE} job before a new job is scheduled
   * @param maxNopInterval time (less {@code CRON_JITTER}) after the creation of the most recent
   *     {@code DONE} job beyond which a schedule is flagged {@code alwaysDownload}
   * @param clock source of the current time
   */
  @Inject
  DownloadScheduler(Duration downloadInterval, Duration maxNopInterval, Clock clock) {
    this.downloadInterval = downloadInterval;
    this.maxNopInterval = maxNopInterval;
    this.clock = clock;
  }

  /**
   * Returns a {@link DownloadSchedule} instance that describes the work to be performed by an
   * invocation of the download action, if applicable; or {@link Optional#empty} when there is
   * nothing to do.
   *
   * <p>The whole decision, including the insertion of a new job row when one is scheduled, runs in
   * a single transaction.
   */
  public Optional<DownloadSchedule> schedule() {
    return tm().transact(
        () -> {
          ImmutableList<BsaDownload> recentJobs = loadRecentProcessedJobs();
          if (recentJobs.isEmpty()) {
            // No jobs initiated ever.
            return Optional.of(scheduleNewJob(Optional.empty()));
          }
          BsaDownload mostRecent = recentJobs.get(0);
          if (mostRecent.getStage().equals(DONE)) {
            return isTimeAgain(mostRecent, downloadInterval)
                ? Optional.of(scheduleNewJob(Optional.of(mostRecent)))
                : Optional.empty();
          } else if (recentJobs.size() == 1) {
            // First job ever, still in progress
            return Optional.of(DownloadSchedule.of(mostRecent));
          } else {
            // Job in progress, with completed previous jobs.
            BsaDownload prev = recentJobs.get(1);
            verify(prev.getStage().equals(DONE), "Unexpectedly found two ongoing jobs.");
            // Measure the NOP interval from the last completed job, exactly as scheduleNewJob()
            // does, so that a retried invocation sees the same alwaysDownload value the job was
            // originally scheduled with.
            return Optional.of(
                DownloadSchedule.of(
                    mostRecent, CompletedJob.of(prev), isTimeAgain(prev, maxNopInterval)));
          }
        });
  }

  /**
   * Returns true if {@code interval}, shortened by {@code CRON_JITTER}, has elapsed since the
   * creation time of {@code job}.
   */
  private boolean isTimeAgain(BsaDownload job, Duration interval) {
    return job.getCreationTime().plus(interval).minus(CRON_JITTER).isBefore(clock.nowUtc());
  }

  /**
   * Adds a new {@link BsaDownload} to the database and returns a {@link DownloadSchedule} for it.
   *
   * @param prevJob the most recent completed job, if there is one; its checksums are attached to
   *     the returned schedule and its creation time decides the {@code alwaysDownload} flag
   */
  private DownloadSchedule scheduleNewJob(Optional<BsaDownload> prevJob) {
    BsaDownload job = new BsaDownload();
    tm().insert(job);
    return prevJob
        .map(
            prev ->
                DownloadSchedule.of(job, CompletedJob.of(prev), isTimeAgain(prev, maxNopInterval)))
        .orElseGet(() -> DownloadSchedule.of(job));
  }

  /**
   * Loads up to two of the most recently created jobs whose stage is neither {@code NOP} nor {@code
   * CHECKSUMS_NOT_MATCH}, newest first. Must be called inside a transaction.
   */
  @VisibleForTesting
  ImmutableList<BsaDownload> loadRecentProcessedJobs() {
    // The typed overload of createQuery avoids a raw List and the unchecked conversion it entails.
    return ImmutableList.copyOf(
        tm().getEntityManager()
            .createQuery(
                "FROM BsaDownload WHERE stage NOT IN :nop_stages ORDER BY creationTime DESC",
                BsaDownload.class)
            .setParameter("nop_stages", ImmutableList.of(CHECKSUMS_NOT_MATCH, NOP))
            .setMaxResults(2)
            .getResultList());
  }
}

View file

@ -59,7 +59,7 @@ public class BsaDownloadTest {
BsaDownload job = new BsaDownload();
assertThat(job.getChecksums()).isEmpty();
ImmutableMap<BlockList, String> checksums = ImmutableMap.of(BLOCK, "a", BLOCK_PLUS, "b");
job.setBlockListChecksums(checksums);
job.setChecksums(checksums);
assertThat(job.getChecksums()).isEqualTo(checksums);
assertThat(job.blockListChecksums).isEqualTo("BLOCK=a,BLOCK_PLUS=b");
}

View file

@ -0,0 +1,210 @@
// Copyright 2023 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.bsa.persistence;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth8.assertThat;
import static google.registry.bsa.DownloadStage.CHECKSUMS_NOT_MATCH;
import static google.registry.bsa.DownloadStage.DONE;
import static google.registry.bsa.DownloadStage.DOWNLOAD;
import static google.registry.bsa.DownloadStage.MAKE_DIFF;
import static google.registry.bsa.DownloadStage.NOP;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static org.joda.time.DateTimeZone.UTC;
import static org.joda.time.Duration.standardDays;
import static org.joda.time.Duration.standardMinutes;
import static org.joda.time.Duration.standardSeconds;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import google.registry.bsa.BlockList;
import google.registry.bsa.DownloadStage;
import google.registry.bsa.persistence.DownloadSchedule.CompletedJob;
import google.registry.persistence.transaction.JpaTestExtensions;
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationWithCoverageExtension;
import google.registry.testing.FakeClock;
import java.util.Optional;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link DownloadScheduler}. */
public class DownloadSchedulerTest {

  static final Duration DOWNLOAD_INTERVAL = standardMinutes(30);
  static final Duration MAX_NOP_INTERVAL = standardDays(1);

  protected FakeClock fakeClock = new FakeClock(DateTime.now(UTC));

  @RegisterExtension
  final JpaIntegrationWithCoverageExtension jpa =
      new JpaTestExtensions.Builder().withClock(fakeClock).buildIntegrationWithCoverageExtension();

  private DownloadScheduler scheduler;

  @BeforeEach
  void setup() {
    scheduler = new DownloadScheduler(DOWNLOAD_INTERVAL, MAX_NOP_INTERVAL, fakeClock);
  }

  /** Invariant checked after every test: at most one job may be in a non-terminal stage. */
  @AfterEach
  void dbCheck() {
    ImmutableSet<DownloadStage> terminalStages = ImmutableSet.of(DONE, NOP, CHECKSUMS_NOT_MATCH);
    assertThat(
            tm().transact(
                    () ->
                        tm().getEntityManager()
                            .createQuery("FROM BsaDownload", BsaDownload.class)
                            .getResultStream()
                            .filter(job -> !terminalStages.contains(job.getStage()))
                            .count()))
        .isAtMost(1);
  }

  @Test
  void firstJobEver() {
    Optional<DownloadSchedule> scheduleOptional = scheduler.schedule();
    assertThat(scheduleOptional).isPresent();
    DownloadSchedule schedule = scheduleOptional.get();
    assertThat(schedule.latestCompleted()).isEmpty();
    assertThat(schedule.jobName()).isEqualTo(fakeClock.nowUtc().toString());
    assertThat(schedule.stage()).isEqualTo(DOWNLOAD);
    assertThat(schedule.alwaysDownload()).isTrue();
  }

  @Test
  void oneInProgressJob() {
    BsaDownload inProgressJob = insertOneJobAndAdvanceClock(MAKE_DIFF);
    Optional<DownloadSchedule> scheduleOptional = scheduler.schedule();
    assertThat(scheduleOptional).isPresent();
    DownloadSchedule schedule = scheduleOptional.get();
    assertThat(schedule.jobId()).isEqualTo(inProgressJob.jobId);
    assertThat(schedule.jobName()).isEqualTo(inProgressJob.getJobName());
    assertThat(schedule.stage()).isEqualTo(MAKE_DIFF);
    assertThat(schedule.latestCompleted()).isEmpty();
    assertThat(schedule.alwaysDownload()).isTrue();
  }

  @Test
  void oneInProgressJobOneCompletedJob() {
    BsaDownload completed = insertOneJobAndAdvanceClock(DONE);
    BsaDownload inProgressJob = insertOneJobAndAdvanceClock(MAKE_DIFF);
    Optional<DownloadSchedule> scheduleOptional = scheduler.schedule();
    assertThat(scheduleOptional).isPresent();
    DownloadSchedule schedule = scheduleOptional.get();
    assertThat(schedule.jobId()).isEqualTo(inProgressJob.jobId);
    assertThat(schedule.jobName()).isEqualTo(inProgressJob.getJobName());
    assertThat(schedule.stage()).isEqualTo(MAKE_DIFF);
    assertThat(schedule.alwaysDownload()).isFalse();
    assertThat(schedule.latestCompleted()).isPresent();
    CompletedJob lastCompleted = schedule.latestCompleted().get();
    assertThat(lastCompleted.jobName()).isEqualTo(completed.getJobName());
    assertThat(lastCompleted.checksums()).isEqualTo(completed.getChecksums());
  }

  @Test
  void doneJob_noNewSchedule() {
    insertOneJobAndAdvanceClock(DONE);
    assertThat(scheduler.schedule()).isEmpty();
  }

  @Test
  void doneJob_newSchedule() {
    BsaDownload completed = insertOneJobAndAdvanceClock(DONE);
    fakeClock.advanceBy(DOWNLOAD_INTERVAL);
    Optional<DownloadSchedule> scheduleOptional = scheduler.schedule();
    assertThat(scheduleOptional).isPresent();
    DownloadSchedule schedule = scheduleOptional.get();
    assertThat(schedule.stage()).isEqualTo(DOWNLOAD);
    assertThat(schedule.alwaysDownload()).isFalse();
    assertThat(schedule.latestCompleted()).isPresent();
    CompletedJob completedJob = schedule.latestCompleted().get();
    assertThat(completedJob.jobName()).isEqualTo(completed.getJobName());
    // Compare against the inserted job; comparing the value with itself would always pass.
    assertThat(completedJob.checksums()).isEqualTo(completed.getChecksums());
  }

  @Test
  void doneJob_newSchedule_alwaysDownload() {
    insertOneJobAndAdvanceClock(DONE);
    fakeClock.advanceBy(MAX_NOP_INTERVAL);
    Optional<DownloadSchedule> scheduleOptional = scheduler.schedule();
    assertThat(scheduleOptional).isPresent();
    DownloadSchedule schedule = scheduleOptional.get();
    assertThat(schedule.alwaysDownload()).isTrue();
  }

  @Test
  void doneJob_cronEarlyWithJitter_newSchedule() {
    insertOneJobAndAdvanceClock(DONE);
    fakeClock.advanceBy(DOWNLOAD_INTERVAL.minus(standardSeconds(5)));
    assertThat(scheduler.schedule()).isPresent();
  }

  @Test
  void doneJob_cronEarlyMoreThanJitter_noNewSchedule() {
    insertOneJobAndAdvanceClock(DONE);
    fakeClock.advanceBy(DOWNLOAD_INTERVAL.minus(standardSeconds(6)));
    assertThat(scheduler.schedule()).isEmpty();
  }

  @Test
  void loadRecentProcessedJobs_noneExists() {
    assertThat(tm().transact(() -> scheduler.loadRecentProcessedJobs())).isEmpty();
  }

  @Test
  void loadRecentProcessedJobs_nopJobsOnly() {
    insertOneJobAndAdvanceClock(NOP);
    insertOneJobAndAdvanceClock(CHECKSUMS_NOT_MATCH);
    assertThat(tm().transact(() -> scheduler.loadRecentProcessedJobs())).isEmpty();
  }

  @Test
  void loadRecentProcessedJobs_oneInProgressJob() {
    BsaDownload job = insertOneJobAndAdvanceClock(MAKE_DIFF);
    assertThat(tm().transact(() -> scheduler.loadRecentProcessedJobs())).containsExactly(job);
  }

  @Test
  void loadRecentProcessedJobs_oneDoneJob() {
    BsaDownload job = insertOneJobAndAdvanceClock(DONE);
    assertThat(tm().transact(() -> scheduler.loadRecentProcessedJobs())).containsExactly(job);
  }

  @Test
  void loadRecentProcessedJobs_multipleJobs() {
    insertOneJobAndAdvanceClock(DONE);
    insertOneJobAndAdvanceClock(DONE);
    BsaDownload completed = insertOneJobAndAdvanceClock(DONE);
    insertOneJobAndAdvanceClock(NOP);
    insertOneJobAndAdvanceClock(CHECKSUMS_NOT_MATCH);
    BsaDownload inprogress = insertOneJobAndAdvanceClock(DownloadStage.APPLY_DIFF);
    assertThat(tm().transact(() -> scheduler.loadRecentProcessedJobs()))
        .containsExactly(inprogress, completed)
        .inOrder();
  }

  /**
   * Inserts a job at the given stage with fixed checksums, then advances the fake clock by one
   * millisecond so that the next job inserted gets a later clock reading.
   */
  private BsaDownload insertOneJobAndAdvanceClock(DownloadStage stage) {
    BsaDownload job = new BsaDownload();
    job.setStage(stage);
    job.setChecksums(ImmutableMap.of(BlockList.BLOCK, "1", BlockList.BLOCK_PLUS, "2"));
    tm().transact(() -> tm().insert(job));
    fakeClock.advanceOneMilli();
    return job;
  }
}