diff --git a/core/src/main/java/google/registry/bsa/DownloadStage.java b/core/src/main/java/google/registry/bsa/DownloadStage.java
index 7e196b0de..7cc29fbed 100644
--- a/core/src/main/java/google/registry/bsa/DownloadStage.java
+++ b/core/src/main/java/google/registry/bsa/DownloadStage.java
@@ -16,5 +16,31 @@ package google.registry.bsa;
 
 /** The processing stages of a download. */
 public enum DownloadStage {
-  DOWNLOAD;
+  /** Downloads BSA block list files. */
+  DOWNLOAD,
+  /** Generates block list diffs with the previous download. */
+  MAKE_DIFF,
+  /** Applies the label diffs to the database tables. */
+  APPLY_DIFF,
+  /**
+   * Makes a REST API call to BSA endpoint, declaring that processing starts for new orders in the
+   * diffs.
+   */
+  START_UPLOADING,
+  /** Makes a REST API call to BSA endpoint, sending the domains that cannot be blocked. */
+  UPLOAD_DOMAINS_IN_USE,
+  /** Makes a REST API call to BSA endpoint, declaring the completion of order processing. */
+  FINISH_UPLOADING,
+  /** The terminal stage after processing succeeds. */
+  DONE,
+  /**
+   * The terminal stage indicating that the downloads are discarded because their checksums are the
+   * same as those of the previous download.
+   */
+  NOP,
+  /**
+   * The terminal stage indicating that the downloads are not processed because their BSA-generated
+   * checksums do not match those calculated by us.
+   */
+  CHECKSUMS_NOT_MATCH;
 }
