Add end-to-end code to run Beam template from App Engine

This serves as proof-of-concept to verify we can use Beam for our invoice generation use case. Namely, it checks that we can:
- Deploy a Beam template to GCS
- Read from BigQuery within the template
- Run the template from App Engine

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=175755390
This commit is contained in:
larryruili 2017-11-14 16:42:45 -08:00 committed by jianglai
parent 28417b7599
commit 99996121b5
9 changed files with 209 additions and 96 deletions

View file

@ -7,24 +7,20 @@ licenses(["notice"]) # Apache 2.0
# Library target for the billing package; compiles every *.java file in this directory.
java_library(
name = "billing",
srcs = glob(["*.java"]),
# Placed on the runtime classpath only; not available at compile time.
# NOTE(review): presumably needed because the Beam template reads from BigQuery -- confirm.
runtime_deps = [
"@com_google_apis_google_api_services_bigquery",
],
# Compile-time dependencies: project-internal packages first, then external repositories.
deps = [
"//java/google/registry/config",
"//java/google/registry/request",
"//java/google/registry/request/auth",
"//java/google/registry/util",
"@com_google_api_client_appengine",
"@com_google_apis_google_api_services_dataflow",
"@com_google_dagger",
"@com_google_guava",
"@joda_time",
"@org_apache_beam_runners_google_cloud_dataflow_java",
"@org_apache_beam_sdks_java_core",
],
)
java_binary(
name = "counter",
srcs = ["MinWordCount.java"],
main_class = "google.registry.billing.MinWordCount",
deps = [
"@com_google_dagger",
"@com_google_guava",
"@com_google_http_client",
"@javax_servlet_api",
"@joda_time",
"@org_apache_beam_runners_google_cloud_dataflow_java",
"@org_apache_beam_sdks_java_core",

View file

@ -0,0 +1,50 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.billing;
import com.google.api.client.googleapis.extensions.appengine.auth.oauth2.AppIdentityCredential;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import dagger.Module;
import dagger.Provides;
import google.registry.config.RegistryConfig.Config;
import java.util.Set;
/**
 * Dagger module supplying the dependencies used by the monthly billing actions.
 *
 * <p>It currently contributes a single binding: the {@link Dataflow} API client.
 */
@Module
public final class BillingModule {

  /** OAuth scope requested for the credential handed to the Dataflow client. */
  private static final String CLOUD_PLATFORM_SCOPE =
      "https://www.googleapis.com/auth/cloud-platform";

  /**
   * Builds a {@link Dataflow} API client.
   *
   * @param projectId project id, used as the prefix of the client's application name
   * @param transport HTTP transport passed to the client builder
   * @param jsonFactory JSON factory passed to the client builder
   * @param appIdentityCredentialFunc maps a set of OAuth scopes to the credential that
   *     initializes the client's requests
   * @return the configured client
   */
  @Provides
  static Dataflow provideDataflow(
      @Config("projectId") String projectId,
      HttpTransport transport,
      JsonFactory jsonFactory,
      Function<Set<String>, AppIdentityCredential> appIdentityCredentialFunc) {
    Set<String> scopes = ImmutableSet.of(CLOUD_PLATFORM_SCOPE);
    AppIdentityCredential credential = appIdentityCredentialFunc.apply(scopes);
    Dataflow.Builder clientBuilder = new Dataflow.Builder(transport, jsonFactory, credential);
    clientBuilder.setApplicationName(String.format("%s billing", projectId));
    return clientBuilder.build();
  }
}

View file

@ -15,15 +15,27 @@
package google.registry.billing;
import static google.registry.request.Action.Method.POST;
import static javax.servlet.http.HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
import com.google.api.services.dataflow.model.LaunchTemplateResponse;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.common.net.MediaType;
import google.registry.config.RegistryConfig.Config;
import google.registry.request.Action;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.util.FormattingLogger;
import java.io.IOException;
import javax.inject.Inject;
/**
* Generates invoices for the month and stores them on GCS.
*
* <p>Currently this is just a simplified runner that verifies we can deploy dataflow jobs from App
* Engine.
*/
@Action(
@ -33,13 +45,44 @@ import javax.inject.Inject;
)
public class GenerateInvoicesAction implements Runnable {

  private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();

  /** URL path of this action. */
  static final String PATH = "/_dr/task/generateInvoices";

  /** Name given to the launched Dataflow job. */
  private static final String JOB_NAME = "test-bigquerytemplate1";

  /** Zone passed to the job's runtime environment. */
  private static final String JOB_ZONE = "us-east1-c";

  /** Location of the template, relative to the Beam bucket URL. */
  private static final String TEMPLATE_PATH = "/templates/bigquery1";

  /** Temp location for the job, relative to the Beam bucket URL. */
  private static final String TEMP_PATH = "/temp";

  @Inject @Config("projectId") String projectId;
  @Inject @Config("apacheBeamBucketUrl") String beamBucketUrl;
  @Inject Dataflow dataflow;
  @Inject Response response;

  @Inject GenerateInvoicesAction() {}

  /**
   * Launches the Dataflow template stored under the Beam bucket and reports the outcome.
   *
   * <p>On success the response is set to 200 with the payload "Launched dataflow template.". If
   * the launch (or rendering the returned job for logging) throws an {@link IOException}, the
   * response is set to 500 with a payload carrying the exception message.
   */
  @Override
  public void run() {
    logger.info("Launching dataflow job");
    try {
      LaunchTemplateParameters params =
          new LaunchTemplateParameters()
              .setJobName(JOB_NAME)
              .setEnvironment(
                  new RuntimeEnvironment()
                      .setZone(JOB_ZONE)
                      .setTempLocation(beamBucketUrl + TEMP_PATH));
      LaunchTemplateResponse launchResponse =
          dataflow
              .projects()
              .templates()
              .launch(projectId, params)
              .setGcsPath(beamBucketUrl + TEMPLATE_PATH)
              .execute();
      logger.infofmt("Got response: %s", launchResponse.getJob().toPrettyString());
    } catch (IOException e) {
      // Pass the exception itself to the logger so the stack trace and cause are preserved,
      // rather than logging only its message.
      logger.warningfmt(e, "Template Launch failed due to: %s", e.getMessage());
      response.setStatus(SC_INTERNAL_SERVER_ERROR);
      response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
      response.setPayload(String.format("Template launch failed: %s", e.getMessage()));
      return;
    }
    response.setStatus(SC_OK);
    response.setContentType(MediaType.PLAIN_TEXT_UTF_8);
    response.setPayload("Launched dataflow template.");
  }
}

View file

@ -1,42 +0,0 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.billing;
import org.apache.beam.sdk.transforms.DoFn;
/** Minimal word-count skeleton that serves as a proof of concept for the Beam pipeline. */
public class MinWordCount implements Runnable {

  /** Command-line entry point; creates a {@link MinWordCount} and runs it. */
  public static void main(String[] args) {
    new MinWordCount().run();
  }

  @Override
  public void run() {
    // This is a stub function for a basic proof-of-concept Beam pipeline.
  }

  /** Splits each input element on runs of non-letter characters and emits the non-empty parts. */
  static class ExtractWordsFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext context) {
      String[] tokens = context.element().split("[^\\p{L}]+");
      for (int i = 0; i < tokens.length; i++) {
        // A leading delimiter yields an empty first token; do not emit it.
        if (tokens[i].isEmpty()) {
          continue;
        }
        context.output(tokens[i]);
      }
    }
  }
}

