diff --git a/core/build.gradle b/core/build.gradle index 63c427c75..9fe18d743 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -367,6 +367,8 @@ dependencies { compile deps['org.flywaydb:flyway-core'] closureCompiler deps['com.google.javascript:closure-compiler'] + testCompile 'com.google.protobuf:protobuf-java-util:3.17.3' + testCompile 'com.google.protobuf:protobuf-java-util:3.17.3' } task jaxbToJava { diff --git a/core/src/main/java/google/registry/reporting/ReportingModule.java b/core/src/main/java/google/registry/reporting/ReportingModule.java index 34f19ef7f..937de75ab 100644 --- a/core/src/main/java/google/registry/reporting/ReportingModule.java +++ b/core/src/main/java/google/registry/reporting/ReportingModule.java @@ -40,6 +40,8 @@ public class ReportingModule { public static final String BEAM_QUEUE = "beam-reporting"; + /** The amount of time expected for the Dataflow jobs to complete. */ + public static final int ENQUEUE_DELAY_MINUTES = 10; /** * The request parameter name used by reporting actions that takes a year/month parameter, which * defaults to the last month. diff --git a/core/src/main/java/google/registry/reporting/ReportingUtils.java b/core/src/main/java/google/registry/reporting/ReportingUtils.java deleted file mode 100644 index 233e18017..000000000 --- a/core/src/main/java/google/registry/reporting/ReportingUtils.java +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2018 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.reporting; - -import com.google.appengine.api.taskqueue.QueueFactory; -import com.google.appengine.api.taskqueue.TaskOptions; -import java.util.Map; -import org.joda.time.Duration; -import org.joda.time.YearMonth; - -/** Static methods common to various reporting tasks. */ -public class ReportingUtils { - - private static final int ENQUEUE_DELAY_MINUTES = 10; - - /** Enqueues a task that takes a Beam jobId and the {@link YearMonth} as parameters. */ - public static void enqueueBeamReportingTask(String path, Map parameters) { - TaskOptions publishTask = - TaskOptions.Builder.withUrl(path) - .method(TaskOptions.Method.POST) - // Dataflow jobs tend to take about 10 minutes to complete. - .countdownMillis(Duration.standardMinutes(ENQUEUE_DELAY_MINUTES).getMillis()); - parameters.forEach(publishTask::param); - QueueFactory.getQueue(ReportingModule.BEAM_QUEUE).add(publishTask); - } -} diff --git a/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java b/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java index 1219b2e1a..7f283de65 100644 --- a/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java +++ b/core/src/main/java/google/registry/reporting/billing/GenerateInvoicesAction.java @@ -17,7 +17,6 @@ package google.registry.reporting.billing; import static google.registry.beam.BeamUtils.createJobName; import static google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase.CLOUD_SQL; import static google.registry.persistence.transaction.TransactionManagerFactory.tm; -import static google.registry.reporting.ReportingUtils.enqueueBeamReportingTask; import static google.registry.request.Action.Method.POST; import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; import static javax.servlet.http.HttpServletResponse.SC_OK; @@ -27,6 +26,7 @@ import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; import com.google.common.flogger.FluentLogger; import com.google.common.net.MediaType; import google.registry.config.RegistryConfig.Config; @@ -35,14 +35,16 @@ import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDataba import google.registry.persistence.PersistenceModule; import google.registry.reporting.ReportingModule; import google.registry.request.Action; +import google.registry.request.Action.Service; import google.registry.request.Parameter; import google.registry.request.RequestParameters; import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.util.Clock; +import google.registry.util.CloudTasksUtils; import java.io.IOException; -import java.util.Map; import javax.inject.Inject; +import org.joda.time.Duration; import org.joda.time.YearMonth; /** @@ -76,6 +78,7 @@ public class GenerateInvoicesAction implements Runnable { private final Response response; private final Dataflow dataflow; private final PrimaryDatabase database; + private final CloudTasksUtils cloudTasksUtils; @Inject GenerateInvoicesAction( @@ -88,6 +91,7 @@ public class GenerateInvoicesAction implements Runnable { @Parameter(RequestParameters.PARAM_DATABASE) PrimaryDatabase database, YearMonth yearMonth, BillingEmailUtils emailUtils, + CloudTasksUtils cloudTasksUtils, Clock clock, Response response, Dataflow dataflow) { @@ -105,6 +109,7 @@ public class GenerateInvoicesAction implements Runnable { this.database = database; this.yearMonth = yearMonth; this.emailUtils = emailUtils; + this.cloudTasksUtils = cloudTasksUtils; this.clock = clock; this.response = response; this.dataflow = dataflow; @@ -144,13 +149,18 @@ public class GenerateInvoicesAction implements Runnable { logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString()); String jobId = launchResponse.getJob().getId(); if (shouldPublish) { - Map beamTaskParameters = - ImmutableMap.of( - ReportingModule.PARAM_JOB_ID, - jobId, - ReportingModule.PARAM_YEAR_MONTH, - yearMonth.toString()); - enqueueBeamReportingTask(PublishInvoicesAction.PATH, beamTaskParameters); + cloudTasksUtils.enqueue( + ReportingModule.BEAM_QUEUE, + CloudTasksUtils.createPostTask( + PublishInvoicesAction.PATH, + Service.BACKEND.toString(), + ImmutableMultimap.of( + ReportingModule.PARAM_JOB_ID, + jobId, + ReportingModule.PARAM_YEAR_MONTH, + yearMonth.toString()), + clock, + Duration.standardMinutes(ReportingModule.ENQUEUE_DELAY_MINUTES))); } response.setStatus(SC_OK); response.setPayload(String.format("Launched invoicing pipeline: %s", jobId)); diff --git a/core/src/main/java/google/registry/reporting/spec11/GenerateSpec11ReportAction.java b/core/src/main/java/google/registry/reporting/spec11/GenerateSpec11ReportAction.java index 63250469f..a9162d192 100644 --- a/core/src/main/java/google/registry/reporting/spec11/GenerateSpec11ReportAction.java +++ b/core/src/main/java/google/registry/reporting/spec11/GenerateSpec11ReportAction.java @@ -16,7 +16,6 @@ package google.registry.reporting.spec11; import static google.registry.beam.BeamUtils.createJobName; import static google.registry.persistence.transaction.TransactionManagerFactory.tm; -import static google.registry.reporting.ReportingUtils.enqueueBeamReportingTask; import static google.registry.request.Action.Method.POST; import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; import static javax.servlet.http.HttpServletResponse.SC_OK; @@ -26,6 +25,7 @@ import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter; import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest; import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; import com.google.common.flogger.FluentLogger; import com.google.common.net.MediaType; import google.registry.config.RegistryConfig.Config; @@ -34,14 +34,16 @@ import google.registry.keyring.api.KeyModule.Key; import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase; import google.registry.reporting.ReportingModule; import google.registry.request.Action; +import google.registry.request.Action.Service; import google.registry.request.Parameter; import google.registry.request.RequestParameters; import google.registry.request.Response; import google.registry.request.auth.Auth; import google.registry.util.Clock; +import google.registry.util.CloudTasksUtils; import java.io.IOException; -import java.util.Map; import javax.inject.Inject; +import org.joda.time.Duration; import org.joda.time.LocalDate; /** @@ -73,6 +75,7 @@ public class GenerateSpec11ReportAction implements Runnable { private final Dataflow dataflow; private final PrimaryDatabase database; private final boolean sendEmail; + private final CloudTasksUtils cloudTasksUtils; @Inject GenerateSpec11ReportAction( @@ -86,7 +89,8 @@ public class GenerateSpec11ReportAction implements Runnable { @Parameter(ReportingModule.SEND_EMAIL) boolean sendEmail, Clock clock, Response response, - Dataflow dataflow) { + Dataflow dataflow, + CloudTasksUtils cloudTasksUtils) { this.projectId = projectId; this.jobRegion = jobRegion; this.stagingBucketUrl = stagingBucketUrl; @@ -101,6 +105,7 @@ public class GenerateSpec11ReportAction implements Runnable { this.response = response; this.dataflow = dataflow; this.sendEmail = sendEmail; + this.cloudTasksUtils = cloudTasksUtils; } @Override @@ -136,11 +141,19 @@ public class GenerateSpec11ReportAction implements Runnable { .execute(); logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString()); String jobId = launchResponse.getJob().getId(); - Map beamTaskParameters = - ImmutableMap.of( - ReportingModule.PARAM_JOB_ID, jobId, ReportingModule.PARAM_DATE, date.toString()); if (sendEmail) { - enqueueBeamReportingTask(PublishSpec11ReportAction.PATH, beamTaskParameters); + cloudTasksUtils.enqueue( + ReportingModule.BEAM_QUEUE, + CloudTasksUtils.createPostTask( + PublishSpec11ReportAction.PATH, + Service.BACKEND.toString(), + ImmutableMultimap.of( + ReportingModule.PARAM_JOB_ID, + jobId, + ReportingModule.PARAM_DATE, + date.toString()), + clock, + Duration.standardMinutes(ReportingModule.ENQUEUE_DELAY_MINUTES))); } response.setStatus(SC_OK); response.setPayload(String.format("Launched Spec11 pipeline: %s", jobId)); diff --git a/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java b/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java index 88fdb9c65..bf2d44388 100644 --- a/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java +++ b/core/src/test/java/google/registry/reporting/billing/GenerateInvoicesActionTest.java @@ -15,23 +15,27 @@ package google.registry.reporting.billing; import static com.google.common.truth.Truth.assertThat; -import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued; -import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; import static javax.servlet.http.HttpServletResponse.SC_OK; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.cloud.tasks.v2.HttpMethod; import com.google.common.net.MediaType; +import com.google.protobuf.util.Timestamps; import google.registry.beam.BeamActionTestBase; import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase; +import google.registry.reporting.ReportingModule; import google.registry.testing.AppEngineExtension; +import google.registry.testing.CloudTasksHelper; +import google.registry.testing.CloudTasksHelper.TaskMatcher; import google.registry.testing.DualDatabaseTest; import google.registry.testing.FakeClock; -import google.registry.testing.TaskQueueHelper.TaskMatcher; import google.registry.testing.TestOfyAndSql; +import google.registry.util.CloudTasksUtils; import java.io.IOException; +import org.joda.time.Duration; import org.joda.time.YearMonth; import org.junit.jupiter.api.extension.RegisterExtension; @@ -45,6 +49,8 @@ class GenerateInvoicesActionTest extends BeamActionTestBase { private final BillingEmailUtils emailUtils = mock(BillingEmailUtils.class); private FakeClock clock = new FakeClock(); + private CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(); + private CloudTasksUtils cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils(); private GenerateInvoicesAction action; @TestOfyAndSql @@ -60,6 +66,7 @@ class GenerateInvoicesActionTest extends BeamActionTestBase { PrimaryDatabase.DATASTORE, new YearMonth(2017, 10), emailUtils, + cloudTasksUtils, clock, response, dataflow); @@ -68,13 +75,19 @@ class GenerateInvoicesActionTest extends BeamActionTestBase { assertThat(response.getStatus()).isEqualTo(SC_OK); assertThat(response.getPayload()).isEqualTo("Launched invoicing pipeline: jobid"); - TaskMatcher matcher = + cloudTasksHelper.assertTasksEnqueued( + "beam-reporting", new TaskMatcher() .url("/_dr/task/publishInvoices") - .method("POST") + .method(HttpMethod.POST) .param("jobId", "jobid") - .param("yearMonth", "2017-10"); - assertTasksEnqueued("beam-reporting", matcher); + .param("yearMonth", "2017-10") + .scheduleTime( + Timestamps.fromMillis( + clock + .nowUtc() + .plus(Duration.standardMinutes(ReportingModule.ENQUEUE_DELAY_MINUTES)) + .getMillis()))); } @TestOfyAndSql @@ -90,6 +103,7 @@ class GenerateInvoicesActionTest extends BeamActionTestBase { PrimaryDatabase.DATASTORE, new YearMonth(2017, 10), emailUtils, + cloudTasksUtils, clock, response, dataflow); @@ -97,7 +111,7 @@ class GenerateInvoicesActionTest extends BeamActionTestBase { assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); assertThat(response.getStatus()).isEqualTo(SC_OK); assertThat(response.getPayload()).isEqualTo("Launched invoicing pipeline: jobid"); - assertNoTasksEnqueued("beam-reporting"); + cloudTasksHelper.assertNoTasksEnqueued("beam-reporting"); } @TestOfyAndSql @@ -114,6 +128,7 @@ class GenerateInvoicesActionTest extends BeamActionTestBase { PrimaryDatabase.DATASTORE, new YearMonth(2017, 10), emailUtils, + cloudTasksUtils, clock, response, dataflow); @@ -121,6 +136,6 @@ class GenerateInvoicesActionTest extends BeamActionTestBase { assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR); assertThat(response.getPayload()).isEqualTo("Pipeline launch failed: Pipeline error"); verify(emailUtils).sendAlertEmail("Pipeline Launch failed due to Pipeline error"); - assertNoTasksEnqueued("beam-reporting"); + cloudTasksHelper.assertNoTasksEnqueued("beam-reporting"); } } diff --git a/core/src/test/java/google/registry/reporting/spec11/GenerateSpec11ReportActionTest.java b/core/src/test/java/google/registry/reporting/spec11/GenerateSpec11ReportActionTest.java index c551b2c50..19b34785b 100644 --- a/core/src/test/java/google/registry/reporting/spec11/GenerateSpec11ReportActionTest.java +++ b/core/src/test/java/google/registry/reporting/spec11/GenerateSpec11ReportActionTest.java @@ -15,20 +15,24 @@ package google.registry.reporting.spec11; import static com.google.common.truth.Truth.assertThat; -import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued; -import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued; import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR; import static org.apache.http.HttpStatus.SC_OK; import static org.mockito.Mockito.when; +import com.google.cloud.tasks.v2.HttpMethod; import com.google.common.net.MediaType; +import com.google.protobuf.util.Timestamps; import google.registry.beam.BeamActionTestBase; import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase; +import google.registry.reporting.ReportingModule; import google.registry.testing.AppEngineExtension; +import google.registry.testing.CloudTasksHelper; +import google.registry.testing.CloudTasksHelper.TaskMatcher; import google.registry.testing.FakeClock; -import google.registry.testing.TaskQueueHelper.TaskMatcher; +import google.registry.util.CloudTasksUtils; import java.io.IOException; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -40,6 +44,8 @@ class GenerateSpec11ReportActionTest extends BeamActionTestBase { AppEngineExtension.builder().withDatastoreAndCloudSql().withTaskQueue().build(); private final FakeClock clock = new FakeClock(DateTime.parse("2018-06-11T12:23:56Z")); + private CloudTasksHelper cloudTasksHelper = new CloudTasksHelper(); + private CloudTasksUtils cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils(); private GenerateSpec11ReportAction action; @Test @@ -56,13 +62,14 @@ class GenerateSpec11ReportActionTest extends BeamActionTestBase { true, clock, response, - dataflow); + dataflow, + cloudTasksUtils); when(launch.execute()).thenThrow(new IOException("Dataflow failure")); action.run(); assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR); assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); assertThat(response.getPayload()).contains("Dataflow failure"); - assertNoTasksEnqueued("beam-reporting"); + cloudTasksHelper.assertNoTasksEnqueued("beam-reporting"); } @Test @@ -79,18 +86,26 @@ class GenerateSpec11ReportActionTest extends BeamActionTestBase { true, clock, response, - dataflow); + dataflow, + cloudTasksUtils); action.run(); assertThat(response.getStatus()).isEqualTo(SC_OK); assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); assertThat(response.getPayload()).isEqualTo("Launched Spec11 pipeline: jobid"); - TaskMatcher matcher = + + cloudTasksHelper.assertTasksEnqueued( + "beam-reporting", new TaskMatcher() .url("/_dr/task/publishSpec11") - .method("POST") + .method(HttpMethod.POST) .param("jobId", "jobid") - .param("date", "2018-06-11"); - assertTasksEnqueued("beam-reporting", matcher); + .param("date", "2018-06-11") + .scheduleTime( + Timestamps.fromMillis( + clock + .nowUtc() + .plus(Duration.standardMinutes(ReportingModule.ENQUEUE_DELAY_MINUTES)) + .getMillis()))); } @Test @@ -107,11 +122,12 @@ class GenerateSpec11ReportActionTest extends BeamActionTestBase { false, clock, response, - dataflow); + dataflow, + cloudTasksUtils); action.run(); assertThat(response.getStatus()).isEqualTo(SC_OK); assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8); assertThat(response.getPayload()).isEqualTo("Launched Spec11 pipeline: jobid"); - assertNoTasksEnqueued("beam-reporting"); + cloudTasksHelper.assertNoTasksEnqueued("beam-reporting"); } }