diff --git a/core/src/main/java/google/registry/backup/CommitLogImports.java b/core/src/main/java/google/registry/backup/CommitLogImports.java
index 7c37149f8..da7377f08 100644
--- a/core/src/main/java/google/registry/backup/CommitLogImports.java
+++ b/core/src/main/java/google/registry/backup/CommitLogImports.java
@@ -15,10 +15,10 @@
package google.registry.backup;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
import static google.registry.backup.BackupUtils.createDeserializingIterator;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Streams;
import google.registry.model.ImmutableObject;
import google.registry.model.ofy.CommitLogCheckpoint;
import google.registry.model.ofy.CommitLogManifest;
@@ -31,7 +31,6 @@ import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Iterator;
-import java.util.stream.Stream;
/**
* Helpers for reading CommitLog records from a file.
@@ -43,6 +42,56 @@ public final class CommitLogImports {
private CommitLogImports() {}
+ /**
+ * Returns entities in an {@code inputStream} (from a single CommitLog file) as an {@link
+ * ImmutableList} of {@link ImmutableList}s of {@link VersionedEntity} records where the inner
+ * lists each consist of one transaction. Upon completion the {@code inputStream} is closed.
+ *
+ *
The returned list may be empty, since CommitLogs are written at fixed intervals regardless
+ * if actual changes exist. Each sublist, however, will not be empty.
+ *
+ *
A CommitLog file starts with a {@link CommitLogCheckpoint}, followed by (repeated)
+ * subsequences of [{@link CommitLogManifest}, [{@link CommitLogMutation}] ...]. Each subsequence
+ * represents the changes in one transaction. The {@code CommitLogManifest} contains deleted
+ * entity keys, whereas each {@code CommitLogMutation} contains one whole entity.
+ */
+ public static ImmutableList> loadEntitiesByTransaction(
+ InputStream inputStream) {
+ try (AppEngineEnvironment appEngineEnvironment = new AppEngineEnvironment();
+ InputStream input = new BufferedInputStream(inputStream)) {
+ Iterator commitLogs = createDeserializingIterator(input);
+ checkState(commitLogs.hasNext());
+ checkState(commitLogs.next() instanceof CommitLogCheckpoint);
+
+ ImmutableList.Builder> resultBuilder =
+ new ImmutableList.Builder<>();
+ ImmutableList.Builder currentTransactionBuilder =
+ new ImmutableList.Builder<>();
+
+ while (commitLogs.hasNext()) {
+ ImmutableObject currentObject = commitLogs.next();
+ if (currentObject instanceof CommitLogManifest) {
+ // CommitLogManifest means we are starting a new transaction
+ addIfNonempty(resultBuilder, currentTransactionBuilder);
+ currentTransactionBuilder = new ImmutableList.Builder<>();
+ VersionedEntity.fromManifest((CommitLogManifest) currentObject)
+ .forEach(currentTransactionBuilder::add);
+ } else if (currentObject instanceof CommitLogMutation) {
+ currentTransactionBuilder.add(
+ VersionedEntity.fromMutation((CommitLogMutation) currentObject));
+ } else {
+ throw new IllegalStateException(
+ String.format("Unknown entity type %s in commit logs", currentObject.getClass()));
+ }
+ }
+ // Add the last transaction in (if it's not empty)
+ addIfNonempty(resultBuilder, currentTransactionBuilder);
+ return resultBuilder.build();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Returns entities in an {@code inputStream} (from a single CommitLog file) as an {@link
* ImmutableList} of {@link VersionedEntity} records. Upon completion the {@code inputStream} is
@@ -57,23 +106,9 @@ public final class CommitLogImports {
* entity keys, whereas each {@code CommitLogMutation} contains one whole entity.
*/
public static ImmutableList loadEntities(InputStream inputStream) {
- try (AppEngineEnvironment appEngineEnvironment = new AppEngineEnvironment();
- InputStream input = new BufferedInputStream(inputStream)) {
- Iterator commitLogs = createDeserializingIterator(input);
- checkState(commitLogs.hasNext());
- checkState(commitLogs.next() instanceof CommitLogCheckpoint);
-
- return Streams.stream(commitLogs)
- .map(
- e ->
- e instanceof CommitLogManifest
- ? VersionedEntity.fromManifest((CommitLogManifest) e)
- : Stream.of(VersionedEntity.fromMutation((CommitLogMutation) e)))
- .flatMap(s -> s)
- .collect(ImmutableList.toImmutableList());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ return loadEntitiesByTransaction(inputStream).stream()
+ .flatMap(ImmutableList::stream)
+ .collect(toImmutableList());
}
/** Covenience method that adapts {@link #loadEntities(InputStream)} to a {@link File}. */
@@ -92,4 +127,13 @@ public final class CommitLogImports {
public static ImmutableList loadEntities(ReadableByteChannel channel) {
return loadEntities(Channels.newInputStream(channel));
}
+
+ private static void addIfNonempty(
+ ImmutableList.Builder> resultBuilder,
+ ImmutableList.Builder currentTransactionBuilder) {
+ ImmutableList currentTransaction = currentTransactionBuilder.build();
+ if (!currentTransaction.isEmpty()) {
+ resultBuilder.add(currentTransaction);
+ }
+ }
}
diff --git a/core/src/main/java/google/registry/backup/GcsDiffFileLister.java b/core/src/main/java/google/registry/backup/GcsDiffFileLister.java
index fdd9b5180..56e02fc8c 100644
--- a/core/src/main/java/google/registry/backup/GcsDiffFileLister.java
+++ b/core/src/main/java/google/registry/backup/GcsDiffFileLister.java
@@ -122,7 +122,7 @@ class GcsDiffFileLister {
// Reconstruct the sequence of files by traversing backwards from "lastUpperBoundTime" (i.e. the
// last file that we found) and finding its previous file until we either run out of files or
- // get to one that preceeds "fromTime".
+ // get to one that precedes "fromTime".
//
// GCS file listing is eventually consistent, so it's possible that we are missing a file. The
// metadata of a file is sufficient to identify the preceding file, so if we start from the
diff --git a/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java b/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java
new file mode 100644
index 000000000..389acffef
--- /dev/null
+++ b/core/src/main/java/google/registry/backup/ReplayCommitLogsToSqlAction.java
@@ -0,0 +1,188 @@
+// Copyright 2020 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.backup;
+
+import static google.registry.backup.ExportCommitLogDiffAction.DIFF_FILE_PREFIX;
+import static google.registry.model.ofy.EntityWritePriorities.getEntityPriority;
+import static google.registry.model.ofy.ObjectifyService.ofy;
+import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
+import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
+import static org.joda.time.Duration.standardHours;
+
+import com.google.appengine.api.datastore.Entity;
+import com.google.appengine.api.datastore.Key;
+import com.google.appengine.tools.cloudstorage.GcsFileMetadata;
+import com.google.appengine.tools.cloudstorage.GcsService;
+import com.google.common.collect.ImmutableList;
+import com.google.common.flogger.FluentLogger;
+import google.registry.config.RegistryConfig;
+import google.registry.model.server.Lock;
+import google.registry.model.translators.VKeyTranslatorFactory;
+import google.registry.persistence.VKey;
+import google.registry.request.Action;
+import google.registry.request.Response;
+import google.registry.request.auth.Auth;
+import google.registry.schema.replay.DatastoreEntity;
+import google.registry.schema.replay.DatastoreOnlyEntity;
+import google.registry.schema.replay.NonReplicatedEntity;
+import google.registry.schema.replay.SqlReplayCheckpoint;
+import google.registry.util.RequestStatusChecker;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.util.Optional;
+import javax.inject.Inject;
+import javax.servlet.http.HttpServletResponse;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+
+/** Action that replays commit logs to Cloud SQL to keep it up to date. */
+@Action(
+ service = Action.Service.BACKEND,
+ path = ReplayCommitLogsToSqlAction.PATH,
+ method = Action.Method.POST,
+ automaticallyPrintOk = true,
+ auth = Auth.AUTH_INTERNAL_OR_ADMIN)
+public class ReplayCommitLogsToSqlAction implements Runnable {
+
+ static final String PATH = "/_dr/task/replayCommitLogsToSql";
+
+ private static final int BLOCK_SIZE =
+ 1024 * 1024; // Buffer 1mb at a time, for no particular reason.
+ private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private static final Duration LEASE_LENGTH = standardHours(1);
+
+ @Inject GcsService gcsService;
+ @Inject Response response;
+ @Inject RequestStatusChecker requestStatusChecker;
+ @Inject GcsDiffFileLister diffLister;
+
+ @Inject
+ ReplayCommitLogsToSqlAction() {}
+
+ @Override
+ public void run() {
+ if (!RegistryConfig.getCloudSqlReplayCommitLogs()) {
+ String message = "ReplayCommitLogsToSqlAction was called but disabled in the config.";
+ logger.atWarning().log(message);
+ // App Engine will retry on any non-2xx status code, which we don't want in this case.
+ response.setStatus(SC_NO_CONTENT);
+ response.setPayload(message);
+ return;
+ }
+ Optional lock =
+ Lock.acquire(
+ this.getClass().getSimpleName(), null, LEASE_LENGTH, requestStatusChecker, false);
+ if (lock.isEmpty()) {
+ String message = "Can't acquire SQL commit log replay lock, aborting.";
+ logger.atSevere().log(message);
+ // App Engine will retry on any non-2xx status code, which we don't want in this case.
+ // Let the next run after the next export happen naturally.
+ response.setStatus(SC_NO_CONTENT);
+ response.setPayload(message);
+ return;
+ }
+ try {
+ replayFiles();
+ response.setStatus(HttpServletResponse.SC_OK);
+ logger.atInfo().log("ReplayCommitLogsToSqlAction completed successfully.");
+ } finally {
+ lock.ifPresent(Lock::release);
+ }
+ }
+
+ private void replayFiles() {
+ // Start at the first millisecond we haven't seen yet
+ DateTime fromTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1));
+ // If there's an inconsistent file set, this will throw IllegalStateException and the job
+ // will try later -- this is likely because an export hasn't finished yet.
+ ImmutableList commitLogFiles =
+ diffLister.listDiffFiles(fromTime, /* current time */ null);
+ for (GcsFileMetadata metadata : commitLogFiles) {
+ // One transaction per GCS file
+ jpaTm().transact(() -> processFile(metadata));
+ }
+ logger.atInfo().log("Replayed %d commit log files to SQL successfully.", commitLogFiles.size());
+ }
+
+ private void processFile(GcsFileMetadata metadata) {
+ try (InputStream input =
+ Channels.newInputStream(
+ gcsService.openPrefetchingReadChannel(metadata.getFilename(), 0, BLOCK_SIZE))) {
+ // Load and process the Datastore transactions one at a time
+ ImmutableList> allTransactions =
+ CommitLogImports.loadEntitiesByTransaction(input);
+ allTransactions.forEach(this::replayTransaction);
+ // if we succeeded, set the last-seen time
+ DateTime checkpoint =
+ DateTime.parse(
+ metadata.getFilename().getObjectName().substring(DIFF_FILE_PREFIX.length()));
+ SqlReplayCheckpoint.set(checkpoint);
+ logger.atInfo().log("Replayed %d transactions from commit log file.", allTransactions.size());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void replayTransaction(ImmutableList transaction) {
+ transaction.stream()
+ .sorted(ReplayCommitLogsToSqlAction::compareByWeight)
+ .forEach(
+ versionedEntity ->
+ versionedEntity
+ .getEntity()
+ .ifPresentOrElse(
+ this::handleEntityPut, () -> handleEntityDelete(versionedEntity)));
+ }
+
+ private void handleEntityPut(Entity entity) {
+ Object ofyPojo = ofy().toPojo(entity);
+ if (ofyPojo instanceof DatastoreEntity) {
+ DatastoreEntity datastoreEntity = (DatastoreEntity) ofyPojo;
+ datastoreEntity.toSqlEntity().ifPresent(jpaTm()::put);
+ } else {
+ // this should never happen, but we shouldn't fail on it
+ logger.atSevere().log(
+ "%s does not implement DatastoreEntity, which is necessary for SQL replay.",
+ ofyPojo.getClass());
+ }
+ }
+
+ private void handleEntityDelete(VersionedEntity entityToDelete) {
+ Key key = entityToDelete.key();
+ VKey> entityVKey;
+ try {
+ entityVKey = VKeyTranslatorFactory.createVKey(key);
+ } catch (RuntimeException e) {
+ // This means that the key wasn't convertible to VKey through the standard methods or via
+ // a createVKey method. This means that the object isn't persisted in SQL so we ignore it.
+ logger.atInfo().log(
+ "Skipping SQL delete for kind %s since it is not convertible.", key.getKind());
+ return;
+ }
+ Class> entityClass = entityVKey.getKind();
+ // Delete the key iff the class represents a JPA entity that is replicated
+ if (!NonReplicatedEntity.class.isAssignableFrom(entityClass)
+ && !DatastoreOnlyEntity.class.isAssignableFrom(entityClass)
+ && entityClass.getAnnotation(javax.persistence.Entity.class) != null) {
+ jpaTm().delete(entityVKey);
+ }
+ }
+
+ private static int compareByWeight(VersionedEntity a, VersionedEntity b) {
+ return getEntityPriority(a.key().getKind(), a.getEntity().isEmpty())
+ - getEntityPriority(b.key().getKind(), b.getEntity().isEmpty());
+ }
+}
diff --git a/core/src/main/java/google/registry/config/RegistryConfig.java b/core/src/main/java/google/registry/config/RegistryConfig.java
index b9749f715..67cc5a17f 100644
--- a/core/src/main/java/google/registry/config/RegistryConfig.java
+++ b/core/src/main/java/google/registry/config/RegistryConfig.java
@@ -1592,6 +1592,22 @@ public final class RegistryConfig {
CONFIG_SETTINGS.get().cloudSql.replicateTransactions = replicateTransactions;
}
+ /**
+ * Returns whether or not to replay commit logs to the SQL database after export to GCS.
+ *
+ *
If true, we will trigger the {@link google.registry.backup.ReplayCommitLogsToSqlAction}
+ * after the {@link google.registry.backup.ExportCommitLogDiffAction} to load the commit logs and
+ * replay them to SQL.
+ */
+ public static boolean getCloudSqlReplayCommitLogs() {
+ return CONFIG_SETTINGS.get().cloudSql.replayCommitLogs;
+ }
+
+ @VisibleForTesting
+ public static void overrideCloudSqlReplayCommitLogs(boolean replayCommitLogs) {
+ CONFIG_SETTINGS.get().cloudSql.replayCommitLogs = replayCommitLogs;
+ }
+
/** Returns the roid suffix to be used for the roids of all contacts and hosts. */
public static String getContactAndHostRoidSuffix() {
return CONFIG_SETTINGS.get().registryPolicy.contactAndHostRoidSuffix;
diff --git a/core/src/main/java/google/registry/config/RegistryConfigSettings.java b/core/src/main/java/google/registry/config/RegistryConfigSettings.java
index 47db8bfb1..68afb830e 100644
--- a/core/src/main/java/google/registry/config/RegistryConfigSettings.java
+++ b/core/src/main/java/google/registry/config/RegistryConfigSettings.java
@@ -126,6 +126,7 @@ public class RegistryConfigSettings {
public String username;
public String instanceConnectionName;
public boolean replicateTransactions;
+ public boolean replayCommitLogs;
}
/** Configuration for Apache Beam (Cloud Dataflow). */
diff --git a/core/src/main/java/google/registry/config/files/default-config.yaml b/core/src/main/java/google/registry/config/files/default-config.yaml
index ba8d0f951..149043cc0 100644
--- a/core/src/main/java/google/registry/config/files/default-config.yaml
+++ b/core/src/main/java/google/registry/config/files/default-config.yaml
@@ -233,6 +233,8 @@ cloudSql:
# Set this to true to replicate cloud SQL transactions to datastore in the
# background.
replicateTransactions: false
+ # Set this to true to enable replay of commit logs to SQL
+ replayCommitLogs: false
cloudDns:
# Set both properties to null in Production.
diff --git a/core/src/main/java/google/registry/env/common/default/WEB-INF/queue.xml b/core/src/main/java/google/registry/env/common/default/WEB-INF/queue.xml
index 7b2cf9afb..4c6c631b5 100644
--- a/core/src/main/java/google/registry/env/common/default/WEB-INF/queue.xml
+++ b/core/src/main/java/google/registry/env/common/default/WEB-INF/queue.xml
@@ -208,6 +208,12 @@
5
+
+
+ replay-commit-logs-to-sql
+ 1/s
+
+
diff --git a/core/src/main/java/google/registry/model/ofy/EntityWritePriorities.java b/core/src/main/java/google/registry/model/ofy/EntityWritePriorities.java
new file mode 100644
index 000000000..6a86ba151
--- /dev/null
+++ b/core/src/main/java/google/registry/model/ofy/EntityWritePriorities.java
@@ -0,0 +1,60 @@
+// Copyright 2020 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.model.ofy;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Contains the mapping from class names to SQL-replay-write priorities.
+ *
+ *
When replaying Datastore commit logs to SQL (asynchronous replication), in order to avoid
+ * issues with foreign keys, we should replay entity writes so that foreign key references are
+ * always written after the entity that they reference. This class represents that DAG, where lower
+ * values represent an earlier write (and later delete). Higher-valued classes can have foreign keys
+ * on lower-valued classes, but not vice versa.
+ */
+public class EntityWritePriorities {
+
+ /**
+ * Mapping from class name to "priority".
+ *
+ *
Here, "priority" means the order in which the class should be inserted / updated in a
+ * transaction with respect to instances of other classes. By default, all classes have a priority
+ * number of zero.
+ *
+ *
For each transaction, classes should be written in priority order from the lowest number to
+ * the highest, in order to maintain foreign-key write consistency. For the same reason, deletes
+ * should happen after all writes.
+ */
+ static final ImmutableMap CLASS_PRIORITIES =
+ ImmutableMap.of(
+ "HistoryEntry", -10,
+ "AllocationToken", -9,
+ "ContactResource", 5,
+ "DomainBase", 10);
+
+ // The beginning of the range of priority numbers reserved for delete. This must be greater than
+ // any of the values in CLASS_PRIORITIES by enough overhead to accommodate any negative values in
+ // it. Note: by design, deletions will happen in the opposite order of insertions, which is
+ // necessary to make sure foreign keys aren't violated during deletion.
+ @VisibleForTesting static final int DELETE_RANGE = Integer.MAX_VALUE / 2;
+
+ /** Returns the priority of the entity type in the map entry. */
+ public static int getEntityPriority(String kind, boolean isDelete) {
+ int priority = CLASS_PRIORITIES.getOrDefault(kind, 0);
+ return isDelete ? DELETE_RANGE - priority : priority;
+ }
+}
diff --git a/core/src/main/java/google/registry/model/ofy/TransactionInfo.java b/core/src/main/java/google/registry/model/ofy/TransactionInfo.java
index ec81810a9..54ed4559d 100644
--- a/core/src/main/java/google/registry/model/ofy/TransactionInfo.java
+++ b/core/src/main/java/google/registry/model/ofy/TransactionInfo.java
@@ -20,6 +20,7 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Maps.filterValues;
import static com.google.common.collect.Maps.toMap;
import static google.registry.model.ofy.CommitLogBucket.getArbitraryBucketId;
+import static google.registry.model.ofy.EntityWritePriorities.getEntityPriority;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
@@ -60,7 +61,7 @@ class TransactionInfo {
TransactionInfo(DateTime now) {
this.transactionTime = now;
- ofy().load().key(bucketKey); // Asynchronously load value into session cache.
+ ofy().load().key(bucketKey); // Asynchronously load value into session cache.
}
TransactionInfo setReadOnly() {
@@ -100,23 +101,10 @@ class TransactionInfo {
.collect(toImmutableSet());
}
- // Mapping from class name to "weight" (which in this case is the order in which the class must
- // be "put" in a transaction with respect to instances of other classes). Lower weight classes
- // are put first, by default all classes have a weight of zero.
- static final ImmutableMap CLASS_WEIGHTS =
- ImmutableMap.of(
- "HistoryEntry", -1,
- "DomainBase", 1);
-
- // The beginning of the range of weights reserved for delete. This must be greater than any of
- // the values in CLASS_WEIGHTS by enough overhead to accomodate any negative values in it.
- @VisibleForTesting static final int DELETE_RANGE = Integer.MAX_VALUE / 2;
-
/** Returns the weight of the entity type in the map entry. */
@VisibleForTesting
static int getWeight(ImmutableMap.Entry, Object> entry) {
- int weight = CLASS_WEIGHTS.getOrDefault(entry.getKey().getKind(), 0);
- return entry.getValue().equals(Delete.SENTINEL) ? DELETE_RANGE - weight : weight;
+ return getEntityPriority(entry.getKey().getKind(), entry.getValue().equals(Delete.SENTINEL));
}
private static int compareByWeight(
diff --git a/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java b/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java
new file mode 100644
index 000000000..48a3b59f5
--- /dev/null
+++ b/core/src/test/java/google/registry/backup/ReplayCommitLogsToSqlActionTest.java
@@ -0,0 +1,458 @@
+// Copyright 2020 The Nomulus Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package google.registry.backup;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import static google.registry.backup.RestoreCommitLogsActionTest.GCS_BUCKET;
+import static google.registry.backup.RestoreCommitLogsActionTest.createCheckpoint;
+import static google.registry.backup.RestoreCommitLogsActionTest.saveDiffFile;
+import static google.registry.backup.RestoreCommitLogsActionTest.saveDiffFileNotToRestore;
+import static google.registry.model.common.EntityGroupRoot.getCrossTldKey;
+import static google.registry.model.ofy.CommitLogBucket.getBucketKey;
+import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
+import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
+import static google.registry.testing.DatabaseHelper.createTld;
+import static google.registry.testing.DatabaseHelper.newDomainBase;
+import static google.registry.testing.DatabaseHelper.persistActiveContact;
+import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
+import static javax.servlet.http.HttpServletResponse.SC_OK;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.appengine.tools.cloudstorage.GcsService;
+import com.google.appengine.tools.cloudstorage.GcsServiceFactory;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.truth.Truth8;
+import com.googlecode.objectify.Key;
+import google.registry.config.RegistryConfig;
+import google.registry.model.common.Cursor;
+import google.registry.model.common.Cursor.CursorType;
+import google.registry.model.contact.ContactResource;
+import google.registry.model.domain.DomainBase;
+import google.registry.model.domain.GracePeriod;
+import google.registry.model.domain.secdns.DelegationSignerData;
+import google.registry.model.ofy.CommitLogBucket;
+import google.registry.model.ofy.CommitLogManifest;
+import google.registry.model.ofy.CommitLogMutation;
+import google.registry.model.registrar.RegistrarContact;
+import google.registry.model.registry.label.ReservedList;
+import google.registry.model.server.Lock;
+import google.registry.model.tmch.ClaimsListShard;
+import google.registry.model.translators.VKeyTranslatorFactory;
+import google.registry.persistence.VKey;
+import google.registry.persistence.transaction.JpaTransactionManager;
+import google.registry.persistence.transaction.TransactionManagerFactory;
+import google.registry.schema.replay.SqlReplayCheckpoint;
+import google.registry.testing.AppEngineExtension;
+import google.registry.testing.FakeClock;
+import google.registry.testing.FakeResponse;
+import google.registry.testing.TestObject;
+import google.registry.util.RequestStatusChecker;
+import java.io.IOException;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/** Tests for {@link ReplayCommitLogsToSqlAction}. */
+@ExtendWith(MockitoExtension.class)
+public class ReplayCommitLogsToSqlActionTest {
+
+ private final FakeClock fakeClock = new FakeClock(DateTime.parse("2000-01-01TZ"));
+
+ @RegisterExtension
+ public final AppEngineExtension appEngine =
+ AppEngineExtension.builder()
+ .withDatastoreAndCloudSql()
+ .withClock(fakeClock)
+ .withOfyTestEntities(TestObject.class)
+ .withJpaUnitTestEntities(
+ RegistrarContact.class,
+ TestObject.class,
+ SqlReplayCheckpoint.class,
+ ContactResource.class,
+ DomainBase.class,
+ GracePeriod.class,
+ DelegationSignerData.class)
+ .build();
+
+ /** Local GCS service. */
+ private final GcsService gcsService = GcsServiceFactory.createGcsService();
+
+ private final ReplayCommitLogsToSqlAction action = new ReplayCommitLogsToSqlAction();
+ private final FakeResponse response = new FakeResponse();
+ @Mock private RequestStatusChecker requestStatusChecker;
+
+ @BeforeAll
+ static void beforeAll() {
+ VKeyTranslatorFactory.addTestEntityClass(TestObject.class);
+ }
+
+ @BeforeEach
+ void beforeEach() {
+ action.gcsService = gcsService;
+ action.response = response;
+ action.requestStatusChecker = requestStatusChecker;
+ action.diffLister = new GcsDiffFileLister();
+ action.diffLister.gcsService = gcsService;
+ action.diffLister.gcsBucket = GCS_BUCKET;
+ action.diffLister.executor = newDirectExecutorService();
+ RegistryConfig.overrideCloudSqlReplayCommitLogs(true);
+ }
+
+ @Test
+ void testReplay_multipleDiffFiles() throws Exception {
+ jpaTm()
+ .transact(
+ () -> {
+ jpaTm().insertWithoutBackup(TestObject.create("previous to keep"));
+ jpaTm().insertWithoutBackup(TestObject.create("previous to delete"));
+ });
+ DateTime now = fakeClock.nowUtc();
+ // Create 3 transactions, across two diff files.
+ // Before: {"previous to keep", "previous to delete"}
+ // 1a: Add {"a", "b"}, Delete {"previous to delete"}
+ // 1b: Add {"c", "d"}, Delete {"a"}
+ // 2: Add {"e", "f"}, Delete {"c"}
+ // After: {"previous to keep", "b", "d", "e", "f"}
+ Key manifest1aKey =
+ CommitLogManifest.createKey(getBucketKey(1), now.minusMinutes(3));
+ Key manifest1bKey =
+ CommitLogManifest.createKey(getBucketKey(2), now.minusMinutes(2));
+ Key manifest2Key =
+ CommitLogManifest.createKey(getBucketKey(1), now.minusMinutes(1));
+ saveDiffFileNotToRestore(gcsService, now.minusMinutes(2));
+ saveDiffFile(
+ gcsService,
+ createCheckpoint(now.minusMinutes(1)),
+ CommitLogManifest.create(
+ getBucketKey(1),
+ now.minusMinutes(3),
+ ImmutableSet.of(Key.create(TestObject.create("previous to delete")))),
+ CommitLogMutation.create(manifest1aKey, TestObject.create("a")),
+ CommitLogMutation.create(manifest1aKey, TestObject.create("b")),
+ CommitLogManifest.create(
+ getBucketKey(2),
+ now.minusMinutes(2),
+ ImmutableSet.of(Key.create(TestObject.create("a")))),
+ CommitLogMutation.create(manifest1bKey, TestObject.create("c")),
+ CommitLogMutation.create(manifest1bKey, TestObject.create("d")));
+ saveDiffFile(
+ gcsService,
+ createCheckpoint(now),
+ CommitLogManifest.create(
+ getBucketKey(1),
+ now.minusMinutes(1),
+ ImmutableSet.of(Key.create(TestObject.create("c")))),
+ CommitLogMutation.create(manifest2Key, TestObject.create("e")),
+ CommitLogMutation.create(manifest2Key, TestObject.create("f")));
+ jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
+ fakeClock.advanceOneMilli();
+ runAndAssertSuccess(now);
+ assertExpectedIds("previous to keep", "b", "d", "e", "f");
+ }
+
+ @Test
+ void testReplay_noManifests() throws Exception {
+ DateTime now = fakeClock.nowUtc();
+ jpaTm().transact(() -> jpaTm().insertWithoutBackup(TestObject.create("previous to keep")));
+ saveDiffFileNotToRestore(gcsService, now.minusMinutes(1));
+ saveDiffFile(gcsService, createCheckpoint(now.minusMillis(2)));
+ jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMillis(1)));
+ runAndAssertSuccess(now.minusMillis(1));
+ assertExpectedIds("previous to keep");
+ }
+
+ @Test
+ void testReplay_manifestWithNoDeletions() throws Exception {
+ DateTime now = fakeClock.nowUtc();
+ jpaTm().transact(() -> jpaTm().insertWithoutBackup(TestObject.create("previous to keep")));
+ Key bucketKey = getBucketKey(1);
+ Key manifestKey = CommitLogManifest.createKey(bucketKey, now);
+ saveDiffFileNotToRestore(gcsService, now.minusMinutes(2));
+ jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
+ saveDiffFile(
+ gcsService,
+ createCheckpoint(now.minusMinutes(1)),
+ CommitLogManifest.create(bucketKey, now, null),
+ CommitLogMutation.create(manifestKey, TestObject.create("a")),
+ CommitLogMutation.create(manifestKey, TestObject.create("b")));
+ runAndAssertSuccess(now.minusMinutes(1));
+ assertExpectedIds("previous to keep", "a", "b");
+ }
+
+ @Test
+ void testReplay_manifestWithNoMutations() throws Exception {
+ DateTime now = fakeClock.nowUtc();
+ jpaTm()
+ .transact(
+ () -> {
+ jpaTm().insertWithoutBackup(TestObject.create("previous to keep"));
+ jpaTm().insertWithoutBackup(TestObject.create("previous to delete"));
+ });
+ saveDiffFileNotToRestore(gcsService, now.minusMinutes(2));
+ jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
+ saveDiffFile(
+ gcsService,
+ createCheckpoint(now.minusMinutes(1)),
+ CommitLogManifest.create(
+ getBucketKey(1),
+ now,
+ ImmutableSet.of(Key.create(TestObject.create("previous to delete")))));
+ runAndAssertSuccess(now.minusMinutes(1));
+ assertExpectedIds("previous to keep");
+ }
+
+ @Test
+ void wtestReplay_mutateExistingEntity() throws Exception {
+ DateTime now = fakeClock.nowUtc();
+ jpaTm().transact(() -> jpaTm().put(TestObject.create("existing", "a")));
+ Key manifestKey = CommitLogManifest.createKey(getBucketKey(1), now);
+ saveDiffFileNotToRestore(gcsService, now.minusMinutes(1).minusMillis(1));
+ jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1)));
+ saveDiffFile(
+ gcsService,
+ createCheckpoint(now.minusMillis(1)),
+ CommitLogManifest.create(getBucketKey(1), now, null),
+ CommitLogMutation.create(manifestKey, TestObject.create("existing", "b")));
+ action.run();
+ TestObject fromDatabase =
+ jpaTm().transact(() -> jpaTm().load(VKey.createSql(TestObject.class, "existing")));
+ assertThat(fromDatabase.getField()).isEqualTo("b");
+ }
+
+ // This should be harmless
+ @Test
+ void testReplay_deleteMissingEntity() throws Exception {
+ DateTime now = fakeClock.nowUtc();
+ jpaTm().transact(() -> jpaTm().put(TestObject.create("previous to keep", "a")));
+ saveDiffFileNotToRestore(gcsService, now.minusMinutes(1).minusMillis(1));
+ jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1)));
+ saveDiffFile(
+ gcsService,
+ createCheckpoint(now.minusMillis(1)),
+ CommitLogManifest.create(
+ getBucketKey(1),
+ now,
+ ImmutableSet.of(Key.create(TestObject.create("previous to delete")))));
+ action.run();
+ assertExpectedIds("previous to keep");
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ void testReplay_properlyWeighted() throws Exception {
+ DateTime now = fakeClock.nowUtc();
+ Key manifestKey =
+ CommitLogManifest.createKey(getBucketKey(1), now.minusMinutes(1));
+ // Create (but don't save to SQL) a domain + contact
+ createTld("tld");
+ DomainBase domain = newDomainBase("example.tld");
+ CommitLogMutation domainMutation =
+ tm().transact(() -> CommitLogMutation.create(manifestKey, domain));
+ ContactResource contact = tm().transact(() -> tm().load(domain.getRegistrant()));
+ CommitLogMutation contactMutation =
+ tm().transact(() -> CommitLogMutation.create(manifestKey, contact));
+
+ // Create and save to SQL a registrar contact that we will delete
+ RegistrarContact toDelete = AppEngineExtension.makeRegistrarContact1();
+ jpaTm().transact(() -> jpaTm().put(toDelete));
+ jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
+
+ // spy the txn manager so we can see what order things were inserted/removed
+ JpaTransactionManager spy = spy(jpaTm());
+ TransactionManagerFactory.setJpaTm(() -> spy);
+ // Save in the commit logs the domain and contact (in that order) and the token deletion
+ saveDiffFile(
+ gcsService,
+ createCheckpoint(now.minusMinutes(1)),
+ CommitLogManifest.create(
+ getBucketKey(1), now.minusMinutes(1), ImmutableSet.of(Key.create(toDelete))),
+ domainMutation,
+ contactMutation);
+
+ runAndAssertSuccess(now.minusMinutes(1));
+ // Verify two things:
+ // 1. that the contact insert occurred before the domain insert (necessary for FK ordering)
+ // even though the domain came first in the file
+ // 2. that the allocation token delete occurred after the insertions
+ InOrder inOrder = Mockito.inOrder(spy);
+ inOrder.verify(spy).put(any(ContactResource.class));
+ inOrder.verify(spy).put(any(DomainBase.class));
+ inOrder.verify(spy).delete(toDelete.createVKey());
+ inOrder.verify(spy).put(any(SqlReplayCheckpoint.class));
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ void testReplay_properlyWeighted_doesNotApplyCrossTransactions() throws Exception {
+ DateTime now = fakeClock.nowUtc();
+ Key manifestKey =
+ CommitLogManifest.createKey(getBucketKey(1), now.minusMinutes(1));
+
+ // Create and save the standard contact
+ ContactResource contact = persistActiveContact("contact1234");
+ jpaTm().transact(() -> jpaTm().put(contact));
+
+ // Simulate a Datastore transaction with a new version of the contact
+ ContactResource contactWithEdit =
+ contact.asBuilder().setEmailAddress("replay@example.tld").build();
+ CommitLogMutation contactMutation =
+ tm().transact(() -> CommitLogMutation.create(manifestKey, contactWithEdit));
+
+ jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
+
+ // spy the txn manager so we can see what order things were inserted
+ JpaTransactionManager spy = spy(jpaTm());
+ TransactionManagerFactory.setJpaTm(() -> spy);
+ // Save two commits -- the deletion, then the new version of the contact
+ saveDiffFile(
+ gcsService,
+ createCheckpoint(now.minusMinutes(1).plusMillis(1)),
+ CommitLogManifest.create(
+ getBucketKey(1), now.minusMinutes(1), ImmutableSet.of(Key.create(contact))),
+ CommitLogManifest.create(
+ getBucketKey(1), now.minusMinutes(1).plusMillis(1), ImmutableSet.of()),
+ contactMutation);
+ runAndAssertSuccess(now.minusMinutes(1).plusMillis(1));
+ // Verify that the delete occurred first (because it was in the first transaction) even though
+ // deletes have higher weight
+ ArgumentCaptor