View file

@ -463,6 +463,17 @@ public final class RegistryConfig {
return config.gSuite.outgoingEmailDisplayName;
}
/**
 * Provides the Google Cloud Storage bucket URL under which Beam templates and results are kept.
 *
 * <p>The URL is derived from the project id as {@code gs://<projectId>-beam}.
 *
 * @param projectId the project id the bucket name is built from
 * @return the {@code gs://} URL of the Beam bucket
 * @see google.registry.billing.GenerateInvoicesAction
 */
@Provides
@Config("apacheBeamBucketUrl")
public static String provideApacheBeamBucketUrl(@Config("projectId") String projectId) {
  String beamBucketUrl = String.format("gs://%s-beam", projectId);
  return beamBucketUrl;
}
/**
* Returns the Google Cloud Storage bucket for ICANN transaction and activity reports to
* be uploaded.

View file

@ -28,6 +28,7 @@ import google.registry.batch.MapreduceEntityCleanupAction;
import google.registry.batch.RefreshDnsOnHostRenameAction;
import google.registry.batch.ResaveAllEppResourcesAction;
import google.registry.batch.VerifyEntityIntegrityAction;
import google.registry.billing.BillingModule;
import google.registry.billing.GenerateInvoicesAction;
import google.registry.cron.CommitLogFanoutAction;
import google.registry.cron.CronModule;
@ -86,6 +87,7 @@ import google.registry.tmch.TmchSmdrlAction;
BackendModule.class,
BackupModule.class,
BatchModule.class,
BillingModule.class,
CloudDnsWriterModule.class,
CronModule.class,
DnsModule.class,

View file

@ -12,6 +12,8 @@ java_library(
srcs = glob(["*.java"]),
deps = [
"//java/google/registry/billing",
"//javatests/google/registry/testing",
"@com_google_apis_google_api_services_dataflow",
"@com_google_dagger",
"@com_google_guava",
"@com_google_truth",
@ -20,6 +22,7 @@ java_library(
"@junit",
"@org_apache_beam_runners_google_cloud_dataflow_java",
"@org_apache_beam_sdks_java_core",
"@org_mockito_all",
],
)

View file

@ -0,0 +1,88 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.billing;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.Dataflow.Projects;
import com.google.api.services.dataflow.Dataflow.Projects.Templates;
import com.google.api.services.dataflow.Dataflow.Projects.Templates.Launch;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
import com.google.api.services.dataflow.model.LaunchTemplateResponse;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import google.registry.testing.FakeResponse;
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Unit tests for {@link GenerateInvoicesAction}.
 *
 * <p>The Dataflow client call chain ({@code projects().templates().launch(...)}) is replaced with
 * Mockito mocks, so no request is actually sent.
 */
