Add map reduce job for contact import

Some additional changes were made by Ben McIlwain.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=133875824
This commit is contained in:
Wolfgang Meyers 2016-09-21 14:27:00 -07:00 committed by Ben McIlwain
parent aed3c0f0d0
commit 28eeda189d
21 changed files with 3866 additions and 21 deletions

View file

@ -275,6 +275,12 @@
<url-pattern>/_dr/task/expandRecurringBillingEvents</url-pattern>
</servlet-mapping>
<!-- Mapreduce to import contacts from escrow file -->
<servlet-mapping>
<servlet-name>backend-servlet</servlet-name>
<url-pattern>/_dr/task/importRdeContacts</url-pattern>
</servlet-mapping>
<!-- Security config -->
<security-constraint>
<web-resource-collection>

View file

@ -50,6 +50,7 @@ import google.registry.monitoring.whitebox.MetricsExportAction;
import google.registry.monitoring.whitebox.VerifyEntityIntegrityAction;
import google.registry.monitoring.whitebox.WhiteboxModule;
import google.registry.rde.BrdaCopyAction;
import google.registry.rde.RdeContactImportAction;
import google.registry.rde.RdeModule;
import google.registry.rde.RdeReportAction;
import google.registry.rde.RdeReporter;
@ -105,6 +106,7 @@ interface BackendRequestComponent {
NordnVerifyAction nordnVerifyAction();
PublishDnsUpdatesAction publishDnsUpdatesAction();
ReadDnsQueueAction readDnsQueueAction();
RdeContactImportAction rdeContactImportAction();
RdeReportAction rdeReportAction();
RdeStagingAction rdeStagingAction();
RdeUploadAction rdeUploadAction();

View file

@ -14,6 +14,7 @@ java_library(
"//java/com/google/common/collect",
"//java/com/google/common/html",
"//java/com/google/common/io",
"//java/com/google/common/math",
"//java/com/google/common/net",
"//third_party/java/appengine:appengine-api",
"//third_party/java/appengine_gcs_client",

View file

@ -0,0 +1,135 @@
// Copyright 2016 The Domain Registry Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.rde;
import static google.registry.mapreduce.MapreduceRunner.PARAM_MAP_SHARDS;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.rde.RdeModule.PATH;
import static google.registry.util.PipelineUtils.createJobPath;
import com.google.appengine.tools.cloudstorage.GcsService;
import com.google.appengine.tools.cloudstorage.GcsServiceFactory;
import com.google.appengine.tools.cloudstorage.RetryParams;
import com.google.appengine.tools.mapreduce.Mapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import google.registry.config.ConfigModule;
import google.registry.config.ConfigModule.Config;
import google.registry.gcs.GcsUtils;
import google.registry.mapreduce.MapreduceRunner;
import google.registry.model.contact.ContactResource;
import google.registry.request.Action;
import google.registry.request.Parameter;
import google.registry.request.Response;
import google.registry.util.SystemClock;
import javax.inject.Inject;
/**
* A mapreduce that imports contacts from an escrow file.
*
* <p>Specify the escrow file to import with the "path" parameter.
*/
@Action(path = "/_dr/task/importRdeContacts")
public class RdeContactImportAction implements Runnable {
private static final GcsService GCS_SERVICE =
GcsServiceFactory.createGcsService(RetryParams.getDefaultInstance());
protected final MapreduceRunner mrRunner;
protected final Response response;
protected final String importBucketName;
protected final String importFileName;
protected final Optional<Integer> mapShards;
@Inject
public RdeContactImportAction(
MapreduceRunner mrRunner,
Response response,
@Config("rdeImportBucket") String importBucketName,
@Parameter(PATH) String importFileName,
@Parameter(PARAM_MAP_SHARDS) Optional<Integer> mapShards) {
this.mrRunner = mrRunner;
this.response = response;
this.importBucketName = importBucketName;
this.importFileName = importFileName;
this.mapShards = mapShards;
}
@Override
public void run() {
response.sendJavaScriptRedirect(createJobPath(mrRunner
.setJobName("Import contacts from escrow file")
.setModuleName("backend")
.runMapOnly(
createMapper(),
ImmutableList.of(createInput()))));
}
/**
* Creates a new {@link RdeContactInput}
*
* <p>Should be overridden in a subclass for the purposes of unit testing.
*/
@VisibleForTesting
RdeContactInput createInput() {
return new RdeContactInput(mapShards, importBucketName, importFileName);
}
/**
* Creates a new {@link RdeContactImportMapper}
*
* <p>Should be overridden in a subclass for the purposes of unit testing.
*/
@VisibleForTesting
RdeContactImportMapper createMapper() {
return new RdeContactImportMapper(importBucketName);
}
/** Mapper to import contacts from an escrow file. */
public static class RdeContactImportMapper extends Mapper<ContactResource, Void, Void> {
private static final long serialVersionUID = -7645091075256589374L;
private final String importBucketName;
private transient RdeImportUtils importUtils;
public RdeContactImportMapper(String importBucketName) {
this.importBucketName = importBucketName;
}
private RdeImportUtils getImportUtils() {
if (importUtils == null) {
importUtils = createRdeImportUtils();
}
return importUtils;
}
/**
* Creates a new instance of RdeImportUtils.
*/
private RdeImportUtils createRdeImportUtils() {
return new RdeImportUtils(
ofy(),
new SystemClock(),
importBucketName,
new GcsUtils(GCS_SERVICE, ConfigModule.provideGcsBufferSize()));
}
@Override
public void map(ContactResource contact) {
getImportUtils().importContact(contact);
}
}
}

View file

@ -0,0 +1,134 @@
// Copyright 2016 The Domain Registry Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.rde;
import static com.google.common.math.IntMath.divide;
import static java.math.RoundingMode.CEILING;
import static java.math.RoundingMode.FLOOR;
import com.google.appengine.tools.cloudstorage.GcsFilename;
import com.google.appengine.tools.cloudstorage.GcsService;
import com.google.appengine.tools.cloudstorage.GcsServiceFactory;
import com.google.appengine.tools.cloudstorage.RetryParams;
import com.google.appengine.tools.mapreduce.Input;
import com.google.appengine.tools.mapreduce.InputReader;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import google.registry.config.ConfigModule;
import google.registry.gcs.GcsUtils;
import google.registry.model.contact.ContactResource;
import google.registry.rde.RdeParser.RdeHeader;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
/**
* A MapReduce {@link Input} that imports {@link ContactResource} objects from an escrow file.
*
* <p>If a mapShards parameter has been specified, up to that many readers will be created
* so that each map shard has one reader. If a mapShards parameter has not been specified, a
* default number of readers will be created.
*/
public class RdeContactInput extends Input<ContactResource> {
private static final long serialVersionUID = -366966393494008712L;
private static final GcsService GCS_SERVICE =
GcsServiceFactory.createGcsService(RetryParams.getDefaultInstance());
/**
* Default number of readers if map shards are not specified.
*/
private static final int DEFAULT_READERS = 50;
/**
* Minimum number of records per reader.
*/
private static final int MINIMUM_RECORDS_PER_READER = 100;
/**
* Optional argument to explicitly specify the number of readers.
*/
private final int numReaders;
private final String importBucketName;
private final String importFileName;
/**
* Creates a new {@link RdeContactInput}
*
* @param mapShards Number of readers that should be created
* @param importBucketName Name of GCS bucket for escrow file imports
* @param importFileName Name of escrow file in GCS
*/
public RdeContactInput(Optional<Integer> mapShards, String importBucketName,
String importFileName) {
this.numReaders = mapShards.or(DEFAULT_READERS);
this.importBucketName = importBucketName;
this.importFileName = importFileName;
}
@Override
public List<? extends InputReader<ContactResource>> createReaders() throws IOException {
int numReaders = this.numReaders;
RdeHeader header = newParser().getHeader();
int numberOfContacts = header.getContactCount().intValue();
if (numberOfContacts / numReaders < MINIMUM_RECORDS_PER_READER) {
numReaders = divide(numberOfContacts, MINIMUM_RECORDS_PER_READER, FLOOR);
// use at least one reader
numReaders = Math.max(numReaders, 1);
}
ImmutableList.Builder<RdeContactReader> builder = new ImmutableList.Builder<>();
int contactsPerReader =
Math.max(MINIMUM_RECORDS_PER_READER, divide(numberOfContacts, numReaders, CEILING));
int offset = 0;
for (int i = 0; i < numReaders; i++) {
builder = builder.add(newReader(offset, contactsPerReader));
offset += contactsPerReader;
}
return builder.build();
}
/**
* Creates a new instance of {@link RdeContactReader}
*
* <p>This method can be overridden by a subclass for the purposes of unit testing.
*/
protected RdeContactReader newReader(int offset, int maxResults) {
return new RdeContactReader(importBucketName, importFileName, offset, maxResults);
}
/**
* Creates a new instance of {@link RdeParser}
*/
private RdeParser newParser() {
GcsUtils utils = new GcsUtils(GCS_SERVICE, ConfigModule.provideGcsBufferSize());
GcsFilename filename = new GcsFilename(importBucketName, importFileName);
try (InputStream xmlInput = utils.openInputStream(filename)) {
return new RdeParser(xmlInput);
} catch (Exception e) {
throw new InitializationException(
String.format("Error opening rde file %s/%s", importBucketName, importFileName), e);
}
}
/**
* Thrown when the input cannot initialize properly.
*/
private static class InitializationException extends RuntimeException {
public InitializationException(String message, Throwable cause) {
super(message, cause);
}
}
}

View file

@ -0,0 +1,107 @@
// Copyright 2016 The Domain Registry Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.rde;
import com.google.appengine.tools.cloudstorage.GcsFilename;
import com.google.appengine.tools.cloudstorage.GcsService;
import com.google.appengine.tools.cloudstorage.GcsServiceFactory;
import com.google.appengine.tools.cloudstorage.RetryParams;
import com.google.appengine.tools.mapreduce.InputReader;
import google.registry.config.ConfigModule;
import google.registry.gcs.GcsUtils;
import google.registry.model.contact.ContactResource;
import google.registry.util.FormattingLogger;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.NoSuchElementException;
import javax.annotation.concurrent.NotThreadSafe;
/** Mapreduce {@link InputReader} for reading contacts from escrow files */
@NotThreadSafe
public class RdeContactReader extends InputReader<ContactResource> implements Serializable {
private static final long serialVersionUID = -3688793834175577691L;
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
private static final GcsService GCS_SERVICE =
GcsServiceFactory.createGcsService(RetryParams.getDefaultInstance());
final String importBucketName;
final String importFileName;
final int offset;
final int maxResults;
private int count = 0;
transient RdeParser parser;
/**
* Creates a new instance of {@link RdeParser}
*/
private RdeParser newParser() {
GcsUtils utils = new GcsUtils(GCS_SERVICE, ConfigModule.provideGcsBufferSize());
GcsFilename filename = new GcsFilename(importBucketName, importFileName);
InputStream xmlInput = utils.openInputStream(filename);
try {
RdeParser parser = new RdeParser(xmlInput);
// skip the file offset and count
// if count is greater than 0, the reader has been rehydrated after doing some work.
// skip any already processed records.
parser.skipContacts(offset + count);
return parser;
} catch (Exception e) {
logger.severefmt(e, "Error opening rde file %s/%s", importBucketName, importFileName);
throw new RuntimeException(e);
}
}
public RdeContactReader(
String importBucketName,
String importFileName,
int offset,
int maxResults) {
this.importBucketName = importBucketName;
this.importFileName = importFileName;
this.offset = offset;
this.maxResults = maxResults;
}
@Override
public ContactResource next() throws IOException {
if (count < maxResults) {
if (parser == null) {
parser = newParser();
if (parser.isAtContact()) {
count++;
return XjcToContactResourceConverter.convertContact(parser.getContact());
}
}
if (parser.nextContact()) {
count++;
return XjcToContactResourceConverter.convertContact(parser.getContact());
}
}
throw new NoSuchElementException();
}
@Override
public void endSlice() throws IOException {
super.endSlice();
if (parser != null) {
parser.close();
}
}
}

View file

@ -36,11 +36,14 @@ import google.registry.xjc.rderegistrar.XjcRdeRegistrar;
import java.io.IOException;
import java.io.InputStream;
import javax.inject.Inject;
import javax.xml.bind.JAXBException;
import javax.xml.stream.XMLStreamException;
import org.joda.time.DateTime;
/** Utility functions for escrow file import. */
public final class RdeImportUtils {
/**
* Utility functions for escrow file import.
*/
public class RdeImportUtils {
private static final FormattingLogger logger = FormattingLogger.getLoggerForCallerClass();
@ -147,7 +150,7 @@ public final class RdeImportUtils {
String.format("Registrar '%s' not found in the registry", registrar.getId()));
}
}
} catch (XMLStreamException e) {
} catch (XMLStreamException | JAXBException e) {
throw new IllegalArgumentException(
String.format("Invalid XML file: '%s'", escrowFilePath), e);
}

View file

@ -34,6 +34,7 @@ import org.joda.time.DateTime;
public final class RdeModule {
static final String PARAM_WATERMARK = "watermark";
static final String PATH = "path";
@Provides
@Parameter(PARAM_WATERMARK)
@ -52,4 +53,10 @@ public final class RdeModule {
static Queue provideQueueRdeReport() {
return getQueue("rde-report");
}
@Provides
@Parameter(PATH)
static String providePath(HttpServletRequest req) {
return RequestParameters.extractRequiredParameter(req, PATH);
}
}

View file

@ -17,9 +17,9 @@ package google.registry.rde;
import static com.google.common.base.Preconditions.checkArgument;
import static google.registry.util.PreconditionsUtils.checkArgumentNotNull;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import google.registry.xjc.XjcXmlTransformer;
import google.registry.xjc.rdecontact.XjcRdeContact;
import google.registry.xjc.rdecontact.XjcRdeContactElement;
import google.registry.xjc.rdedomain.XjcRdeDomain;
@ -37,17 +37,17 @@ import google.registry.xjc.rdenndn.XjcRdeNndn;
import google.registry.xjc.rdenndn.XjcRdeNndnElement;
import google.registry.xjc.rderegistrar.XjcRdeRegistrar;
import google.registry.xjc.rderegistrar.XjcRdeRegistrarElement;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import javax.annotation.concurrent.NotThreadSafe;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stax.StAXSource;
import javax.xml.transform.stream.StreamResult;
/**
* RDE escrow deposit file parser
@ -75,7 +75,7 @@ import javax.xml.transform.stream.StreamResult;
* parser.
*/
@NotThreadSafe
public class RdeParser {
public class RdeParser implements Closeable {
private static final String RDE_DOMAIN_URI = "urn:ietf:params:xml:ns:rdeDomain-1.0";
private static final String RDE_HOST_URI = "urn:ietf:params:xml:ns:rdeHost-1.0";
@ -86,6 +86,23 @@ public class RdeParser {
private static final String RDE_EPP_PARAMS_URI = "urn:ietf:params:xml:ns:rdeEppParams-1.0";
private static final String RDE_HEADER_URI = "urn:ietf:params:xml:ns:rdeHeader-1.0";
/** List of packages to initialize JAXBContext. **/
private static final String JAXB_CONTEXT_PACKAGES = Joiner.on(":")
.join(Arrays.asList(
"google.registry.xjc.contact",
"google.registry.xjc.domain",
"google.registry.xjc.host",
"google.registry.xjc.mark",
"google.registry.xjc.rde",
"google.registry.xjc.rdecontact",
"google.registry.xjc.rdedomain",
"google.registry.xjc.rdeeppparams",
"google.registry.xjc.rdeheader",
"google.registry.xjc.rdeidn",
"google.registry.xjc.rdenndn",
"google.registry.xjc.rderegistrar",
"google.registry.xjc.smd"));
/**
* Convenient immutable java representation of an RDE header
*/
@ -136,18 +153,23 @@ public class RdeParser {
}
}
private final InputStream xmlInput;
private final XMLStreamReader reader;
private final Unmarshaller unmarshaller;
private RdeHeader header;
/**
* Creates a new instance of {@link RdeParser}
*
* @param xmlInput Contents of the escrow deposit file
* @throws JAXBException
*/
public RdeParser(InputStream xmlInput) throws XMLStreamException {
XMLInputFactory factory = XMLInputFactory.newInstance();
reader = factory.createXMLStreamReader(xmlInput);
header = new RdeHeader(readHeader());
public RdeParser(InputStream xmlInput) throws XMLStreamException, JAXBException {
this.xmlInput = xmlInput;
this.unmarshaller = JAXBContext.newInstance(JAXB_CONTEXT_PACKAGES).createUnmarshaller();
this.reader = XMLInputFactory.newInstance().createXMLStreamReader(xmlInput);
this.header = new RdeHeader(readHeader());
}
/**
@ -176,12 +198,7 @@ public class RdeParser {
checkArgumentNotNull(uri, "uri cannot be null");
try {
if (isAtElement(uri, name)) {
TransformerFactory tf = TransformerFactory.newInstance();
Transformer t = tf.newTransformer();
ByteArrayOutputStream bout = new ByteArrayOutputStream();
t.transform(new StAXSource(reader), new StreamResult(bout));
ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
Object element = XjcXmlTransformer.unmarshal(Object.class, bin);
Object element = unmarshaller.unmarshal(reader);
return element;
} else {
throw new IllegalStateException(String.format("Not at element %s:%s", uri, name));
@ -562,4 +579,14 @@ public class RdeParser {
(XjcRdeEppParamsElement) unmarshalElement(RDE_EPP_PARAMS_URI, "eppParams");
return element.getValue();
}
/**
* Closes the underlying InputStream
*
* @throws IOException if the underlying stream throws {@link IOException} on close.
*/
@Override
public void close() throws IOException {
xmlInput.close();
}
}