diff --git a/core/src/main/java/google/registry/bsa/persistence/BsaDownload.java b/core/src/main/java/google/registry/bsa/persistence/BsaDownload.java
index f2307dce5..c19ecc4c8 100644
--- a/core/src/main/java/google/registry/bsa/persistence/BsaDownload.java
+++ b/core/src/main/java/google/registry/bsa/persistence/BsaDownload.java
@@ -65,16 +65,20 @@ public class BsaDownload {
 
   BsaDownload() {}
 
-  public long getJobId() {
+  long getJobId() {
     return jobId;
   }
 
+  DateTime getCreationTime() {
+    return creationTime.getTimestamp();
+  }
+
   /**
    * Returns the starting time of this job as a string, which can be used as folder name on GCS when
    * storing download data.
    */
   public String getJobName() {
-    return creationTime.getTimestamp().toString();
+    return getCreationTime().toString();
   }
 
   public DownloadStage getStage() {
@@ -86,11 +90,7 @@ public class BsaDownload {
     return this;
   }
 
-  DateTime getCreationTime() {
-    return creationTime.getTimestamp();
-  }
-
-  BsaDownload setBlockListChecksums(ImmutableMap<BlockList, String> checksums) {
+  BsaDownload setChecksums(ImmutableMap<BlockList, String> checksums) {
     blockListChecksums =
         CSV_JOINER.withKeyValueSeparator("=").join(ImmutableSortedMap.copyOf(checksums));
     return this;
diff --git a/core/src/main/java/google/registry/bsa/persistence/DownloadSchedule.java b/core/src/main/java/google/registry/bsa/persistence/DownloadSchedule.java
new file mode 100644
index 000000000..1ac84db46
--- /dev/null
+++ b/core/src/main/java/google/registry/bsa/persistence/DownloadSchedule.java
@@ -0,0 +1,73 @@
+// Copyright 2023 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.bsa.persistence;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableMap;
+import google.registry.bsa.BlockList;
+import google.registry.bsa.DownloadStage;
+import java.util.Optional;
+
+/** Information needed when handling a download from BSA. */
+@AutoValue
+public abstract class DownloadSchedule {
+
+  abstract long jobId();
+
+  public abstract String jobName();
+
+  public abstract DownloadStage stage();
+
+  /** The most recent job that ended in the {@code DONE} stage. */
+  public abstract Optional<CompletedJob> latestCompleted();
+
+  /**
+   * Returns true if the download should be processed even if the checksums show that it has not
+   * changed from the previous one.
+   */
+  abstract boolean alwaysDownload();
+
+  static DownloadSchedule of(BsaDownload currentJob) {
+    return new AutoValue_DownloadSchedule(
+        currentJob.getJobId(),
+        currentJob.getJobName(),
+        currentJob.getStage(),
+        Optional.empty(),
+        /* alwaysDownload= */ true);
+  }
+
+  static DownloadSchedule of(
+      BsaDownload currentJob, CompletedJob latestCompleted, boolean alwaysDownload) {
+    return new AutoValue_DownloadSchedule(
+        currentJob.getJobId(),
+        currentJob.getJobName(),
+        currentJob.getStage(),
+        Optional.of(latestCompleted),
+        /* alwaysDownload= */ alwaysDownload);
+  }
+
+  /** Information about a completed BSA download job. */
+  @AutoValue
+  public abstract static class CompletedJob {
+    public abstract String jobName();
+
+    public abstract ImmutableMap<BlockList, String> checksums();
+
+    static CompletedJob of(BsaDownload completedJob) {
+      return new AutoValue_DownloadSchedule_CompletedJob(
+          completedJob.getJobName(), completedJob.getChecksums());
+    }
+  }
+}
diff --git a/core/src/main/java/google/registry/bsa/persistence/DownloadScheduler.java b/core/src/main/java/google/registry/bsa/persistence/DownloadScheduler.java
new file mode 100644
index 000000000..229f24de7
--- /dev/null
+++ b/core/src/main/java/google/registry/bsa/persistence/DownloadScheduler.java
@@ -0,0 +1,131 @@
+// Copyright 2023 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.bsa.persistence;
+
+import static com.google.common.base.Verify.verify;
+import static google.registry.bsa.DownloadStage.CHECKSUMS_NOT_MATCH;
+import static google.registry.bsa.DownloadStage.DONE;
+import static google.registry.bsa.DownloadStage.NOP;
+import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
+import static org.joda.time.Duration.standardSeconds;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import google.registry.bsa.persistence.DownloadSchedule.CompletedJob;
+import google.registry.util.Clock;
+import java.util.Optional;
+import javax.inject.Inject;
+import org.joda.time.Duration;
+
+/**
+ * Assigns work for each cron invocation of the BSA Download job.
+ *
+ * <p>The download job is invoked at a divisible fraction of the desired data freshness to
+ * accommodate potential retries. E.g., for 30-minute data freshness with up to two retries on
+ * error, the cron schedule for the job should be set to 10 minutes.
+ *
+ * <p>The processing of each BSA download progresses through multiple stages as described in {@code
+ * DownloadStage} until it reaches one of the terminal stages. Each stage is check-pointed on
+ * completion, therefore if an invocation fails mid-process, the next invocation will skip the
+ * completed stages. No new downloads will start as long as the most recent one is still being
+ * processed.
+ *
+ * <p>When a new download is scheduled, the block list checksums from the most recent completed job
+ * are included. If the new checksums match the previous ones, the download may be skipped and the
+ * job should terminate in the {@code NOP} stage. However, if the checksums have stayed unchanged
+ * for longer than the user-provided {@code maxNopInterval}, the download will be processed.
+ *
+ * <p>The BSA downloads contain server-provided checksums. If they do not match the checksums
+ * generated on Nomulus' side, the download is skipped and the job should terminate in the {@code
+ * CHECKSUMS_NOT_MATCH} stage.
+ */
+public final class DownloadScheduler {
+
+  /** Allows a new download to proceed if the cron job fires a little early due to NTP drift. */
+  private static final Duration CRON_JITTER = standardSeconds(5);
+
+  private final Duration downloadInterval;
+  private final Duration maxNopInterval;
+  private final Clock clock;
+
+  @Inject
+  DownloadScheduler(Duration downloadInterval, Duration maxNopInterval, Clock clock) {
+    this.downloadInterval = downloadInterval;
+    this.maxNopInterval = maxNopInterval;
+    this.clock = clock;
+  }
+
+  /**
+   * Returns a {@link DownloadSchedule} instance that describes the work to be performed by an
+   * invocation of the download action, if applicable; or {@link Optional#empty} when there is
+   * nothing to do.
+   */
+  public Optional<DownloadSchedule> schedule() {
+    return tm().transact(
+            () -> {
+              ImmutableList<BsaDownload> recentJobs = loadRecentProcessedJobs();
+              if (recentJobs.isEmpty()) {
+                // No jobs initiated ever.
+                return Optional.of(scheduleNewJob(Optional.empty()));
+              }
+              BsaDownload mostRecent = recentJobs.get(0);
+              if (mostRecent.getStage().equals(DONE)) {
+                return isTimeAgain(mostRecent, downloadInterval)
+                    ? Optional.of(scheduleNewJob(Optional.of(mostRecent)))
+                    : Optional.empty();
+              } else if (recentJobs.size() == 1) {
+                // First job ever, still in progress
+                return Optional.of(DownloadSchedule.of(mostRecent));
+              } else {
+                // Job in progress with a completed predecessor; NOP age counts from the latter.
+                BsaDownload prev = recentJobs.get(1);
+                verify(prev.getStage().equals(DONE), "Unexpectedly found two ongoing jobs.");
+                return Optional.of(
+                    DownloadSchedule.of(
+                        mostRecent,
+                        CompletedJob.of(prev),
+                        isTimeAgain(prev, maxNopInterval)));
+              }
+            });
+  }
+
+  private boolean isTimeAgain(BsaDownload mostRecent, Duration interval) {
+    return mostRecent.getCreationTime().plus(interval).minus(CRON_JITTER).isBefore(clock.nowUtc());
+  }
+
+  /**
+   * Adds a new {@link BsaDownload} to the database and returns a {@link DownloadSchedule} for it.
+   */
+  private DownloadSchedule scheduleNewJob(Optional<BsaDownload> prevJob) {
+    BsaDownload job = new BsaDownload();
+    tm().insert(job);
+    return prevJob
+        .map(
+            prev ->
+                DownloadSchedule.of(job, CompletedJob.of(prev), isTimeAgain(prev, maxNopInterval)))
+        .orElseGet(() -> DownloadSchedule.of(job));
+  }
+
+  @VisibleForTesting
+  ImmutableList<BsaDownload> loadRecentProcessedJobs() {
+    return ImmutableList.copyOf(
+        tm().getEntityManager()
+            .createQuery(
+                "FROM BsaDownload WHERE stage NOT IN :nop_stages ORDER BY creationTime DESC")
+            .setParameter("nop_stages", ImmutableList.of(CHECKSUMS_NOT_MATCH, NOP))
+            .setMaxResults(2)
+            .getResultList());
+  }
+}
diff --git a/core/src/test/java/google/registry/bsa/persistence/BsaDownloadTest.java b/core/src/test/java/google/registry/bsa/persistence/BsaDownloadTest.java
index a0acf2982..a6548ea03 100644
--- a/core/src/test/java/google/registry/bsa/persistence/BsaDownloadTest.java
+++ b/core/src/test/java/google/registry/bsa/persistence/BsaDownloadTest.java
@@ -59,7 +59,7 @@ public class BsaDownloadTest {
     BsaDownload job = new BsaDownload();
     assertThat(job.getChecksums()).isEmpty();
     ImmutableMap<BlockList, String> checksums = ImmutableMap.of(BLOCK, "a", BLOCK_PLUS, "b");
-    job.setBlockListChecksums(checksums);
+    job.setChecksums(checksums);
     assertThat(job.getChecksums()).isEqualTo(checksums);
     assertThat(job.blockListChecksums).isEqualTo("BLOCK=a,BLOCK_PLUS=b");
   }
diff --git 
a/core/src/test/java/google/registry/bsa/persistence/DownloadSchedulerTest.java b/core/src/test/java/google/registry/bsa/persistence/DownloadSchedulerTest.java
new file mode 100644
index 000000000..55de12702
--- /dev/null
+++ b/core/src/test/java/google/registry/bsa/persistence/DownloadSchedulerTest.java
@@ -0,0 +1,210 @@
+// Copyright 2023 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.bsa.persistence;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth8.assertThat;
+import static google.registry.bsa.DownloadStage.CHECKSUMS_NOT_MATCH;
+import static google.registry.bsa.DownloadStage.DONE;
+import static google.registry.bsa.DownloadStage.DOWNLOAD;
+import static google.registry.bsa.DownloadStage.MAKE_DIFF;
+import static google.registry.bsa.DownloadStage.NOP;
+import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
+import static org.joda.time.DateTimeZone.UTC;
+import static org.joda.time.Duration.standardDays;
+import static org.joda.time.Duration.standardMinutes;
+import static org.joda.time.Duration.standardSeconds;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import google.registry.bsa.BlockList;
+import google.registry.bsa.DownloadStage;
+import google.registry.bsa.persistence.DownloadSchedule.CompletedJob;
+import google.registry.persistence.transaction.JpaTestExtensions;
+import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationWithCoverageExtension;
+import google.registry.testing.FakeClock;
+import java.util.Optional;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/** Unit tests for {@link DownloadScheduler}. */
+public class DownloadSchedulerTest {
+
+  static final Duration DOWNLOAD_INTERVAL = standardMinutes(30);
+  static final Duration MAX_NOP_INTERVAL = standardDays(1);
+
+  protected FakeClock fakeClock = new FakeClock(DateTime.now(UTC));
+
+  @RegisterExtension
+  final JpaIntegrationWithCoverageExtension jpa =
+      new JpaTestExtensions.Builder().withClock(fakeClock).buildIntegrationWithCoverageExtension();
+
+  private DownloadScheduler scheduler;
+
+  @BeforeEach
+  void setup() {
+    scheduler = new DownloadScheduler(DOWNLOAD_INTERVAL, MAX_NOP_INTERVAL, fakeClock);
+  }
+
+  @AfterEach
+  void dbCheck() {
+    ImmutableSet<DownloadStage> terminalStages = ImmutableSet.of(DONE, NOP, CHECKSUMS_NOT_MATCH);
+    assertThat(
+            tm().transact(
+                    () ->
+                        tm().getEntityManager()
+                            .createQuery("FROM BsaDownload", BsaDownload.class)
+                            .getResultStream()
+                            .filter(job -> !terminalStages.contains(job.getStage()))
+                            .count()))
+        .isAtMost(1);
+  }
+
+  @Test
+  void firstJobEver() {
+    Optional<DownloadSchedule> scheduleOptional = scheduler.schedule();
+    assertThat(scheduleOptional).isPresent();
+    DownloadSchedule schedule = scheduleOptional.get();
+    assertThat(schedule.latestCompleted()).isEmpty();
+    assertThat(schedule.jobName()).isEqualTo(fakeClock.nowUtc().toString());
+    assertThat(schedule.stage()).isEqualTo(DownloadStage.DOWNLOAD);
+    assertThat(schedule.alwaysDownload()).isTrue();
+  }
+
+  @Test
+  void oneInProgressJob() {
+    BsaDownload inProgressJob = insertOneJobAndAdvanceClock(MAKE_DIFF);
+    Optional<DownloadSchedule> scheduleOptional = scheduler.schedule();
+    assertThat(scheduleOptional).isPresent();
+    DownloadSchedule schedule = scheduleOptional.get();
+    assertThat(schedule.jobId()).isEqualTo(inProgressJob.jobId);
+    assertThat(schedule.jobName()).isEqualTo(inProgressJob.getJobName());
+    assertThat(schedule.stage()).isEqualTo(MAKE_DIFF);
+    assertThat(schedule.latestCompleted()).isEmpty();
+    assertThat(schedule.alwaysDownload()).isTrue();
+  }
+
+  @Test
+  void oneInProgressJobOneCompletedJob() {
+    BsaDownload completed = insertOneJobAndAdvanceClock(DONE);
+    BsaDownload inProgressJob = insertOneJobAndAdvanceClock(MAKE_DIFF);
+    Optional<DownloadSchedule> scheduleOptional = scheduler.schedule();
+    assertThat(scheduleOptional).isPresent();
+    DownloadSchedule schedule = scheduleOptional.get();
+    assertThat(schedule.jobId()).isEqualTo(inProgressJob.jobId);
+    assertThat(schedule.jobName()).isEqualTo(inProgressJob.getJobName());
+    assertThat(schedule.stage()).isEqualTo(MAKE_DIFF);
+    assertThat(schedule.alwaysDownload()).isFalse();
+    assertThat(schedule.latestCompleted()).isPresent();
+    CompletedJob lastCompleted = schedule.latestCompleted().get();
+    assertThat(lastCompleted.jobName()).isEqualTo(completed.getJobName());
+    assertThat(lastCompleted.checksums()).isEqualTo(completed.getChecksums());
+  }
+
+  @Test
+  void doneJob_noNewSchedule() {
+    insertOneJobAndAdvanceClock(DONE);
+    assertThat(scheduler.schedule()).isEmpty();
+  }
+
+  @Test
+  void doneJob_newSchedule() {
+    BsaDownload completed = insertOneJobAndAdvanceClock(DONE);
+    fakeClock.advanceBy(DOWNLOAD_INTERVAL);
+    Optional<DownloadSchedule> scheduleOptional = scheduler.schedule();
+    assertThat(scheduleOptional).isPresent();
+    DownloadSchedule schedule = scheduleOptional.get();
+    assertThat(schedule.stage()).isEqualTo(DOWNLOAD);
+    assertThat(schedule.alwaysDownload()).isFalse();
+    assertThat(schedule.latestCompleted()).isPresent();
+    CompletedJob completedJob = schedule.latestCompleted().get();
+    assertThat(completedJob.jobName()).isEqualTo(completed.getJobName());
+    assertThat(completedJob.checksums()).isEqualTo(completed.getChecksums());
+  }
+
+  @Test
+  void doneJob_newSchedule_alwaysDownload() {
+    insertOneJobAndAdvanceClock(DONE);
+    fakeClock.advanceBy(MAX_NOP_INTERVAL);
+    Optional<DownloadSchedule> scheduleOptional = scheduler.schedule();
+    assertThat(scheduleOptional).isPresent();
+    DownloadSchedule schedule = scheduleOptional.get();
+    assertThat(schedule.alwaysDownload()).isTrue();
+  }
+
+  @Test
+  void doneJob_cronEarlyWithJitter_newSchedule() {
+    insertOneJobAndAdvanceClock(DONE);
+    fakeClock.advanceBy(DOWNLOAD_INTERVAL.minus(standardSeconds(5)));
+    assertThat(scheduler.schedule()).isPresent();
+  }
+
+  @Test
+  void doneJob_cronEarlyMoreThanJitter_noNewSchedule() {
+    insertOneJobAndAdvanceClock(DONE);
+    fakeClock.advanceBy(DOWNLOAD_INTERVAL.minus(standardSeconds(6)));
+    assertThat(scheduler.schedule()).isEmpty();
+  }
+
+  @Test
+  void loadRecentProcessedJobs_noneExists() {
+    assertThat(tm().transact(() -> scheduler.loadRecentProcessedJobs())).isEmpty();
+  }
+
+  @Test
+  void loadRecentProcessedJobs_nopJobsOnly() {
+    insertOneJobAndAdvanceClock(DownloadStage.NOP);
+    insertOneJobAndAdvanceClock(DownloadStage.CHECKSUMS_NOT_MATCH);
+    assertThat(tm().transact(() -> scheduler.loadRecentProcessedJobs())).isEmpty();
+  }
+
+  @Test
+  void loadRecentProcessedJobs_oneInProgressJob() {
+    BsaDownload job = insertOneJobAndAdvanceClock(MAKE_DIFF);
+    assertThat(tm().transact(() -> scheduler.loadRecentProcessedJobs())).containsExactly(job);
+  }
+
+  @Test
+  void loadRecentProcessedJobs_oneDoneJob() {
+    BsaDownload job = insertOneJobAndAdvanceClock(DONE);
+    assertThat(tm().transact(() -> scheduler.loadRecentProcessedJobs())).containsExactly(job);
+  }
+
+  @Test
+  void loadRecentProcessedJobs_multipleJobs() {
+    insertOneJobAndAdvanceClock(DownloadStage.DONE);
+    insertOneJobAndAdvanceClock(DownloadStage.DONE);
+    BsaDownload completed = insertOneJobAndAdvanceClock(DownloadStage.DONE);
+    insertOneJobAndAdvanceClock(DownloadStage.NOP);
+    insertOneJobAndAdvanceClock(DownloadStage.CHECKSUMS_NOT_MATCH);
+    BsaDownload inprogress = insertOneJobAndAdvanceClock(DownloadStage.APPLY_DIFF);
+    assertThat(tm().transact(() -> scheduler.loadRecentProcessedJobs()))
+        .containsExactly(inprogress, completed)
+        .inOrder();
+  }
+
+  private BsaDownload insertOneJobAndAdvanceClock(DownloadStage stage) {
+    BsaDownload job = new BsaDownload();
+    job.setStage(stage);
+    job.setChecksums(ImmutableMap.of(BlockList.BLOCK, "1", BlockList.BLOCK_PLUS, "2"));
+    tm().transact(() -> tm().insert(job));
+    fakeClock.advanceOneMilli();
+    return job;
+  }
+}