Remove ReportingUtils and use CloudTasksUtil to enqueue tasks in GenerateInvoicesAction and GenerateSpec11ReportAction (#1491)

* Remove ReportingUtils and use CloudTaskUtil to enqueue 

* Use schedule time helper to enqueue and update schedule time comparison

* Fix comment, indentation in gradle file and improve time comparison
This commit is contained in:
Rachel Guan 2022-02-08 17:48:47 -05:00 committed by GitHub
parent 3af6ade080
commit fb140808a6
7 changed files with 95 additions and 75 deletions

View file

@ -367,6 +367,8 @@ dependencies {
compile deps['org.flywaydb:flyway-core']
closureCompiler deps['com.google.javascript:closure-compiler']
testCompile 'com.google.protobuf:protobuf-java-util:3.17.3'
testCompile 'com.google.protobuf:protobuf-java-util:3.17.3'
}
task jaxbToJava {

View file

@ -40,6 +40,8 @@ public class ReportingModule {
public static final String BEAM_QUEUE = "beam-reporting";
/** The amount of time expected for the Dataflow jobs to complete. */
public static final int ENQUEUE_DELAY_MINUTES = 10;
/**
* The request parameter name used by reporting actions that takes a year/month parameter, which
* defaults to the last month.

View file

@ -1,38 +0,0 @@
// Copyright 2018 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.reporting;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;
import java.util.Map;
import org.joda.time.Duration;
import org.joda.time.YearMonth;
/** Static methods common to various reporting tasks. */
public class ReportingUtils {
private static final int ENQUEUE_DELAY_MINUTES = 10;
/** Enqueues a task that takes a Beam jobId and the {@link YearMonth} as parameters. */
public static void enqueueBeamReportingTask(String path, Map<String, String> parameters) {
TaskOptions publishTask =
TaskOptions.Builder.withUrl(path)
.method(TaskOptions.Method.POST)
// Dataflow jobs tend to take about 10 minutes to complete.
.countdownMillis(Duration.standardMinutes(ENQUEUE_DELAY_MINUTES).getMillis());
parameters.forEach(publishTask::param);
QueueFactory.getQueue(ReportingModule.BEAM_QUEUE).add(publishTask);
}
}

View file

@ -17,7 +17,6 @@ package google.registry.reporting.billing;
import static google.registry.beam.BeamUtils.createJobName;
import static google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase.CLOUD_SQL;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.reporting.ReportingUtils.enqueueBeamReportingTask;
import static google.registry.request.Action.Method.POST;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_OK;
@ -27,6 +26,7 @@ import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.flogger.FluentLogger;
import com.google.common.net.MediaType;
import google.registry.config.RegistryConfig.Config;
@ -35,14 +35,16 @@ import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDataba
import google.registry.persistence.PersistenceModule;
import google.registry.reporting.ReportingModule;
import google.registry.request.Action;
import google.registry.request.Action.Service;
import google.registry.request.Parameter;
import google.registry.request.RequestParameters;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.CloudTasksUtils;
import java.io.IOException;
import java.util.Map;
import javax.inject.Inject;
import org.joda.time.Duration;
import org.joda.time.YearMonth;
/**
@ -76,6 +78,7 @@ public class GenerateInvoicesAction implements Runnable {
private final Response response;
private final Dataflow dataflow;
private final PrimaryDatabase database;
private final CloudTasksUtils cloudTasksUtils;
@Inject
GenerateInvoicesAction(
@ -88,6 +91,7 @@ public class GenerateInvoicesAction implements Runnable {
@Parameter(RequestParameters.PARAM_DATABASE) PrimaryDatabase database,
YearMonth yearMonth,
BillingEmailUtils emailUtils,
CloudTasksUtils cloudTasksUtils,
Clock clock,
Response response,
Dataflow dataflow) {
@ -105,6 +109,7 @@ public class GenerateInvoicesAction implements Runnable {
this.database = database;
this.yearMonth = yearMonth;
this.emailUtils = emailUtils;
this.cloudTasksUtils = cloudTasksUtils;
this.clock = clock;
this.response = response;
this.dataflow = dataflow;
@ -144,13 +149,18 @@ public class GenerateInvoicesAction implements Runnable {
logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString());
String jobId = launchResponse.getJob().getId();
if (shouldPublish) {
Map<String, String> beamTaskParameters =
ImmutableMap.of(
cloudTasksUtils.enqueue(
ReportingModule.BEAM_QUEUE,
CloudTasksUtils.createPostTask(
PublishInvoicesAction.PATH,
Service.BACKEND.toString(),
ImmutableMultimap.of(
ReportingModule.PARAM_JOB_ID,
jobId,
ReportingModule.PARAM_YEAR_MONTH,
yearMonth.toString());
enqueueBeamReportingTask(PublishInvoicesAction.PATH, beamTaskParameters);
yearMonth.toString()),
clock,
Duration.standardMinutes(ReportingModule.ENQUEUE_DELAY_MINUTES)));
}
response.setStatus(SC_OK);
response.setPayload(String.format("Launched invoicing pipeline: %s", jobId));

View file

@ -16,7 +16,6 @@ package google.registry.reporting.spec11;
import static google.registry.beam.BeamUtils.createJobName;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.reporting.ReportingUtils.enqueueBeamReportingTask;
import static google.registry.request.Action.Method.POST;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_OK;
@ -26,6 +25,7 @@ import com.google.api.services.dataflow.model.LaunchFlexTemplateParameter;
import com.google.api.services.dataflow.model.LaunchFlexTemplateRequest;
import com.google.api.services.dataflow.model.LaunchFlexTemplateResponse;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.flogger.FluentLogger;
import com.google.common.net.MediaType;
import google.registry.config.RegistryConfig.Config;
@ -34,14 +34,16 @@ import google.registry.keyring.api.KeyModule.Key;
import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase;
import google.registry.reporting.ReportingModule;
import google.registry.request.Action;
import google.registry.request.Action.Service;
import google.registry.request.Parameter;
import google.registry.request.RequestParameters;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.Clock;
import google.registry.util.CloudTasksUtils;
import java.io.IOException;
import java.util.Map;
import javax.inject.Inject;
import org.joda.time.Duration;
import org.joda.time.LocalDate;
/**
@ -73,6 +75,7 @@ public class GenerateSpec11ReportAction implements Runnable {
private final Dataflow dataflow;
private final PrimaryDatabase database;
private final boolean sendEmail;
private final CloudTasksUtils cloudTasksUtils;
@Inject
GenerateSpec11ReportAction(
@ -86,7 +89,8 @@ public class GenerateSpec11ReportAction implements Runnable {
@Parameter(ReportingModule.SEND_EMAIL) boolean sendEmail,
Clock clock,
Response response,
Dataflow dataflow) {
Dataflow dataflow,
CloudTasksUtils cloudTasksUtils) {
this.projectId = projectId;
this.jobRegion = jobRegion;
this.stagingBucketUrl = stagingBucketUrl;
@ -101,6 +105,7 @@ public class GenerateSpec11ReportAction implements Runnable {
this.response = response;
this.dataflow = dataflow;
this.sendEmail = sendEmail;
this.cloudTasksUtils = cloudTasksUtils;
}
@Override
@ -136,11 +141,19 @@ public class GenerateSpec11ReportAction implements Runnable {
.execute();
logger.atInfo().log("Got response: %s", launchResponse.getJob().toPrettyString());
String jobId = launchResponse.getJob().getId();
Map<String, String> beamTaskParameters =
ImmutableMap.of(
ReportingModule.PARAM_JOB_ID, jobId, ReportingModule.PARAM_DATE, date.toString());
if (sendEmail) {
enqueueBeamReportingTask(PublishSpec11ReportAction.PATH, beamTaskParameters);
cloudTasksUtils.enqueue(
ReportingModule.BEAM_QUEUE,
CloudTasksUtils.createPostTask(
PublishSpec11ReportAction.PATH,
Service.BACKEND.toString(),
ImmutableMultimap.of(
ReportingModule.PARAM_JOB_ID,
jobId,
ReportingModule.PARAM_DATE,
date.toString()),
clock,
Duration.standardMinutes(ReportingModule.ENQUEUE_DELAY_MINUTES)));
}
response.setStatus(SC_OK);
response.setPayload(String.format("Launched Spec11 pipeline: %s", jobId));

View file

@ -15,23 +15,27 @@
package google.registry.reporting.billing;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued;
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.common.net.MediaType;
import com.google.protobuf.util.Timestamps;
import google.registry.beam.BeamActionTestBase;
import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase;
import google.registry.reporting.ReportingModule;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.CloudTasksHelper;
import google.registry.testing.CloudTasksHelper.TaskMatcher;
import google.registry.testing.DualDatabaseTest;
import google.registry.testing.FakeClock;
import google.registry.testing.TaskQueueHelper.TaskMatcher;
import google.registry.testing.TestOfyAndSql;
import google.registry.util.CloudTasksUtils;
import java.io.IOException;
import org.joda.time.Duration;
import org.joda.time.YearMonth;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -45,6 +49,8 @@ class GenerateInvoicesActionTest extends BeamActionTestBase {
private final BillingEmailUtils emailUtils = mock(BillingEmailUtils.class);
private FakeClock clock = new FakeClock();
private CloudTasksHelper cloudTasksHelper = new CloudTasksHelper();
private CloudTasksUtils cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
private GenerateInvoicesAction action;
@TestOfyAndSql
@ -60,6 +66,7 @@ class GenerateInvoicesActionTest extends BeamActionTestBase {
PrimaryDatabase.DATASTORE,
new YearMonth(2017, 10),
emailUtils,
cloudTasksUtils,
clock,
response,
dataflow);
@ -68,13 +75,19 @@ class GenerateInvoicesActionTest extends BeamActionTestBase {
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getPayload()).isEqualTo("Launched invoicing pipeline: jobid");
TaskMatcher matcher =
cloudTasksHelper.assertTasksEnqueued(
"beam-reporting",
new TaskMatcher()
.url("/_dr/task/publishInvoices")
.method("POST")
.method(HttpMethod.POST)
.param("jobId", "jobid")
.param("yearMonth", "2017-10");
assertTasksEnqueued("beam-reporting", matcher);
.param("yearMonth", "2017-10")
.scheduleTime(
Timestamps.fromMillis(
clock
.nowUtc()
.plus(Duration.standardMinutes(ReportingModule.ENQUEUE_DELAY_MINUTES))
.getMillis())));
}
@TestOfyAndSql
@ -90,6 +103,7 @@ class GenerateInvoicesActionTest extends BeamActionTestBase {
PrimaryDatabase.DATASTORE,
new YearMonth(2017, 10),
emailUtils,
cloudTasksUtils,
clock,
response,
dataflow);
@ -97,7 +111,7 @@ class GenerateInvoicesActionTest extends BeamActionTestBase {
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getPayload()).isEqualTo("Launched invoicing pipeline: jobid");
assertNoTasksEnqueued("beam-reporting");
cloudTasksHelper.assertNoTasksEnqueued("beam-reporting");
}
@TestOfyAndSql
@ -114,6 +128,7 @@ class GenerateInvoicesActionTest extends BeamActionTestBase {
PrimaryDatabase.DATASTORE,
new YearMonth(2017, 10),
emailUtils,
cloudTasksUtils,
clock,
response,
dataflow);
@ -121,6 +136,6 @@ class GenerateInvoicesActionTest extends BeamActionTestBase {
assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
assertThat(response.getPayload()).isEqualTo("Pipeline launch failed: Pipeline error");
verify(emailUtils).sendAlertEmail("Pipeline Launch failed due to Pipeline error");
assertNoTasksEnqueued("beam-reporting");
cloudTasksHelper.assertNoTasksEnqueued("beam-reporting");
}
}

View file

@ -15,20 +15,24 @@
package google.registry.reporting.spec11;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.testing.TaskQueueHelper.assertNoTasksEnqueued;
import static google.registry.testing.TaskQueueHelper.assertTasksEnqueued;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static org.apache.http.HttpStatus.SC_OK;
import static org.mockito.Mockito.when;
import com.google.cloud.tasks.v2.HttpMethod;
import com.google.common.net.MediaType;
import com.google.protobuf.util.Timestamps;
import google.registry.beam.BeamActionTestBase;
import google.registry.model.common.DatabaseMigrationStateSchedule.PrimaryDatabase;
import google.registry.reporting.ReportingModule;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.CloudTasksHelper;
import google.registry.testing.CloudTasksHelper.TaskMatcher;
import google.registry.testing.FakeClock;
import google.registry.testing.TaskQueueHelper.TaskMatcher;
import google.registry.util.CloudTasksUtils;
import java.io.IOException;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -40,6 +44,8 @@ class GenerateSpec11ReportActionTest extends BeamActionTestBase {
AppEngineExtension.builder().withDatastoreAndCloudSql().withTaskQueue().build();
private final FakeClock clock = new FakeClock(DateTime.parse("2018-06-11T12:23:56Z"));
private CloudTasksHelper cloudTasksHelper = new CloudTasksHelper();
private CloudTasksUtils cloudTasksUtils = cloudTasksHelper.getTestCloudTasksUtils();
private GenerateSpec11ReportAction action;
@Test
@ -56,13 +62,14 @@ class GenerateSpec11ReportActionTest extends BeamActionTestBase {
true,
clock,
response,
dataflow);
dataflow,
cloudTasksUtils);
when(launch.execute()).thenThrow(new IOException("Dataflow failure"));
action.run();
assertThat(response.getStatus()).isEqualTo(SC_INTERNAL_SERVER_ERROR);
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).contains("Dataflow failure");
assertNoTasksEnqueued("beam-reporting");
cloudTasksHelper.assertNoTasksEnqueued("beam-reporting");
}
@Test
@ -79,18 +86,26 @@ class GenerateSpec11ReportActionTest extends BeamActionTestBase {
true,
clock,
response,
dataflow);
dataflow,
cloudTasksUtils);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("Launched Spec11 pipeline: jobid");
TaskMatcher matcher =
cloudTasksHelper.assertTasksEnqueued(
"beam-reporting",
new TaskMatcher()
.url("/_dr/task/publishSpec11")
.method("POST")
.method(HttpMethod.POST)
.param("jobId", "jobid")
.param("date", "2018-06-11");
assertTasksEnqueued("beam-reporting", matcher);
.param("date", "2018-06-11")
.scheduleTime(
Timestamps.fromMillis(
clock
.nowUtc()
.plus(Duration.standardMinutes(ReportingModule.ENQUEUE_DELAY_MINUTES))
.getMillis())));
}
@Test
@ -107,11 +122,12 @@ class GenerateSpec11ReportActionTest extends BeamActionTestBase {
false,
clock,
response,
dataflow);
dataflow,
cloudTasksUtils);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(response.getContentType()).isEqualTo(MediaType.PLAIN_TEXT_UTF_8);
assertThat(response.getPayload()).isEqualTo("Launched Spec11 pipeline: jobid");
assertNoTasksEnqueued("beam-reporting");
cloudTasksHelper.assertNoTasksEnqueued("beam-reporting");
}
}