@RunWith(JUnit4.class)
public class GenerateInvoicesActionTest {

  private final Dataflow dataflow = mock(Dataflow.class);
  private final Projects projects = mock(Projects.class);
  private final Templates templates = mock(Templates.class);
  private final Launch launch = mock(Launch.class);
  private final FakeResponse response = new FakeResponse();

  private GenerateInvoicesAction action;

  @Before
  public void initializeObjects() throws Exception {
    action = newActionUnderTest();
    stubDataflowCallChain();
  }

  @Test
  public void testLaunchTemplateJob() throws Exception {
    action.run();

    verify(templates).launch("test-project", expectedLaunchParameters());
    verify(launch).setGcsPath("gs://test-project-beam/templates/bigquery1");
    assertThat(response.getStatus()).isEqualTo(200);
    assertThat(response.getPayload()).isEqualTo("Launched dataflow template.");
  }

  @Test
  public void testCaughtIOException() throws Exception {
    when(launch.execute()).thenThrow(new IOException("expected"));

    action.run();

    assertThat(response.getStatus()).isEqualTo(500);
    assertThat(response.getPayload()).isEqualTo("Template launch failed: expected");
  }

  /** Creates the action and fills in the fields that injection would normally populate. */
  private GenerateInvoicesAction newActionUnderTest() {
    GenerateInvoicesAction created = new GenerateInvoicesAction();
    created.dataflow = dataflow;
    created.response = response;
    created.projectId = "test-project";
    created.beamBucketUrl = "gs://test-project-beam";
    return created;
  }

  /** Wires the mocks so that a launch succeeds and returns a response holding an empty job. */
  private void stubDataflowCallChain() throws IOException {
    when(dataflow.projects()).thenReturn(projects);
    when(projects.templates()).thenReturn(templates);
    when(templates.launch(any(String.class), any(LaunchTemplateParameters.class)))
        .thenReturn(launch);
    when(launch.setGcsPath(any(String.class))).thenReturn(launch);
    when(launch.execute()).thenReturn(new LaunchTemplateResponse().setJob(new Job()));
  }

  /** Returns the launch parameters the action is expected to send for the test project. */
  private static LaunchTemplateParameters expectedLaunchParameters() {
    RuntimeEnvironment environment =
        new RuntimeEnvironment()
            .setZone("us-east1-c")
            .setTempLocation("gs://test-project-beam/temp");
    return new LaunchTemplateParameters()
        .setJobName("test-bigquerytemplate1")
        .setEnvironment(environment);
  }
}

View file

@ -1,38 +0,0 @@
// Copyright 2017 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.billing;
import static com.google.common.truth.Truth.assertThat;
import google.registry.billing.MinWordCount.ExtractWordsFn;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit tests for {@link MinWordCount}. */
@RunWith(JUnit4.class)
public class MinWordCountTest {

  /** Checks that {@link ExtractWordsFn} emits each word of a multi-line input as its own element. */
  @Test
  public void testDebuggingWordCount() throws Exception {
    List<String> extractedWords =
        DoFnTester.of(new ExtractWordsFn()).processBundle("hi there\nwhats up\nim good thanks");

    assertThat(extractedWords)
        .containsExactly("hi", "there", "whats", "up", "im", "good", "thanks");
  }
}