diff --git a/core/src/main/java/google/registry/backup/AppEngineEnvironment.java b/core/src/main/java/google/registry/backup/AppEngineEnvironment.java new file mode 100644 index 000000000..7c955370e --- /dev/null +++ b/core/src/main/java/google/registry/backup/AppEngineEnvironment.java @@ -0,0 +1,66 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.backup; + +import com.google.apphosting.api.ApiProxy; +import com.google.apphosting.api.ApiProxy.Environment; +import com.google.common.collect.ImmutableMap; +import java.io.Closeable; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; + +/** + * Sets up a placeholder {@link Environment} on a non-AppEngine platform so that Datastore Entities + * can be deserialized. See {@code DatastoreEntityExtension} in test source for more information. + */ +public class AppEngineEnvironment implements Closeable { + + private static final Environment PLACEHOLDER_ENV = createAppEngineEnvironment(); + + private boolean isPlaceHolderNeeded; + + AppEngineEnvironment() { + isPlaceHolderNeeded = ApiProxy.getCurrentEnvironment() == null; + // isPlaceHolderNeeded may be true when we are invoked in a test with AppEngineRule. + if (isPlaceHolderNeeded) { + ApiProxy.setEnvironmentForCurrentThread(PLACEHOLDER_ENV); + } + } + + @Override + public void close() { + if (isPlaceHolderNeeded) { + ApiProxy.setEnvironmentForCurrentThread(null); + } + } + + /** Returns a placeholder {@link Environment} that can return hardcoded AppId and Attributes. */ + private static Environment createAppEngineEnvironment() { + return (Environment) + Proxy.newProxyInstance( + Environment.class.getClassLoader(), + new Class[] {Environment.class}, + (Object proxy, Method method, Object[] args) -> { + switch (method.getName()) { + case "getAppId": + return "PlaceholderAppId"; + case "getAttributes": + return ImmutableMap.of(); + default: + throw new UnsupportedOperationException(method.getName()); + } + }); + } +} diff --git a/core/src/main/java/google/registry/backup/CommitLogImports.java b/core/src/main/java/google/registry/backup/CommitLogImports.java new file mode 100644 index 000000000..7c37149f8 --- /dev/null +++ b/core/src/main/java/google/registry/backup/CommitLogImports.java @@ -0,0 +1,95 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.backup; + +import static com.google.common.base.Preconditions.checkState; +import static google.registry.backup.BackupUtils.createDeserializingIterator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Streams; +import google.registry.model.ImmutableObject; +import google.registry.model.ofy.CommitLogCheckpoint; +import google.registry.model.ofy.CommitLogManifest; +import google.registry.model.ofy.CommitLogMutation; +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.Iterator; +import java.util.stream.Stream; + +/** + * Helpers for reading CommitLog records from a file. + * + *

This class is adapted from {@link RestoreCommitLogsAction}, and will be used in the initial + * population of the Cloud SQL database. + */ +public final class CommitLogImports { + + private CommitLogImports() {} + + /** + * Returns entities in an {@code inputStream} (from a single CommitLog file) as an {@link + * ImmutableList} of {@link VersionedEntity} records. Upon completion the {@code inputStream} is + * closed. + * + *

The returned list may be empty, since CommitLogs are written at fixed intervals regardless + * if actual changes exist. + * + *

A CommitLog file starts with a {@link CommitLogCheckpoint}, followed by (repeated) + * subsequences of [{@link CommitLogManifest}, [{@link CommitLogMutation}] ...]. Each subsequence + * represents the changes in one transaction. The {@code CommitLogManifest} contains deleted + * entity keys, whereas each {@code CommitLogMutation} contains one whole entity. + */ + public static ImmutableList loadEntities(InputStream inputStream) { + try (AppEngineEnvironment appEngineEnvironment = new AppEngineEnvironment(); + InputStream input = new BufferedInputStream(inputStream)) { + Iterator commitLogs = createDeserializingIterator(input); + checkState(commitLogs.hasNext()); + checkState(commitLogs.next() instanceof CommitLogCheckpoint); + + return Streams.stream(commitLogs) + .map( + e -> + e instanceof CommitLogManifest + ? VersionedEntity.fromManifest((CommitLogManifest) e) + : Stream.of(VersionedEntity.fromMutation((CommitLogMutation) e))) + .flatMap(s -> s) + .collect(ImmutableList.toImmutableList()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** Covenience method that adapts {@link #loadEntities(InputStream)} to a {@link File}. */ + public static ImmutableList loadEntities(File commitLogFile) { + try { + return loadEntities(new FileInputStream(commitLogFile)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Covenience method that adapts {@link #loadEntities(InputStream)} to a {@link + * ReadableByteChannel}. + */ + public static ImmutableList loadEntities(ReadableByteChannel channel) { + return loadEntities(Channels.newInputStream(channel)); + } +} diff --git a/core/src/main/java/google/registry/backup/VersionedEntity.java b/core/src/main/java/google/registry/backup/VersionedEntity.java new file mode 100644 index 000000000..2cbd56150 --- /dev/null +++ b/core/src/main/java/google/registry/backup/VersionedEntity.java @@ -0,0 +1,160 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.backup; + +import com.google.appengine.api.datastore.Entity; +import com.google.appengine.api.datastore.EntityTranslator; +import com.google.appengine.api.datastore.Key; +import com.google.auto.value.AutoValue; +import com.google.auto.value.extension.memoized.Memoized; +import google.registry.model.ofy.CommitLogManifest; +import google.registry.model.ofy.CommitLogMutation; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Optional; +import java.util.stream.Stream; +import javax.annotation.Nullable; + +/** + * A Datastore {@link Entity Entity's} timestamped state. + * + *

For a new or updated Entity, its ProtocolBuffer bytes are stored along with its {@link Key}. + * For a deleted entity, only its {@link Key} is stored, and the {@link #entityProtoBytes} is left + * as null. + * + *

Note that {@link Optional java.util.Optional} is not serializable, therefore cannot be used as + * property type in this class. + */ +@AutoValue +public abstract class VersionedEntity implements Serializable { + + private static final long serialVersionUID = 1L; + + public abstract long commitTimeMills(); + + /** The {@link Key} of the {@link Entity}. */ + public abstract Key key(); + + /** Serialized form of the {@link Entity}. This property is {@code null} for a deleted Entity. */ + @Nullable + abstract ImmutableBytes entityProtoBytes(); + + @Memoized + public Optional getEntity() { + return Optional.ofNullable(entityProtoBytes()) + .map(ImmutableBytes::getBytes) + .map(EntityTranslator::createFromPbBytes); + } + + public boolean isDelete() { + return entityProtoBytes() == null; + } + + /** + * Converts deleted entity keys in {@code manifest} into a {@link Stream} of {@link + * VersionedEntity VersionedEntities}. See {@link CommitLogImports#loadEntities} for more + * information. + */ + public static Stream fromManifest(CommitLogManifest manifest) { + long commitTimeMillis = manifest.getCommitTime().getMillis(); + return manifest.getDeletions().stream() + .map(com.googlecode.objectify.Key::getRaw) + .map(key -> builder().commitTimeMills(commitTimeMillis).key(key).build()); + } + + /* Converts a {@link CommitLogMutation} to a {@link VersionedEntity}. */ + public static VersionedEntity fromMutation(CommitLogMutation mutation) { + return from( + com.googlecode.objectify.Key.create(mutation).getParent().getId(), + mutation.getEntityProtoBytes()); + } + + public static VersionedEntity from(long commitTimeMillis, byte[] entityProtoBytes) { + return builder() + .entityProtoBytes(entityProtoBytes) + .key(EntityTranslator.createFromPbBytes(entityProtoBytes).getKey()) + .commitTimeMills(commitTimeMillis) + .build(); + } + + static Builder builder() { + return new AutoValue_VersionedEntity.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder commitTimeMills(long commitTimeMillis); + + abstract Builder entityProtoBytes(ImmutableBytes bytes); + + public abstract Builder key(Key key); + + public abstract VersionedEntity build(); + + public Builder entityProtoBytes(byte[] bytes) { + return entityProtoBytes(new ImmutableBytes(bytes)); + } + } + + /** + * Wraps a byte array and prevents it from being modified by its original owner. + * + *

While this class seems an overkill, it exists for two reasons: + * + *

+ */ + static final class ImmutableBytes implements Serializable { + + private static final long serialVersionUID = 1L; + + private final byte[] bytes; + + ImmutableBytes(byte[] bytes) { + this.bytes = Arrays.copyOf(bytes, bytes.length); + } + + /** + * Returns the saved byte array. Invocation is restricted to trusted callers, who must not + * modify the array. + */ + byte[] getBytes() { + return bytes; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ImmutableBytes)) { + return false; + } + ImmutableBytes that = (ImmutableBytes) o; + // Do not use Objects.equals, which checks reference identity instead of data in array. + return Arrays.equals(bytes, that.bytes); + } + + @Override + public int hashCode() { + // Do not use Objects.hashCode, which hashes the reference, not the data in array. + return Arrays.hashCode(bytes); + } + } +} diff --git a/core/src/main/java/google/registry/beam/initsql/BackupPaths.java b/core/src/main/java/google/registry/beam/initsql/BackupPaths.java index 2665b1919..0a4c498a6 100644 --- a/core/src/main/java/google/registry/beam/initsql/BackupPaths.java +++ b/core/src/main/java/google/registry/beam/initsql/BackupPaths.java @@ -17,8 +17,7 @@ package google.registry.beam.initsql; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import org.joda.time.DateTime; /** * Helpers for determining the fully qualified paths to Nomulus backup files. A backup consists of a @@ -30,13 +29,9 @@ public final class BackupPaths { private static final String WILDCARD_CHAR = "*"; private static final String EXPORT_PATTERN_TEMPLATE = "%s/all_namespaces/kind_%s/input-%s"; - /** - * Regex pattern that captures the kind string in a file name. Datastore places no restrictions on - * what characters may be used in a kind string. - */ - private static final String FILENAME_TO_KIND_REGEX = ".+/all_namespaces/kind_(.+)/input-.+"; - private static final Pattern FILENAME_TO_KIND_PATTERN = Pattern.compile(FILENAME_TO_KIND_REGEX); + public static final String COMMIT_LOG_NAME_PREFIX = "commit_diff_until_"; + private static final String COMMIT_LOG_PATTERN_TEMPLATE = "%s/" + COMMIT_LOG_NAME_PREFIX + "*"; /** * Returns a regex pattern that matches all Datastore export files of a given {@code kind}. @@ -65,22 +60,15 @@ public final class BackupPaths { return String.format(EXPORT_PATTERN_TEMPLATE, exportDir, kind, Integer.toString(shard)); } - /** - * Returns the 'kind' of entity stored in a file based on the file name. - * - *

This method poses low risk and greatly simplifies the implementation of some transforms in - * {@link ExportLoadingTransforms}. - * - * @see ExportLoadingTransforms - */ - public static String getKindFromFileName(String fileName) { + public static String getCommitLogFileNamePattern(String commitLogDir) { + return String.format(COMMIT_LOG_PATTERN_TEMPLATE, commitLogDir); + } + + /** Gets the Commit timestamp from a CommitLog file name. */ + public static DateTime getCommitLogTimestamp(String fileName) { checkArgument(!isNullOrEmpty(fileName), "Null or empty fileName."); - Matcher matcher = FILENAME_TO_KIND_PATTERN.matcher(fileName); - checkArgument( - matcher.matches(), - "Illegal file name %s, should match %s.", - fileName, - FILENAME_TO_KIND_REGEX); - return matcher.group(1); + int start = fileName.lastIndexOf(COMMIT_LOG_NAME_PREFIX); + checkArgument(start >= 0, "Illegal file name %s.", fileName); + return DateTime.parse(fileName.substring(start + COMMIT_LOG_NAME_PREFIX.length())); } } diff --git a/core/src/main/java/google/registry/beam/initsql/CommitLogTransforms.java b/core/src/main/java/google/registry/beam/initsql/CommitLogTransforms.java new file mode 100644 index 000000000..429722d8a --- /dev/null +++ b/core/src/main/java/google/registry/beam/initsql/CommitLogTransforms.java @@ -0,0 +1,109 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static google.registry.beam.initsql.BackupPaths.getCommitLogFileNamePattern; +import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp; +import static google.registry.beam.initsql.Transforms.processFiles; +import static google.registry.util.DateTimeUtils.isBeforeOrAt; + +import google.registry.backup.CommitLogImports; +import google.registry.backup.VersionedEntity; +import java.io.IOException; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.FileIO.ReadableFile; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.DateTime; + +/** + * {@link org.apache.beam.sdk.transforms.PTransform Pipeline transforms} for loading from Nomulus + * CommitLog files. They are all part of a transformation that loads raw records from a sequence of + * Datastore CommitLog files, and are broken apart for testing. + */ +public class CommitLogTransforms { + + /** + * Returns a {@link PTransform transform} that can generate a collection of patterns that match + * all Datastore CommitLog files. + */ + public static PTransform> getCommitLogFilePatterns( + String commitLogDir) { + return Create.of(getCommitLogFileNamePattern(commitLogDir)).withCoder(StringUtf8Coder.of()); + } + + /** + * Returns files with timestamps between {@code fromTime} (inclusive) and {@code endTime} + * (exclusive). + */ + public static PTransform, PCollection> + filterCommitLogsByTime(DateTime fromTime, DateTime toTime) { + checkNotNull(fromTime, "fromTime"); + checkNotNull(toTime, "toTime"); + checkArgument( + fromTime.isBefore(toTime), + "Invalid time range: fromTime (%s) is before endTime (%s)", + fromTime, + toTime); + return ParDo.of(new FilterCommitLogFileByTime(fromTime, toTime)); + } + + /** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */ + public static PTransform, PCollection> + loadCommitLogsFromFiles() { + return processFiles(new LoadOneCommitLogsFile()); + } + + static class FilterCommitLogFileByTime extends DoFn { + private final DateTime fromTime; + private final DateTime toTime; + + public FilterCommitLogFileByTime(DateTime fromTime, DateTime toTime) { + this.fromTime = fromTime; + this.toTime = toTime; + } + + @ProcessElement + public void processElement(@Element String fileName, OutputReceiver out) { + DateTime timestamp = getCommitLogTimestamp(fileName); + if (isBeforeOrAt(fromTime, timestamp) && timestamp.isBefore(toTime)) { + out.output(fileName); + } + } + } + + /** + * Reads a CommitLog file and converts its content into {@link VersionedEntity VersionedEntities}. + */ + static class LoadOneCommitLogsFile extends DoFn { + + @ProcessElement + public void processElement(@Element ReadableFile file, OutputReceiver out) { + try { + CommitLogImports.loadEntities(file.open()).forEach(out::output); + } catch (IOException e) { + // Let the pipeline retry the whole file. + throw new RuntimeException(e); + } + } + } +} diff --git a/core/src/main/java/google/registry/beam/initsql/ExportLoadingTransforms.java b/core/src/main/java/google/registry/beam/initsql/ExportLoadingTransforms.java index b7dcc62ee..4f2324c54 100644 --- a/core/src/main/java/google/registry/beam/initsql/ExportLoadingTransforms.java +++ b/core/src/main/java/google/registry/beam/initsql/ExportLoadingTransforms.java @@ -15,38 +15,26 @@ package google.registry.beam.initsql; import static google.registry.beam.initsql.BackupPaths.getExportFileNamePattern; -import static google.registry.beam.initsql.BackupPaths.getKindFromFileName; -import static org.apache.beam.sdk.values.TypeDescriptors.kvs; -import static org.apache.beam.sdk.values.TypeDescriptors.strings; +import static google.registry.beam.initsql.Transforms.processFiles; import com.google.common.collect.ImmutableList; +import google.registry.backup.VersionedEntity; import google.registry.tools.LevelDbLogReader; import java.io.IOException; import java.util.Collection; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.Compression; -import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.FileIO.ReadableFile; -import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; /** * {@link PTransform Pipeline transforms} for loading from Datastore export files. They are all part * of a transformation that loads raw records from a Datastore export, and are broken apart for * testing. - * - *

We drop the 'kind' information in {@link #getDatastoreExportFilePatterns} and recover it later - * using the file paths. Although we could have kept it by passing around {@link KV key-value - * pairs}, the code would be more complicated, especially in {@link #loadDataFromFiles()}. */ public class ExportLoadingTransforms { @@ -63,51 +51,28 @@ public class ExportLoadingTransforms { .withCoder(StringUtf8Coder.of()); } - /** - * Returns a {@link PTransform} from file name patterns to file {@link Metadata Metadata records}. - */ - public static PTransform, PCollection> getFilesByPatterns() { - return new PTransform, PCollection>() { - @Override - public PCollection expand(PCollection input) { - return input.apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); - } - }; + /** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */ + public static PTransform, PCollection> + loadExportDataFromFiles() { + return processFiles(new LoadOneExportShard()); } /** - * Returns a {@link PTransform} from file {@link Metadata} to {@link KV Key-Value pairs} with - * entity 'kind' as key and raw record as value. - */ - public static PTransform, PCollection>> - loadDataFromFiles() { - return new PTransform, PCollection>>() { - @Override - public PCollection> expand(PCollection input) { - return input - .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)) - .apply( - MapElements.into(kvs(strings(), TypeDescriptor.of(ReadableFile.class))) - .via(file -> KV.of(getKindFromFileName(file.getMetadata().toString()), file))) - .apply("Load One LevelDb File", ParDo.of(new LoadOneFile())); - } - }; - } - - /** - * Reads a LevelDb file and converts each raw record into a {@link KV pair} of kind and bytes. + * Reads a LevelDb file and converts each raw record into a {@link VersionedEntity}. All such + * entities use {@link Long#MIN_VALUE} as timestamp, so that they go before data from CommitLogs. * *

LevelDb files are not seekable because a large object may span multiple blocks. If a * sequential read fails, the file needs to be retried from the beginning. */ - private static class LoadOneFile extends DoFn, KV> { + private static class LoadOneExportShard extends DoFn { + + private static final long TIMESTAMP = Long.MIN_VALUE; @ProcessElement - public void processElement( - @Element KV kv, OutputReceiver> output) { + public void processElement(@Element ReadableFile file, OutputReceiver output) { try { - LevelDbLogReader.from(kv.getValue().open()) - .forEachRemaining(record -> output.output(KV.of(kv.getKey(), record))); + LevelDbLogReader.from(file.open()) + .forEachRemaining(record -> output.output(VersionedEntity.from(TIMESTAMP, record))); } catch (IOException e) { // Let the pipeline retry the whole file. throw new RuntimeException(e); diff --git a/core/src/main/java/google/registry/beam/initsql/Transforms.java b/core/src/main/java/google/registry/beam/initsql/Transforms.java new file mode 100644 index 000000000..8eefc0cef --- /dev/null +++ b/core/src/main/java/google/registry/beam/initsql/Transforms.java @@ -0,0 +1,61 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import google.registry.backup.VersionedEntity; +import org.apache.beam.sdk.io.Compression; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileIO.ReadableFile; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; + +/** + * Common {@link PTransform pipeline transforms} used in pipelines that load from both Datastore + * export files and Nomulus CommitLog files. + */ +public class Transforms { + + /** + * Returns a {@link PTransform} from file name patterns to file {@link Metadata Metadata records}. + */ + public static PTransform, PCollection> getFilesByPatterns() { + return new PTransform, PCollection>() { + @Override + public PCollection expand(PCollection input) { + return input.apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)); + } + }; + } + + /** + * Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity} using + * caller-provided {@code transformer}. + */ + public static PTransform, PCollection> processFiles( + DoFn transformer) { + return new PTransform, PCollection>() { + @Override + public PCollection expand(PCollection input) { + return input + .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED)) + .apply(transformer.getClass().getSimpleName(), ParDo.of(transformer)); + } + }; + } +} diff --git a/core/src/test/java/google/registry/backup/CommitLogExports.java b/core/src/test/java/google/registry/backup/CommitLogExports.java new file mode 100644 index 000000000..b16a1aca7 --- /dev/null +++ b/core/src/test/java/google/registry/backup/CommitLogExports.java @@ -0,0 +1,231 @@ +// Copyright 2017 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.backup; + +import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Lists.partition; +import static google.registry.backup.BackupUtils.serializeEntity; +import static google.registry.model.ofy.CommitLogBucket.getBucketKey; +import static google.registry.model.ofy.ObjectifyService.ofy; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.util.DateTimeUtils.START_OF_TIME; +import static google.registry.util.DateTimeUtils.isAtOrAfter; +import static java.util.Comparator.comparingLong; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Streams; +import com.googlecode.objectify.Key; +import google.registry.model.ImmutableObject; +import google.registry.model.ofy.CommitLogBucket; +import google.registry.model.ofy.CommitLogCheckpoint; +import google.registry.model.ofy.CommitLogCheckpointRoot; +import google.registry.model.ofy.CommitLogManifest; +import google.registry.model.ofy.CommitLogMutation; +import google.registry.util.Clock; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; +import org.joda.time.DateTime; + +/** + * Helpers for exporting the diff between two commit log checkpoints to a local file. + * + *

In production, CommitLogs are saved periodically by cron jobs. During each job, the {@link + * CommitLogCheckpointAction} is invoked first to compute a {@link CommitLogCheckpoint} and persist + * it in Datastore. Then the {@link ExportCommitLogDiffAction} is invoked to export the diffs + * accumulated between the previous and current checkpoints to a file. + * + *

The {@link #computeCheckpoint(Clock)} method is copied with simplification from {@link + * CommitLogCheckpointAction}, and the {@link #saveCommitLogs(String, CommitLogCheckpoint, + * CommitLogCheckpoint)} method is copied with simplification from {@link + * ExportCommitLogDiffAction}. We opted for copying instead of refactoring to reduce risk to + * production code. + */ +public final class CommitLogExports { + + public static final String DIFF_FILE_PREFIX = "commit_diff_until_"; + + private static final int EXPORT_DIFF_BATCH_SIZE = 100; + + private CommitLogExports() {} + + /** + * Returns the next {@link CommitLogCheckpoint} for Commit logs. Please refer to the class javadoc + * for background. + */ + public static CommitLogCheckpoint computeCheckpoint(Clock clock) { + CommitLogCheckpointStrategy strategy = new CommitLogCheckpointStrategy(); + strategy.clock = clock; + strategy.ofy = ofy(); + + CommitLogCheckpoint checkpoint = strategy.computeCheckpoint(); + tm().transact( + () -> { + DateTime lastWrittenTime = CommitLogCheckpointRoot.loadRoot().getLastWrittenTime(); + checkState( + checkpoint.getCheckpointTime().isAfter(lastWrittenTime), + "Newer checkpoint already written at time: %s", + lastWrittenTime); + ofy() + .saveWithoutBackup() + .entities( + checkpoint, CommitLogCheckpointRoot.create(checkpoint.getCheckpointTime())); + }); + return checkpoint; + } + + /** + * Saves the incremental changes between {@code prevCheckpoint} and {@code checkpoint} and returns + * the {@link File}. Please refer to class javadoc for background. + */ + public static File saveCommitLogs( + String commitLogDir, + @Nullable CommitLogCheckpoint prevCheckpoint, + CommitLogCheckpoint checkpoint) { + checkArgument( + prevCheckpoint == null + || (isAtOrAfter(prevCheckpoint.getCheckpointTime(), START_OF_TIME) + && prevCheckpoint.getCheckpointTime().isBefore(checkpoint.getCheckpointTime())), + "Inversed checkpoint: prev is %s, current is %s.", + Optional.ofNullable(prevCheckpoint) + .map(CommitLogCheckpoint::getCheckpointTime) + .map(DateTime::toString) + .orElse("null"), + checkpoint.getCheckpointTime().toString()); + + // Load the keys of all the manifests to include in this diff. + List> sortedKeys = loadAllDiffKeys(prevCheckpoint, checkpoint); + // Open an output channel to GCS, wrapped in a stream for convenience. + File commitLogFile = + new File(commitLogDir + "/" + DIFF_FILE_PREFIX + checkpoint.getCheckpointTime()); + try (OutputStream commitLogStream = + new BufferedOutputStream(new FileOutputStream(commitLogFile))) { + // Export the upper checkpoint itself. + serializeEntity(checkpoint, commitLogStream); + // If there are no manifests to export, stop early, now that we've written out the file with + // the checkpoint itself (which is needed for restores, even if it's empty). + if (sortedKeys.isEmpty()) { + return commitLogFile; + } + // Export to GCS in chunks, one per fixed batch of commit logs. While processing one batch, + // asynchronously load the entities for the next one. + List>> keyChunks = partition(sortedKeys, EXPORT_DIFF_BATCH_SIZE); + // Objectify's map return type is asynchronous. Calling .values() will block until it loads. + Map nextChunkToExport = ofy().load().keys(keyChunks.get(0)); + for (int i = 0; i < keyChunks.size(); i++) { + // Force the async load to finish. + Collection chunkValues = nextChunkToExport.values(); + // Since there is no hard bound on how much data this might be, take care not to let the + // Objectify session cache fill up and potentially run out of memory. This is the only safe + // point to do this since at this point there is no async load in progress. + ofy().clearSessionCache(); + // Kick off the next async load, which can happen in parallel to the current GCS export. + if (i + 1 < keyChunks.size()) { + nextChunkToExport = ofy().load().keys(keyChunks.get(i + 1)); + } + exportChunk(commitLogStream, chunkValues); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return commitLogFile; + } + + /** + * Loads all the diff keys, sorted in a transaction-consistent chronological order. + * + * @param lowerCheckpoint exclusive lower bound on keys in this diff, or null if no lower bound + * @param upperCheckpoint inclusive upper bound on keys in this diff + */ + private static ImmutableList> loadAllDiffKeys( + @Nullable final CommitLogCheckpoint lowerCheckpoint, + final CommitLogCheckpoint upperCheckpoint) { + // Fetch the keys (no data) between these checkpoints, and sort by timestamp. This ordering is + // transaction-consistent by virtue of our checkpoint strategy and our customized Ofy; see + // CommitLogCheckpointStrategy for the proof. We break ties by sorting on bucket ID to ensure + // a deterministic order. + return upperCheckpoint.getBucketTimestamps().keySet().stream() + .flatMap( + bucketNum -> + Streams.stream(loadDiffKeysFromBucket(lowerCheckpoint, upperCheckpoint, bucketNum))) + .sorted( + comparingLong(Key::getId) + .thenComparingLong(a -> a.getParent().getId())) + .collect(toImmutableList()); + } + + /** + * Loads the diff keys for one bucket. + * + * @param lowerCheckpoint exclusive lower bound on keys in this diff, or null if no lower bound + * @param upperCheckpoint inclusive upper bound on keys in this diff + * @param bucketNum the bucket to load diff keys from + */ + private static Iterable> loadDiffKeysFromBucket( + @Nullable CommitLogCheckpoint lowerCheckpoint, + CommitLogCheckpoint upperCheckpoint, + int bucketNum) { + // If no lower checkpoint exists, or if it exists but had no timestamp for this bucket number + // (because the bucket count was increased between these checkpoints), then use START_OF_TIME + // as the effective exclusive lower bound. + DateTime lowerCheckpointBucketTime = + firstNonNull( + (lowerCheckpoint == null) ? null : lowerCheckpoint.getBucketTimestamps().get(bucketNum), + START_OF_TIME); + // Since START_OF_TIME=0 is not a valid id in a key, add 1 to both bounds. Then instead of + // loading lowerBound < x <= upperBound, we can load lowerBound <= x < upperBound. + DateTime lowerBound = lowerCheckpointBucketTime.plusMillis(1); + DateTime upperBound = upperCheckpoint.getBucketTimestamps().get(bucketNum).plusMillis(1); + // If the lower and upper bounds are equal, there can't be any results, so skip the query. + if (lowerBound.equals(upperBound)) { + return ImmutableSet.of(); + } + Key bucketKey = getBucketKey(bucketNum); + return ofy() + .load() + .type(CommitLogManifest.class) + .ancestor(bucketKey) + .filterKey(">=", CommitLogManifest.createKey(bucketKey, lowerBound)) + .filterKey("<", CommitLogManifest.createKey(bucketKey, upperBound)) + .keys(); + } + + /** Writes a chunks-worth of manifests and associated mutations to GCS. */ + private static void exportChunk(OutputStream gcsStream, Collection chunk) + throws IOException { + // Kickoff async loads for all the manifests in the chunk. + ImmutableList.Builder> entities = + new ImmutableList.Builder<>(); + for (CommitLogManifest manifest : chunk) { + entities.add(ImmutableList.of(manifest)); + entities.add(ofy().load().type(CommitLogMutation.class).ancestor(manifest)); + } + for (ImmutableObject entity : concat(entities.build())) { + serializeEntity(entity, gcsStream); + } + } +} diff --git a/core/src/test/java/google/registry/beam/initsql/BackupPathsTest.java b/core/src/test/java/google/registry/beam/initsql/BackupPathsTest.java deleted file mode 100644 index 537a8c429..000000000 --- a/core/src/test/java/google/registry/beam/initsql/BackupPathsTest.java +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2020 The Nomulus Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package google.registry.beam.initsql; - -import static com.google.common.truth.Truth.assertThat; -import static google.registry.beam.initsql.BackupPaths.getKindFromFileName; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import org.junit.jupiter.api.Test; - -/** Unit tests for {@link google.registry.beam.initsql.BackupPaths}. */ -public class BackupPathsTest { - - @Test - void getKindFromFileName_empty() { - assertThrows(IllegalArgumentException.class, () -> getKindFromFileName("")); - } - - @Test - void getKindFromFileName_notMatch() { - assertThrows( - IllegalArgumentException.class, - () -> getKindFromFileName("/tmp/all_namespaces/kind_/input-0")); - } - - @Test - void getKindFromFileName_success() { - assertThat(getKindFromFileName("scheme:/somepath/all_namespaces/kind_mykind/input-something")) - .isEqualTo("mykind"); - } - - @Test - void getKindFromFileName_specialChar_success() { - assertThat( - getKindFromFileName("scheme:/somepath/all_namespaces/kind_.*+? /(a)/input-something")) - .isEqualTo(".*+? /(a)"); - } -} diff --git a/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java b/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java index 6ecca474a..2b5dd33e1 100644 --- a/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java +++ b/core/src/test/java/google/registry/beam/initsql/BackupTestStore.java @@ -20,6 +20,8 @@ import static google.registry.persistence.transaction.TransactionManagerFactory. import com.google.appengine.api.datastore.Entity; import com.googlecode.objectify.Key; +import google.registry.backup.CommitLogExports; +import google.registry.model.ofy.CommitLogCheckpoint; import google.registry.testing.AppEngineRule; import google.registry.testing.FakeClock; import google.registry.tools.LevelDbFileBuilder; @@ -35,6 +37,10 @@ import org.joda.time.format.DateTimeFormatter; *

A Datastore backup consists of an unsynchronized data export and a sequence of incremental * Commit Logs that overlap with the export process. Together they can be used to recreate a * consistent snapshot of the Datastore. + * + *

For convenience of test-writing, the {@link #fakeClock} is advanced by 1 millisecond after + * every transaction is invoked on this store, ensuring strictly increasing timestamps on causally + * dependent transactions. In production, the same ordering is ensured by sleep and retry. */ class BackupTestStore implements AutoCloseable { @@ -44,6 +50,8 @@ class BackupTestStore implements AutoCloseable { private final FakeClock fakeClock; private AppEngineRule appEngine; + private CommitLogCheckpoint prevCommitLogCheckpoint; + BackupTestStore(FakeClock fakeClock) throws Exception { this.fakeClock = fakeClock; this.appEngine = @@ -55,16 +63,27 @@ class BackupTestStore implements AutoCloseable { this.appEngine.beforeEach(null); } + void transact(Iterable deletes, Iterable newOrUpdated) { + tm().transact( + () -> { + ofy().delete().entities(deletes); + ofy().save().entities(newOrUpdated); + }); + fakeClock.advanceOneMilli(); + } + /** Inserts or updates {@code entities} in the Datastore. */ @SafeVarargs final void insertOrUpdate(Object... entities) { tm().transact(() -> ofy().save().entities(entities).now()); + fakeClock.advanceOneMilli(); } /** Deletes {@code entities} from the Datastore. */ @SafeVarargs final void delete(Object... entities) { tm().transact(() -> ofy().delete().entities(entities).now()); + fakeClock.advanceOneMilli(); } /** @@ -109,8 +128,12 @@ class BackupTestStore implements AutoCloseable { builder.build(); } - void saveCommitLog() { - throw new UnsupportedOperationException("Not implemented yet"); + File saveCommitLogs(String commitLogDir) { + CommitLogCheckpoint checkpoint = CommitLogExports.computeCheckpoint(fakeClock); + File commitLogFile = + CommitLogExports.saveCommitLogs(commitLogDir, prevCommitLogCheckpoint, checkpoint); + prevCommitLogCheckpoint = checkpoint; + return commitLogFile; } @Override diff --git a/core/src/test/java/google/registry/beam/initsql/BackupTestStoreTest.java b/core/src/test/java/google/registry/beam/initsql/BackupTestStoreTest.java index f47f83d26..dd310d6f5 100644 --- a/core/src/test/java/google/registry/beam/initsql/BackupTestStoreTest.java +++ b/core/src/test/java/google/registry/beam/initsql/BackupTestStoreTest.java @@ -19,27 +19,36 @@ import static com.google.common.truth.Truth.assertWithMessage; import static com.google.common.truth.Truth8.assertThat; import static google.registry.model.common.EntityGroupRoot.getCrossTldKey; import static google.registry.model.ofy.ObjectifyService.ofy; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; import static google.registry.testing.DatastoreHelper.newContactResource; import static google.registry.testing.DatastoreHelper.newDomainBase; import static google.registry.testing.DatastoreHelper.newRegistry; import com.google.appengine.api.datastore.Entity; import com.google.appengine.api.datastore.EntityTranslator; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Streams; import com.google.storage.onestore.v3.OnestoreEntity.EntityProto; import com.googlecode.objectify.Key; +import google.registry.backup.CommitLogImports; +import google.registry.backup.VersionedEntity; import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DesignatedContact; import google.registry.model.domain.DomainBase; +import google.registry.model.ofy.Ofy; import google.registry.model.registry.Registry; +import google.registry.persistence.VKey; import google.registry.testing.FakeClock; +import google.registry.testing.InjectRule; import google.registry.tools.LevelDbLogReader; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Stream; @@ -47,6 +56,7 @@ import org.joda.time.DateTime; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; /** Unit tests for {@link BackupTestStore}. */ @@ -56,17 +66,25 @@ public class BackupTestStoreTest { private FakeClock fakeClock; private BackupTestStore store; + private Registry registry; + private ContactResource contact; + private DomainBase domain; + @TempDir File tempDir; + @RegisterExtension InjectRule injectRule = new InjectRule(); + @BeforeEach void beforeEach() throws Exception { fakeClock = new FakeClock(START_TIME); store = new BackupTestStore(fakeClock); + injectRule.setStaticField(Ofy.class, "clock", fakeClock); - store.insertOrUpdate(newRegistry("tld1", "TLD1")); - ContactResource contact1 = newContactResource("contact_1"); - DomainBase domain1 = newDomainBase("domain1.tld1", contact1); - store.insertOrUpdate(contact1, domain1); + registry = newRegistry("tld1", "TLD1"); + store.insertOrUpdate(registry); + contact = newContactResource("contact_1"); + domain = newDomainBase("domain1.tld1", contact); + store.insertOrUpdate(contact, domain); } @AfterEach @@ -77,7 +95,8 @@ public class BackupTestStoreTest { @Test void export_filesCreated() throws IOException { String exportRootPath = tempDir.getAbsolutePath(); - File exportFolder = new File(exportRootPath, "2000-01-01T00:00:00_000"); + assertThat(fakeClock.nowUtc().toString()).isEqualTo("2000-01-01T00:00:00.002Z"); + File exportFolder = new File(exportRootPath, "2000-01-01T00:00:00_002"); assertWithMessage("Directory %s should not exist.", exportFolder.getAbsoluteFile()) .that(exportFolder.exists()) .isFalse(); @@ -100,7 +119,7 @@ public class BackupTestStoreTest { void export_folderNameChangesWithTime() throws IOException { String exportRootPath = tempDir.getAbsolutePath(); fakeClock.advanceOneMilli(); - File exportFolder = new File(exportRootPath, "2000-01-01T00:00:00_001"); + File exportFolder = new File(exportRootPath, "2000-01-01T00:00:00_003"); assertWithMessage("Directory %s should not exist.", exportFolder.getAbsoluteFile()) .that(exportFolder.exists()) .isFalse(); @@ -147,6 +166,69 @@ public class BackupTestStoreTest { assertThat(tlds).containsExactly("tld2"); } + @Test + void saveCommitLogs_fileCreated() { + File commitLogFile = store.saveCommitLogs(tempDir.getAbsolutePath()); + assertThat(commitLogFile.exists()).isTrue(); + assertThat(commitLogFile.getName()).isEqualTo("commit_diff_until_2000-01-01T00:00:00.002Z"); + } + + @Test + void saveCommitLogs_inserts() { + File commitLogFile = store.saveCommitLogs(tempDir.getAbsolutePath()); + assertThat(commitLogFile.exists()).isTrue(); + ImmutableList mutations = CommitLogImports.loadEntities(commitLogFile); + assertThat(mutations.stream().map(VersionedEntity::getEntity).map(Optional::get)) + .containsExactlyElementsIn(toDatastoreEntities(registry, contact, domain)); + // Registry created at -2, contract and domain created at -1. + assertThat(mutations.stream().map(VersionedEntity::commitTimeMills)) + .containsExactly( + fakeClock.nowUtc().getMillis() - 2, + fakeClock.nowUtc().getMillis() - 1, + fakeClock.nowUtc().getMillis() - 1); + } + + @Test + void saveCommitLogs_deletes() { + fakeClock.advanceOneMilli(); + store.saveCommitLogs(tempDir.getAbsolutePath()); + ContactResource newContact = newContactResource("contact2"); + VKey vKey = newContact.createVKey(); + domain = + domain + .asBuilder() + .setRegistrant(vKey) + .setContacts( + ImmutableSet.of( + DesignatedContact.create(DesignatedContact.Type.ADMIN, vKey), + DesignatedContact.create(DesignatedContact.Type.TECH, vKey))) + .build(); + store.insertOrUpdate(domain, newContact); + store.delete(contact); + fakeClock.advanceOneMilli(); + File commitLogFile = store.saveCommitLogs(tempDir.getAbsolutePath()); + ImmutableList mutations = CommitLogImports.loadEntities(commitLogFile); + assertThat(mutations.stream().filter(VersionedEntity::isDelete).map(VersionedEntity::key)) + .containsExactly(Key.create(contact).getRaw()); + + assertThat( + mutations.stream() + .filter(Predicates.not(VersionedEntity::isDelete)) + .map(VersionedEntity::getEntity) + .map(Optional::get)) + .containsExactlyElementsIn(toDatastoreEntities(domain, newContact)); + } + + @Test + void saveCommitLogs_empty() { + fakeClock.advanceOneMilli(); + store.saveCommitLogs(tempDir.getAbsolutePath()); + fakeClock.advanceOneMilli(); + File commitLogFile = store.saveCommitLogs(tempDir.getAbsolutePath()); + assertThat(commitLogFile.exists()).isTrue(); + assertThat(CommitLogImports.loadEntities(commitLogFile)).isEmpty(); + } + private File export(String exportRootPath, Set> excludes) throws IOException { return store.export( exportRootPath, @@ -168,4 +250,13 @@ public class BackupTestStoreTest { Entity entity = EntityTranslator.createFromPb(proto); return ofyEntityType.cast(ofy().load().fromEntity(entity)); } + + @SafeVarargs + private static ImmutableList toDatastoreEntities(Object... ofyEntities) { + return tm().transact( + () -> + Stream.of(ofyEntities) + .map(oe -> ofy().save().toEntity(oe)) + .collect(ImmutableList.toImmutableList())); + } } diff --git a/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java b/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java new file mode 100644 index 000000000..d4240cd63 --- /dev/null +++ b/core/src/test/java/google/registry/beam/initsql/CommitLogTransformsTest.java @@ -0,0 +1,223 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.beam.initsql; + +import static google.registry.model.ofy.ObjectifyService.ofy; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; +import static google.registry.testing.DatastoreHelper.newContactResource; +import static google.registry.testing.DatastoreHelper.newDomainBase; +import static google.registry.testing.DatastoreHelper.newRegistry; + +import com.google.appengine.api.datastore.Entity; +import com.google.common.collect.ImmutableList; +import google.registry.backup.VersionedEntity; +import google.registry.model.contact.ContactResource; +import google.registry.model.domain.DomainBase; +import google.registry.model.ofy.Ofy; +import google.registry.model.registry.Registry; +import google.registry.testing.FakeClock; +import google.registry.testing.InjectRule; +import java.io.File; +import java.io.Serializable; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.fs.MatchResult.Metadata; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.DateTime; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link CommitLogTransforms}. */ +// TODO(weiminyu): Upgrade to JUnit5 when TestPipeline is upgraded. It is also easy to adapt with +// a wrapper. +@RunWith(JUnit4.class) +public class CommitLogTransformsTest implements Serializable { + private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); + + @Rule public final transient TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule public final transient InjectRule injectRule = new InjectRule(); + + @Rule + public final transient TestPipeline pipeline = + TestPipeline.create().enableAbandonedNodeEnforcement(true); + + private FakeClock fakeClock; + private transient BackupTestStore store; + private File commitLogsDir; + private File firstCommitLogFile; + // Canned data that are persisted to Datastore, used by assertions in tests. + // TODO(weiminyu): use Ofy entity pojos directly. + private transient ImmutableList persistedEntities; + + @Before + public void beforeEach() throws Exception { + fakeClock = new FakeClock(START_TIME); + store = new BackupTestStore(fakeClock); + injectRule.setStaticField(Ofy.class, "clock", fakeClock); + + Registry registry = newRegistry("tld1", "TLD1"); + store.insertOrUpdate(registry); + ContactResource contact1 = newContactResource("contact_1"); + DomainBase domain1 = newDomainBase("domain1.tld1", contact1); + store.insertOrUpdate(contact1, domain1); + persistedEntities = + ImmutableList.of(registry, contact1, domain1).stream() + .map(ofyEntity -> tm().transact(() -> ofy().save().toEntity(ofyEntity))) + .collect(ImmutableList.toImmutableList()); + commitLogsDir = temporaryFolder.newFolder(); + firstCommitLogFile = store.saveCommitLogs(commitLogsDir.getAbsolutePath()); + } + + @After + public void afterEach() throws Exception { + if (store != null) { + store.close(); + store = null; + } + } + + @Test + @Category(NeedsRunner.class) + public void getCommitLogFilePatterns() { + PCollection patterns = + pipeline.apply( + "Get CommitLog file patterns", + CommitLogTransforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath())); + + ImmutableList expectedPatterns = + ImmutableList.of(commitLogsDir.getAbsolutePath() + "/commit_diff_until_*"); + + PAssert.that(patterns).containsInAnyOrder(expectedPatterns); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void getFilesByPatterns() { + PCollection fileMetas = + pipeline + .apply( + "File patterns to metadata", + Create.of(commitLogsDir.getAbsolutePath() + "/commit_diff_until_*") + .withCoder(StringUtf8Coder.of())) + .apply(Transforms.getFilesByPatterns()); + + // Transform fileMetas to file names for assertions. + PCollection fileNames = + fileMetas.apply( + "File metadata to path string", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element Metadata metadata, OutputReceiver out) { + out.output(metadata.resourceId().toString()); + } + })); + + ImmutableList expectedFilenames = + ImmutableList.of(firstCommitLogFile.getAbsolutePath()); + + PAssert.that(fileNames).containsInAnyOrder(expectedFilenames); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void filterCommitLogsByTime() { + ImmutableList commitLogFilenames = + ImmutableList.of( + "/commit_diff_until_2000-01-01T00:00:00.000Z", + "/commit_diff_until_2000-01-01T00:00:00.001Z", + "/commit_diff_until_2000-01-01T00:00:00.002Z", + "/commit_diff_until_2000-01-01T00:00:00.003Z", + "/commit_diff_until_2000-01-01T00:00:00.004Z"); + PCollection filteredFilenames = + pipeline + .apply( + "Generate All Filenames", + Create.of(commitLogFilenames).withCoder(StringUtf8Coder.of())) + .apply( + "Filtered by Time", + CommitLogTransforms.filterCommitLogsByTime( + DateTime.parse("2000-01-01T00:00:00.001Z"), + DateTime.parse("2000-01-01T00:00:00.003Z"))); + PAssert.that(filteredFilenames) + .containsInAnyOrder( + "/commit_diff_until_2000-01-01T00:00:00.001Z", + "/commit_diff_until_2000-01-01T00:00:00.002Z"); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void loadOneCommitLogFile() { + PCollection entities = + pipeline + .apply( + "Get CommitLog file patterns", + CommitLogTransforms.getCommitLogFilePatterns(commitLogsDir.getAbsolutePath())) + .apply("Find CommitLogs", Transforms.getFilesByPatterns()) + .apply(CommitLogTransforms.loadCommitLogsFromFiles()); + + PCollection timestamps = + entities.apply( + "Extract commitTimeMillis", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element VersionedEntity entity, OutputReceiver out) { + out.output(entity.commitTimeMills()); + } + })); + PAssert.that(timestamps) + .containsInAnyOrder( + fakeClock.nowUtc().getMillis() - 2, + fakeClock.nowUtc().getMillis() - 1, + fakeClock.nowUtc().getMillis() - 1); + + PCollection datastoreEntities = + entities.apply( + "To Datastore Entities", + ParDo.of( + new DoFn() { + @ProcessElement + public void processElement( + @Element VersionedEntity entity, OutputReceiver out) { + entity.getEntity().ifPresent(out::output); + } + })); + + PAssert.that(datastoreEntities).containsInAnyOrder(persistedEntities); + + pipeline.run(); + } +} diff --git a/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java b/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java index 2650e1c40..a08f69d49 100644 --- a/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java +++ b/core/src/test/java/google/registry/beam/initsql/ExportloadingTransformsTest.java @@ -21,14 +21,15 @@ import static google.registry.testing.DatastoreHelper.newDomainBase; import static google.registry.testing.DatastoreHelper.newRegistry; import com.google.appengine.api.datastore.Entity; -import com.google.appengine.api.datastore.EntityTranslator; import com.google.common.collect.ImmutableList; -import com.google.storage.onestore.v3.OnestoreEntity.EntityProto; import com.googlecode.objectify.Key; +import google.registry.backup.VersionedEntity; import google.registry.model.contact.ContactResource; import google.registry.model.domain.DomainBase; +import google.registry.model.ofy.Ofy; import google.registry.model.registry.Registry; import google.registry.testing.FakeClock; +import google.registry.testing.InjectRule; import java.io.File; import java.io.Serializable; import java.util.Collections; @@ -40,8 +41,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.joda.time.DateTime; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -60,6 +61,8 @@ import org.junit.runners.JUnit4; // a wrapper. @RunWith(JUnit4.class) public class ExportloadingTransformsTest implements Serializable { + private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); + private static final ImmutableList> ALL_KINDS = ImmutableList.of(Registry.class, ContactResource.class, DomainBase.class); private static final ImmutableList ALL_KIND_STRS = @@ -67,6 +70,8 @@ public class ExportloadingTransformsTest implements Serializable { @Rule public final transient TemporaryFolder exportRootDir = new TemporaryFolder(); + @Rule public final transient InjectRule injectRule = new InjectRule(); + @Rule public final transient TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(true); @@ -80,8 +85,9 @@ public class ExportloadingTransformsTest implements Serializable { @Before public void beforeEach() throws Exception { - fakeClock = new FakeClock(); + fakeClock = new FakeClock(START_TIME); store = new BackupTestStore(fakeClock); + injectRule.setStaticField(Ofy.class, "clock", fakeClock); Registry registry = newRegistry("tld1", "TLD1"); store.insertOrUpdate(registry); @@ -107,7 +113,7 @@ public class ExportloadingTransformsTest implements Serializable { @Test @Category(NeedsRunner.class) - public void getBackupDataFilePatterns() { + public void getExportFilePatterns() { PCollection patterns = pipeline.apply( "Get Datastore file patterns", @@ -138,7 +144,7 @@ public class ExportloadingTransformsTest implements Serializable { exportDir.getAbsolutePath() + "/all_namespaces/kind_ContactResource/input-*") .withCoder(StringUtf8Coder.of())) - .apply(ExportLoadingTransforms.getFilesByPatterns()); + .apply(Transforms.getFilesByPatterns()); // Transform fileMetas to file names for assertions. PCollection fileNames = @@ -166,25 +172,25 @@ public class ExportloadingTransformsTest implements Serializable { @Test public void loadDataFromFiles() { - PCollection> taggedRecords = + PCollection taggedRecords = pipeline .apply( "Get Datastore file patterns", ExportLoadingTransforms.getDatastoreExportFilePatterns( exportDir.getAbsolutePath(), ALL_KIND_STRS)) - .apply("Find Datastore files", ExportLoadingTransforms.getFilesByPatterns()) - .apply("Load from Datastore files", ExportLoadingTransforms.loadDataFromFiles()); + .apply("Find Datastore files", Transforms.getFilesByPatterns()) + .apply("Load from Datastore files", ExportLoadingTransforms.loadExportDataFromFiles()); // Transform bytes to pojo for analysis PCollection entities = taggedRecords.apply( "Raw records to Entity", ParDo.of( - new DoFn, Entity>() { + new DoFn() { @ProcessElement public void processElement( - @Element KV kv, OutputReceiver out) { - out.output(parseBytes(kv.getValue())); + @Element VersionedEntity versionedEntity, OutputReceiver out) { + out.output(versionedEntity.getEntity().get()); } })); @@ -192,10 +198,4 @@ public class ExportloadingTransformsTest implements Serializable { pipeline.run(); } - - private static Entity parseBytes(byte[] record) { - EntityProto proto = new EntityProto(); - proto.parseFrom(record); - return EntityTranslator.createFromPb(proto); - } }