Add an action to replay commit logs to SQL (#887)

* Add an action to replay commit logs to SQL

- We import from the commit-log GCS files so that we are sure that we
have a consistent snapshot.
- We use the object weighting (moved to ObjectWeights) to verify that we
are replaying objects in the correct order.
- We add a config setting (default false) of whether or not to replay
the logs.
- The action is triggered after the export if the aforementioned config
setting is on.

* Responses to CR

- Remove triggering of replay from the export action and remove the test
changes
- Add a method to load commit log diffs by transaction
- Replay one Datastore transaction at a time, per SQL transaction
- Minor logging / comment changes
- Change ObjectWeights to EntityWritePriorities and flesh out javadoc

* More CR responses

- Use one transaction per GCS diff file
- Fix up comments minutiae

* Add a class-level javadoc

* Add a log message and some periods

* bit of formatting

* Merge remote-tracking branch 'origin/master' into replayAction

* Handle toSqlEntity rather than toSqlEntities
This commit is contained in:
gbrodman 2020-12-03 16:50:41 -05:00 committed by GitHub
parent 0b82148a5f
commit 008da6c1fe
13 changed files with 897 additions and 112 deletions

View file

@ -15,10 +15,10 @@
package google.registry.backup; package google.registry.backup;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static google.registry.backup.BackupUtils.createDeserializingIterator; import static google.registry.backup.BackupUtils.createDeserializingIterator;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import google.registry.model.ImmutableObject; import google.registry.model.ImmutableObject;
import google.registry.model.ofy.CommitLogCheckpoint; import google.registry.model.ofy.CommitLogCheckpoint;
import google.registry.model.ofy.CommitLogManifest; import google.registry.model.ofy.CommitLogManifest;
@ -31,7 +31,6 @@ import java.io.InputStream;
import java.nio.channels.Channels; import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.util.Iterator; import java.util.Iterator;
import java.util.stream.Stream;
/** /**
* Helpers for reading CommitLog records from a file. * Helpers for reading CommitLog records from a file.
@ -43,6 +42,56 @@ public final class CommitLogImports {
private CommitLogImports() {} private CommitLogImports() {}
/**
* Returns entities in an {@code inputStream} (from a single CommitLog file) as an {@link
* ImmutableList} of {@link ImmutableList}s of {@link VersionedEntity} records where the inner
* lists each consist of one transaction. Upon completion the {@code inputStream} is closed.
*
* <p>The returned list may be empty, since CommitLogs are written at fixed intervals regardless
* if actual changes exist. Each sublist, however, will not be empty.
*
* <p>A CommitLog file starts with a {@link CommitLogCheckpoint}, followed by (repeated)
* subsequences of [{@link CommitLogManifest}, [{@link CommitLogMutation}] ...]. Each subsequence
* represents the changes in one transaction. The {@code CommitLogManifest} contains deleted
* entity keys, whereas each {@code CommitLogMutation} contains one whole entity.
*/
public static ImmutableList<ImmutableList<VersionedEntity>> loadEntitiesByTransaction(
InputStream inputStream) {
try (AppEngineEnvironment appEngineEnvironment = new AppEngineEnvironment();
InputStream input = new BufferedInputStream(inputStream)) {
Iterator<ImmutableObject> commitLogs = createDeserializingIterator(input);
checkState(commitLogs.hasNext());
checkState(commitLogs.next() instanceof CommitLogCheckpoint);
ImmutableList.Builder<ImmutableList<VersionedEntity>> resultBuilder =
new ImmutableList.Builder<>();
ImmutableList.Builder<VersionedEntity> currentTransactionBuilder =
new ImmutableList.Builder<>();
while (commitLogs.hasNext()) {
ImmutableObject currentObject = commitLogs.next();
if (currentObject instanceof CommitLogManifest) {
// CommitLogManifest means we are starting a new transaction
addIfNonempty(resultBuilder, currentTransactionBuilder);
currentTransactionBuilder = new ImmutableList.Builder<>();
VersionedEntity.fromManifest((CommitLogManifest) currentObject)
.forEach(currentTransactionBuilder::add);
} else if (currentObject instanceof CommitLogMutation) {
currentTransactionBuilder.add(
VersionedEntity.fromMutation((CommitLogMutation) currentObject));
} else {
throw new IllegalStateException(
String.format("Unknown entity type %s in commit logs", currentObject.getClass()));
}
}
// Add the last transaction in (if it's not empty)
addIfNonempty(resultBuilder, currentTransactionBuilder);
return resultBuilder.build();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/** /**
* Returns entities in an {@code inputStream} (from a single CommitLog file) as an {@link * Returns entities in an {@code inputStream} (from a single CommitLog file) as an {@link
* ImmutableList} of {@link VersionedEntity} records. Upon completion the {@code inputStream} is * ImmutableList} of {@link VersionedEntity} records. Upon completion the {@code inputStream} is
@ -57,23 +106,9 @@ public final class CommitLogImports {
* entity keys, whereas each {@code CommitLogMutation} contains one whole entity. * entity keys, whereas each {@code CommitLogMutation} contains one whole entity.
*/ */
public static ImmutableList<VersionedEntity> loadEntities(InputStream inputStream) { public static ImmutableList<VersionedEntity> loadEntities(InputStream inputStream) {
try (AppEngineEnvironment appEngineEnvironment = new AppEngineEnvironment(); return loadEntitiesByTransaction(inputStream).stream()
InputStream input = new BufferedInputStream(inputStream)) { .flatMap(ImmutableList::stream)
Iterator<ImmutableObject> commitLogs = createDeserializingIterator(input); .collect(toImmutableList());
checkState(commitLogs.hasNext());
checkState(commitLogs.next() instanceof CommitLogCheckpoint);
return Streams.stream(commitLogs)
.map(
e ->
e instanceof CommitLogManifest
? VersionedEntity.fromManifest((CommitLogManifest) e)
: Stream.of(VersionedEntity.fromMutation((CommitLogMutation) e)))
.flatMap(s -> s)
.collect(ImmutableList.toImmutableList());
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
/** Covenience method that adapts {@link #loadEntities(InputStream)} to a {@link File}. */ /** Covenience method that adapts {@link #loadEntities(InputStream)} to a {@link File}. */
@ -92,4 +127,13 @@ public final class CommitLogImports {
public static ImmutableList<VersionedEntity> loadEntities(ReadableByteChannel channel) { public static ImmutableList<VersionedEntity> loadEntities(ReadableByteChannel channel) {
return loadEntities(Channels.newInputStream(channel)); return loadEntities(Channels.newInputStream(channel));
} }
private static void addIfNonempty(
ImmutableList.Builder<ImmutableList<VersionedEntity>> resultBuilder,
ImmutableList.Builder<VersionedEntity> currentTransactionBuilder) {
ImmutableList<VersionedEntity> currentTransaction = currentTransactionBuilder.build();
if (!currentTransaction.isEmpty()) {
resultBuilder.add(currentTransaction);
}
}
} }

View file

@ -122,7 +122,7 @@ class GcsDiffFileLister {
// Reconstruct the sequence of files by traversing backwards from "lastUpperBoundTime" (i.e. the // Reconstruct the sequence of files by traversing backwards from "lastUpperBoundTime" (i.e. the
// last file that we found) and finding its previous file until we either run out of files or // last file that we found) and finding its previous file until we either run out of files or
// get to one that preceeds "fromTime". // get to one that precedes "fromTime".
// //
// GCS file listing is eventually consistent, so it's possible that we are missing a file. The // GCS file listing is eventually consistent, so it's possible that we are missing a file. The
// metadata of a file is sufficient to identify the preceding file, so if we start from the // metadata of a file is sufficient to identify the preceding file, so if we start from the

View file

@ -0,0 +1,188 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.backup;
import static google.registry.backup.ExportCommitLogDiffAction.DIFF_FILE_PREFIX;
import static google.registry.model.ofy.EntityWritePriorities.getEntityPriority;
import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
import static org.joda.time.Duration.standardHours;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.tools.cloudstorage.GcsFileMetadata;
import com.google.appengine.tools.cloudstorage.GcsService;
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.FluentLogger;
import google.registry.config.RegistryConfig;
import google.registry.model.server.Lock;
import google.registry.model.translators.VKeyTranslatorFactory;
import google.registry.persistence.VKey;
import google.registry.request.Action;
import google.registry.request.Response;
import google.registry.request.auth.Auth;
import google.registry.schema.replay.DatastoreEntity;
import google.registry.schema.replay.DatastoreOnlyEntity;
import google.registry.schema.replay.NonReplicatedEntity;
import google.registry.schema.replay.SqlReplayCheckpoint;
import google.registry.util.RequestStatusChecker;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.util.Optional;
import javax.inject.Inject;
import javax.servlet.http.HttpServletResponse;
import org.joda.time.DateTime;
import org.joda.time.Duration;
/** Action that replays commit logs to Cloud SQL to keep it up to date. */
@Action(
service = Action.Service.BACKEND,
path = ReplayCommitLogsToSqlAction.PATH,
method = Action.Method.POST,
automaticallyPrintOk = true,
auth = Auth.AUTH_INTERNAL_OR_ADMIN)
public class ReplayCommitLogsToSqlAction implements Runnable {
static final String PATH = "/_dr/task/replayCommitLogsToSql";
private static final int BLOCK_SIZE =
1024 * 1024; // Buffer 1mb at a time, for no particular reason.
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final Duration LEASE_LENGTH = standardHours(1);
@Inject GcsService gcsService;
@Inject Response response;
@Inject RequestStatusChecker requestStatusChecker;
@Inject GcsDiffFileLister diffLister;
@Inject
ReplayCommitLogsToSqlAction() {}
@Override
public void run() {
if (!RegistryConfig.getCloudSqlReplayCommitLogs()) {
String message = "ReplayCommitLogsToSqlAction was called but disabled in the config.";
logger.atWarning().log(message);
// App Engine will retry on any non-2xx status code, which we don't want in this case.
response.setStatus(SC_NO_CONTENT);
response.setPayload(message);
return;
}
Optional<Lock> lock =
Lock.acquire(
this.getClass().getSimpleName(), null, LEASE_LENGTH, requestStatusChecker, false);
if (lock.isEmpty()) {
String message = "Can't acquire SQL commit log replay lock, aborting.";
logger.atSevere().log(message);
// App Engine will retry on any non-2xx status code, which we don't want in this case.
// Let the next run after the next export happen naturally.
response.setStatus(SC_NO_CONTENT);
response.setPayload(message);
return;
}
try {
replayFiles();
response.setStatus(HttpServletResponse.SC_OK);
logger.atInfo().log("ReplayCommitLogsToSqlAction completed successfully.");
} finally {
lock.ifPresent(Lock::release);
}
}
private void replayFiles() {
// Start at the first millisecond we haven't seen yet
DateTime fromTime = jpaTm().transact(() -> SqlReplayCheckpoint.get().plusMillis(1));
// If there's an inconsistent file set, this will throw IllegalStateException and the job
// will try later -- this is likely because an export hasn't finished yet.
ImmutableList<GcsFileMetadata> commitLogFiles =
diffLister.listDiffFiles(fromTime, /* current time */ null);
for (GcsFileMetadata metadata : commitLogFiles) {
// One transaction per GCS file
jpaTm().transact(() -> processFile(metadata));
}
logger.atInfo().log("Replayed %d commit log files to SQL successfully.", commitLogFiles.size());
}
private void processFile(GcsFileMetadata metadata) {
try (InputStream input =
Channels.newInputStream(
gcsService.openPrefetchingReadChannel(metadata.getFilename(), 0, BLOCK_SIZE))) {
// Load and process the Datastore transactions one at a time
ImmutableList<ImmutableList<VersionedEntity>> allTransactions =
CommitLogImports.loadEntitiesByTransaction(input);
allTransactions.forEach(this::replayTransaction);
// if we succeeded, set the last-seen time
DateTime checkpoint =
DateTime.parse(
metadata.getFilename().getObjectName().substring(DIFF_FILE_PREFIX.length()));
SqlReplayCheckpoint.set(checkpoint);
logger.atInfo().log("Replayed %d transactions from commit log file.", allTransactions.size());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void replayTransaction(ImmutableList<VersionedEntity> transaction) {
transaction.stream()
.sorted(ReplayCommitLogsToSqlAction::compareByWeight)
.forEach(
versionedEntity ->
versionedEntity
.getEntity()
.ifPresentOrElse(
this::handleEntityPut, () -> handleEntityDelete(versionedEntity)));
}
private void handleEntityPut(Entity entity) {
Object ofyPojo = ofy().toPojo(entity);
if (ofyPojo instanceof DatastoreEntity) {
DatastoreEntity datastoreEntity = (DatastoreEntity) ofyPojo;
datastoreEntity.toSqlEntity().ifPresent(jpaTm()::put);
} else {
// this should never happen, but we shouldn't fail on it
logger.atSevere().log(
"%s does not implement DatastoreEntity, which is necessary for SQL replay.",
ofyPojo.getClass());
}
}
private void handleEntityDelete(VersionedEntity entityToDelete) {
Key key = entityToDelete.key();
VKey<?> entityVKey;
try {
entityVKey = VKeyTranslatorFactory.createVKey(key);
} catch (RuntimeException e) {
// This means that the key wasn't convertible to VKey through the standard methods or via
// a createVKey method. This means that the object isn't persisted in SQL so we ignore it.
logger.atInfo().log(
"Skipping SQL delete for kind %s since it is not convertible.", key.getKind());
return;
}
Class<?> entityClass = entityVKey.getKind();
// Delete the key iff the class represents a JPA entity that is replicated
if (!NonReplicatedEntity.class.isAssignableFrom(entityClass)
&& !DatastoreOnlyEntity.class.isAssignableFrom(entityClass)
&& entityClass.getAnnotation(javax.persistence.Entity.class) != null) {
jpaTm().delete(entityVKey);
}
}
private static int compareByWeight(VersionedEntity a, VersionedEntity b) {
return getEntityPriority(a.key().getKind(), a.getEntity().isEmpty())
- getEntityPriority(b.key().getKind(), b.getEntity().isEmpty());
}
}

View file

@ -1592,6 +1592,22 @@ public final class RegistryConfig {
CONFIG_SETTINGS.get().cloudSql.replicateTransactions = replicateTransactions; CONFIG_SETTINGS.get().cloudSql.replicateTransactions = replicateTransactions;
} }
/**
* Returns whether or not to replay commit logs to the SQL database after export to GCS.
*
* <p>If true, we will trigger the {@link google.registry.backup.ReplayCommitLogsToSqlAction}
* after the {@link google.registry.backup.ExportCommitLogDiffAction} to load the commit logs and
* replay them to SQL.
*/
public static boolean getCloudSqlReplayCommitLogs() {
return CONFIG_SETTINGS.get().cloudSql.replayCommitLogs;
}
@VisibleForTesting
public static void overrideCloudSqlReplayCommitLogs(boolean replayCommitLogs) {
CONFIG_SETTINGS.get().cloudSql.replayCommitLogs = replayCommitLogs;
}
/** Returns the roid suffix to be used for the roids of all contacts and hosts. */ /** Returns the roid suffix to be used for the roids of all contacts and hosts. */
public static String getContactAndHostRoidSuffix() { public static String getContactAndHostRoidSuffix() {
return CONFIG_SETTINGS.get().registryPolicy.contactAndHostRoidSuffix; return CONFIG_SETTINGS.get().registryPolicy.contactAndHostRoidSuffix;

View file

@ -126,6 +126,7 @@ public class RegistryConfigSettings {
public String username; public String username;
public String instanceConnectionName; public String instanceConnectionName;
public boolean replicateTransactions; public boolean replicateTransactions;
public boolean replayCommitLogs;
} }
/** Configuration for Apache Beam (Cloud Dataflow). */ /** Configuration for Apache Beam (Cloud Dataflow). */

View file

@ -233,6 +233,8 @@ cloudSql:
# Set this to true to replicate cloud SQL transactions to datastore in the # Set this to true to replicate cloud SQL transactions to datastore in the
# background. # background.
replicateTransactions: false replicateTransactions: false
# Set this to true to enable replay of commit logs to SQL
replayCommitLogs: false
cloudDns: cloudDns:
# Set both properties to null in Production. # Set both properties to null in Production.

View file

@ -208,6 +208,12 @@
<max-concurrent-requests>5</max-concurrent-requests> <max-concurrent-requests>5</max-concurrent-requests>
</queue> </queue>
<!-- Queue for replaying commit logs to SQL during the transition from Datastore -> SQL. -->
<queue>
<name>replay-commit-logs-to-sql</name>
<rate>1/s</rate>
</queue>
<!-- The load[0-9] queues are used for load-testing, and can be safely deleted <!-- The load[0-9] queues are used for load-testing, and can be safely deleted
in any environment that doesn't require load-testing. --> in any environment that doesn't require load-testing. -->
<queue> <queue>

View file

@ -0,0 +1,60 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.model.ofy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
/**
* Contains the mapping from class names to SQL-replay-write priorities.
*
* <p>When replaying Datastore commit logs to SQL (asynchronous replication), in order to avoid
* issues with foreign keys, we should replay entity writes so that foreign key references are
* always written after the entity that they reference. This class represents that DAG, where lower
* values represent an earlier write (and later delete). Higher-valued classes can have foreign keys
* on lower-valued classes, but not vice versa.
*/
public class EntityWritePriorities {
/**
* Mapping from class name to "priority".
*
* <p>Here, "priority" means the order in which the class should be inserted / updated in a
* transaction with respect to instances of other classes. By default, all classes have a priority
* number of zero.
*
* <p>For each transaction, classes should be written in priority order from the lowest number to
* the highest, in order to maintain foreign-key write consistency. For the same reason, deletes
* should happen after all writes.
*/
static final ImmutableMap<String, Integer> CLASS_PRIORITIES =
ImmutableMap.of(
"HistoryEntry", -10,
"AllocationToken", -9,
"ContactResource", 5,
"DomainBase", 10);
// The beginning of the range of priority numbers reserved for delete. This must be greater than
// any of the values in CLASS_PRIORITIES by enough overhead to accommodate any negative values in
// it. Note: by design, deletions will happen in the opposite order of insertions, which is
// necessary to make sure foreign keys aren't violated during deletion.
@VisibleForTesting static final int DELETE_RANGE = Integer.MAX_VALUE / 2;
/** Returns the priority of the entity type in the map entry. */
public static int getEntityPriority(String kind, boolean isDelete) {
int priority = CLASS_PRIORITIES.getOrDefault(kind, 0);
return isDelete ? DELETE_RANGE - priority : priority;
}
}

View file

@ -20,6 +20,7 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Maps.filterValues; import static com.google.common.collect.Maps.filterValues;
import static com.google.common.collect.Maps.toMap; import static com.google.common.collect.Maps.toMap;
import static google.registry.model.ofy.CommitLogBucket.getArbitraryBucketId; import static google.registry.model.ofy.CommitLogBucket.getArbitraryBucketId;
import static google.registry.model.ofy.EntityWritePriorities.getEntityPriority;
import static google.registry.model.ofy.ObjectifyService.ofy; import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
@ -60,7 +61,7 @@ class TransactionInfo {
TransactionInfo(DateTime now) { TransactionInfo(DateTime now) {
this.transactionTime = now; this.transactionTime = now;
ofy().load().key(bucketKey); // Asynchronously load value into session cache. ofy().load().key(bucketKey); // Asynchronously load value into session cache.
} }
TransactionInfo setReadOnly() { TransactionInfo setReadOnly() {
@ -100,23 +101,10 @@ class TransactionInfo {
.collect(toImmutableSet()); .collect(toImmutableSet());
} }
// Mapping from class name to "weight" (which in this case is the order in which the class must
// be "put" in a transaction with respect to instances of other classes). Lower weight classes
// are put first, by default all classes have a weight of zero.
static final ImmutableMap<String, Integer> CLASS_WEIGHTS =
ImmutableMap.of(
"HistoryEntry", -1,
"DomainBase", 1);
// The beginning of the range of weights reserved for delete. This must be greater than any of
// the values in CLASS_WEIGHTS by enough overhead to accomodate any negative values in it.
@VisibleForTesting static final int DELETE_RANGE = Integer.MAX_VALUE / 2;
/** Returns the weight of the entity type in the map entry. */ /** Returns the weight of the entity type in the map entry. */
@VisibleForTesting @VisibleForTesting
static int getWeight(ImmutableMap.Entry<Key<?>, Object> entry) { static int getWeight(ImmutableMap.Entry<Key<?>, Object> entry) {
int weight = CLASS_WEIGHTS.getOrDefault(entry.getKey().getKind(), 0); return getEntityPriority(entry.getKey().getKind(), entry.getValue().equals(Delete.SENTINEL));
return entry.getValue().equals(Delete.SENTINEL) ? DELETE_RANGE - weight : weight;
} }
private static int compareByWeight( private static int compareByWeight(

View file

@ -0,0 +1,458 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.backup;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static google.registry.backup.RestoreCommitLogsActionTest.GCS_BUCKET;
import static google.registry.backup.RestoreCommitLogsActionTest.createCheckpoint;
import static google.registry.backup.RestoreCommitLogsActionTest.saveDiffFile;
import static google.registry.backup.RestoreCommitLogsActionTest.saveDiffFileNotToRestore;
import static google.registry.model.common.EntityGroupRoot.getCrossTldKey;
import static google.registry.model.ofy.CommitLogBucket.getBucketKey;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static google.registry.testing.DatabaseHelper.createTld;
import static google.registry.testing.DatabaseHelper.newDomainBase;
import static google.registry.testing.DatabaseHelper.persistActiveContact;
import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.appengine.tools.cloudstorage.GcsService;
import com.google.appengine.tools.cloudstorage.GcsServiceFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.truth.Truth8;
import com.googlecode.objectify.Key;
import google.registry.config.RegistryConfig;
import google.registry.model.common.Cursor;
import google.registry.model.common.Cursor.CursorType;
import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DomainBase;
import google.registry.model.domain.GracePeriod;
import google.registry.model.domain.secdns.DelegationSignerData;
import google.registry.model.ofy.CommitLogBucket;
import google.registry.model.ofy.CommitLogManifest;
import google.registry.model.ofy.CommitLogMutation;
import google.registry.model.registrar.RegistrarContact;
import google.registry.model.registry.label.ReservedList;
import google.registry.model.server.Lock;
import google.registry.model.tmch.ClaimsListShard;
import google.registry.model.translators.VKeyTranslatorFactory;
import google.registry.persistence.VKey;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManagerFactory;
import google.registry.schema.replay.SqlReplayCheckpoint;
import google.registry.testing.AppEngineExtension;
import google.registry.testing.FakeClock;
import google.registry.testing.FakeResponse;
import google.registry.testing.TestObject;
import google.registry.util.RequestStatusChecker;
import java.io.IOException;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
/** Tests for {@link ReplayCommitLogsToSqlAction}. */
@ExtendWith(MockitoExtension.class)
public class ReplayCommitLogsToSqlActionTest {
private final FakeClock fakeClock = new FakeClock(DateTime.parse("2000-01-01TZ"));
@RegisterExtension
public final AppEngineExtension appEngine =
AppEngineExtension.builder()
.withDatastoreAndCloudSql()
.withClock(fakeClock)
.withOfyTestEntities(TestObject.class)
.withJpaUnitTestEntities(
RegistrarContact.class,
TestObject.class,
SqlReplayCheckpoint.class,
ContactResource.class,
DomainBase.class,
GracePeriod.class,
DelegationSignerData.class)
.build();
/** Local GCS service. */
private final GcsService gcsService = GcsServiceFactory.createGcsService();
private final ReplayCommitLogsToSqlAction action = new ReplayCommitLogsToSqlAction();
private final FakeResponse response = new FakeResponse();
@Mock private RequestStatusChecker requestStatusChecker;
@BeforeAll
static void beforeAll() {
VKeyTranslatorFactory.addTestEntityClass(TestObject.class);
}
@BeforeEach
void beforeEach() {
action.gcsService = gcsService;
action.response = response;
action.requestStatusChecker = requestStatusChecker;
action.diffLister = new GcsDiffFileLister();
action.diffLister.gcsService = gcsService;
action.diffLister.gcsBucket = GCS_BUCKET;
action.diffLister.executor = newDirectExecutorService();
RegistryConfig.overrideCloudSqlReplayCommitLogs(true);
}
@Test
void testReplay_multipleDiffFiles() throws Exception {
jpaTm()
.transact(
() -> {
jpaTm().insertWithoutBackup(TestObject.create("previous to keep"));
jpaTm().insertWithoutBackup(TestObject.create("previous to delete"));
});
DateTime now = fakeClock.nowUtc();
// Create 3 transactions, across two diff files.
// Before: {"previous to keep", "previous to delete"}
// 1a: Add {"a", "b"}, Delete {"previous to delete"}
// 1b: Add {"c", "d"}, Delete {"a"}
// 2: Add {"e", "f"}, Delete {"c"}
// After: {"previous to keep", "b", "d", "e", "f"}
Key<CommitLogManifest> manifest1aKey =
CommitLogManifest.createKey(getBucketKey(1), now.minusMinutes(3));
Key<CommitLogManifest> manifest1bKey =
CommitLogManifest.createKey(getBucketKey(2), now.minusMinutes(2));
Key<CommitLogManifest> manifest2Key =
CommitLogManifest.createKey(getBucketKey(1), now.minusMinutes(1));
saveDiffFileNotToRestore(gcsService, now.minusMinutes(2));
saveDiffFile(
gcsService,
createCheckpoint(now.minusMinutes(1)),
CommitLogManifest.create(
getBucketKey(1),
now.minusMinutes(3),
ImmutableSet.of(Key.create(TestObject.create("previous to delete")))),
CommitLogMutation.create(manifest1aKey, TestObject.create("a")),
CommitLogMutation.create(manifest1aKey, TestObject.create("b")),
CommitLogManifest.create(
getBucketKey(2),
now.minusMinutes(2),
ImmutableSet.of(Key.create(TestObject.create("a")))),
CommitLogMutation.create(manifest1bKey, TestObject.create("c")),
CommitLogMutation.create(manifest1bKey, TestObject.create("d")));
saveDiffFile(
gcsService,
createCheckpoint(now),
CommitLogManifest.create(
getBucketKey(1),
now.minusMinutes(1),
ImmutableSet.of(Key.create(TestObject.create("c")))),
CommitLogMutation.create(manifest2Key, TestObject.create("e")),
CommitLogMutation.create(manifest2Key, TestObject.create("f")));
jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
fakeClock.advanceOneMilli();
runAndAssertSuccess(now);
assertExpectedIds("previous to keep", "b", "d", "e", "f");
}
@Test
void testReplay_noManifests() throws Exception {
DateTime now = fakeClock.nowUtc();
jpaTm().transact(() -> jpaTm().insertWithoutBackup(TestObject.create("previous to keep")));
saveDiffFileNotToRestore(gcsService, now.minusMinutes(1));
saveDiffFile(gcsService, createCheckpoint(now.minusMillis(2)));
jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMillis(1)));
runAndAssertSuccess(now.minusMillis(1));
assertExpectedIds("previous to keep");
}
@Test
void testReplay_manifestWithNoDeletions() throws Exception {
DateTime now = fakeClock.nowUtc();
jpaTm().transact(() -> jpaTm().insertWithoutBackup(TestObject.create("previous to keep")));
Key<CommitLogBucket> bucketKey = getBucketKey(1);
Key<CommitLogManifest> manifestKey = CommitLogManifest.createKey(bucketKey, now);
saveDiffFileNotToRestore(gcsService, now.minusMinutes(2));
jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
saveDiffFile(
gcsService,
createCheckpoint(now.minusMinutes(1)),
CommitLogManifest.create(bucketKey, now, null),
CommitLogMutation.create(manifestKey, TestObject.create("a")),
CommitLogMutation.create(manifestKey, TestObject.create("b")));
runAndAssertSuccess(now.minusMinutes(1));
assertExpectedIds("previous to keep", "a", "b");
}
@Test
void testReplay_manifestWithNoMutations() throws Exception {
DateTime now = fakeClock.nowUtc();
jpaTm()
.transact(
() -> {
jpaTm().insertWithoutBackup(TestObject.create("previous to keep"));
jpaTm().insertWithoutBackup(TestObject.create("previous to delete"));
});
saveDiffFileNotToRestore(gcsService, now.minusMinutes(2));
jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
saveDiffFile(
gcsService,
createCheckpoint(now.minusMinutes(1)),
CommitLogManifest.create(
getBucketKey(1),
now,
ImmutableSet.of(Key.create(TestObject.create("previous to delete")))));
runAndAssertSuccess(now.minusMinutes(1));
assertExpectedIds("previous to keep");
}
@Test
void wtestReplay_mutateExistingEntity() throws Exception {
DateTime now = fakeClock.nowUtc();
jpaTm().transact(() -> jpaTm().put(TestObject.create("existing", "a")));
Key<CommitLogManifest> manifestKey = CommitLogManifest.createKey(getBucketKey(1), now);
saveDiffFileNotToRestore(gcsService, now.minusMinutes(1).minusMillis(1));
jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1)));
saveDiffFile(
gcsService,
createCheckpoint(now.minusMillis(1)),
CommitLogManifest.create(getBucketKey(1), now, null),
CommitLogMutation.create(manifestKey, TestObject.create("existing", "b")));
action.run();
TestObject fromDatabase =
jpaTm().transact(() -> jpaTm().load(VKey.createSql(TestObject.class, "existing")));
assertThat(fromDatabase.getField()).isEqualTo("b");
}
// This should be harmless
@Test
void testReplay_deleteMissingEntity() throws Exception {
DateTime now = fakeClock.nowUtc();
jpaTm().transact(() -> jpaTm().put(TestObject.create("previous to keep", "a")));
saveDiffFileNotToRestore(gcsService, now.minusMinutes(1).minusMillis(1));
jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1)));
saveDiffFile(
gcsService,
createCheckpoint(now.minusMillis(1)),
CommitLogManifest.create(
getBucketKey(1),
now,
ImmutableSet.of(Key.create(TestObject.create("previous to delete")))));
action.run();
assertExpectedIds("previous to keep");
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
void testReplay_properlyWeighted() throws Exception {
DateTime now = fakeClock.nowUtc();
Key<CommitLogManifest> manifestKey =
CommitLogManifest.createKey(getBucketKey(1), now.minusMinutes(1));
// Create (but don't save to SQL) a domain + contact
createTld("tld");
DomainBase domain = newDomainBase("example.tld");
CommitLogMutation domainMutation =
tm().transact(() -> CommitLogMutation.create(manifestKey, domain));
ContactResource contact = tm().transact(() -> tm().load(domain.getRegistrant()));
CommitLogMutation contactMutation =
tm().transact(() -> CommitLogMutation.create(manifestKey, contact));
// Create and save to SQL a registrar contact that we will delete
RegistrarContact toDelete = AppEngineExtension.makeRegistrarContact1();
jpaTm().transact(() -> jpaTm().put(toDelete));
jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
// spy the txn manager so we can see what order things were inserted/removed
JpaTransactionManager spy = spy(jpaTm());
TransactionManagerFactory.setJpaTm(() -> spy);
// Save in the commit logs the domain and contact (in that order) and the token deletion
saveDiffFile(
gcsService,
createCheckpoint(now.minusMinutes(1)),
CommitLogManifest.create(
getBucketKey(1), now.minusMinutes(1), ImmutableSet.of(Key.create(toDelete))),
domainMutation,
contactMutation);
runAndAssertSuccess(now.minusMinutes(1));
// Verify two things:
// 1. that the contact insert occurred before the domain insert (necessary for FK ordering)
// even though the domain came first in the file
// 2. that the allocation token delete occurred after the insertions
InOrder inOrder = Mockito.inOrder(spy);
inOrder.verify(spy).put(any(ContactResource.class));
inOrder.verify(spy).put(any(DomainBase.class));
inOrder.verify(spy).delete(toDelete.createVKey());
inOrder.verify(spy).put(any(SqlReplayCheckpoint.class));
}
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
void testReplay_properlyWeighted_doesNotApplyCrossTransactions() throws Exception {
DateTime now = fakeClock.nowUtc();
Key<CommitLogManifest> manifestKey =
CommitLogManifest.createKey(getBucketKey(1), now.minusMinutes(1));
// Create and save the standard contact
ContactResource contact = persistActiveContact("contact1234");
jpaTm().transact(() -> jpaTm().put(contact));
// Simulate a Datastore transaction with a new version of the contact
ContactResource contactWithEdit =
contact.asBuilder().setEmailAddress("replay@example.tld").build();
CommitLogMutation contactMutation =
tm().transact(() -> CommitLogMutation.create(manifestKey, contactWithEdit));
jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
// spy the txn manager so we can see what order things were inserted
JpaTransactionManager spy = spy(jpaTm());
TransactionManagerFactory.setJpaTm(() -> spy);
// Save two commits -- the deletion, then the new version of the contact
saveDiffFile(
gcsService,
createCheckpoint(now.minusMinutes(1).plusMillis(1)),
CommitLogManifest.create(
getBucketKey(1), now.minusMinutes(1), ImmutableSet.of(Key.create(contact))),
CommitLogManifest.create(
getBucketKey(1), now.minusMinutes(1).plusMillis(1), ImmutableSet.of()),
contactMutation);
runAndAssertSuccess(now.minusMinutes(1).plusMillis(1));
// Verify that the delete occurred first (because it was in the first transaction) even though
// deletes have higher weight
ArgumentCaptor<Object> putCaptor = ArgumentCaptor.forClass(Object.class);
InOrder inOrder = Mockito.inOrder(spy);
inOrder.verify(spy).delete(contact.createVKey());
inOrder.verify(spy).put(putCaptor.capture());
assertThat(putCaptor.getValue().getClass()).isEqualTo(ContactResource.class);
assertThat(jpaTm().transact(() -> jpaTm().load(contact.createVKey()).getEmailAddress()))
.isEqualTo("replay@example.tld");
}
@Test
void testSuccess_nonReplicatedEntity_isNotReplayed() {
DateTime now = fakeClock.nowUtc();
// spy the txn manager so we can verify it's never called
JpaTransactionManager spy = spy(jpaTm());
TransactionManagerFactory.setJpaTm(() -> spy);
jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
Key<CommitLogManifest> manifestKey =
CommitLogManifest.createKey(getBucketKey(1), now.minusMinutes(1));
// Have a commit log with a couple objects that shouldn't be replayed
ReservedList reservedList =
new ReservedList.Builder().setReservedListMap(ImmutableMap.of()).setName("name").build();
Cursor cursor = Cursor.createGlobal(CursorType.RECURRING_BILLING, now.minusHours(1));
tm().transact(
() -> {
try {
saveDiffFile(
gcsService,
createCheckpoint(now.minusMinutes(1)),
CommitLogManifest.create(
getBucketKey(1), now.minusMinutes(1), ImmutableSet.of()),
// Reserved list is dually-written non-replicated
CommitLogMutation.create(manifestKey, reservedList),
// Cursors aren't replayed to SQL at all
CommitLogMutation.create(manifestKey, cursor));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
runAndAssertSuccess(now.minusMinutes(1));
// jpaTm()::put should only have been called with the checkpoint
verify(spy, times(2)).put(any(SqlReplayCheckpoint.class));
verify(spy, times(2)).put(any());
}
@Test
void testSuccess_nonReplicatedEntity_isNotDeleted() throws Exception {
DateTime now = fakeClock.nowUtc();
// spy the txn manager so we can verify it's never called
JpaTransactionManager spy = spy(jpaTm());
TransactionManagerFactory.setJpaTm(() -> spy);
jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
// Save a couple deletes that aren't propagated to SQL (the objects deleted are irrelevant)
Key<ClaimsListShard> claimsListKey = Key.create(ClaimsListShard.class, 1L);
saveDiffFile(
gcsService,
createCheckpoint(now.minusMinutes(1)),
CommitLogManifest.create(
getBucketKey(1),
now.minusMinutes(1),
// one object only exists in Datastore, one is dually-written (so isn't replicated)
ImmutableSet.of(getCrossTldKey(), claimsListKey)));
runAndAssertSuccess(now.minusMinutes(1));
verify(spy, times(0)).delete(any(VKey.class));
}
@Test
void testFailure_notEnabled() {
RegistryConfig.overrideCloudSqlReplayCommitLogs(false);
action.run();
assertThat(response.getStatus()).isEqualTo(SC_NO_CONTENT);
assertThat(response.getPayload())
.isEqualTo("ReplayCommitLogsToSqlAction was called but disabled in the config.");
}
@Test
void testFailure_cannotAcquireLock() {
Truth8.assertThat(
Lock.acquire(
ReplayCommitLogsToSqlAction.class.getSimpleName(),
null,
Duration.standardHours(1),
requestStatusChecker,
false))
.isPresent();
fakeClock.advanceOneMilli();
action.run();
assertThat(response.getStatus()).isEqualTo(SC_NO_CONTENT);
assertThat(response.getPayload())
.isEqualTo("Can't acquire SQL commit log replay lock, aborting.");
}
private void runAndAssertSuccess(DateTime expectedCheckpointTime) {
action.run();
assertThat(response.getStatus()).isEqualTo(SC_OK);
assertThat(jpaTm().transact(SqlReplayCheckpoint::get)).isEqualTo(expectedCheckpointTime);
}
private void assertExpectedIds(String... expectedIds) {
ImmutableList<String> actualIds =
jpaTm()
.transact(
() ->
jpaTm().loadAll(TestObject.class).stream()
.map(TestObject::getId)
.collect(toImmutableList()));
assertThat(actualIds).containsExactlyElementsIn(expectedIds);
}
}

View file

@ -61,7 +61,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
/** Unit tests for {@link RestoreCommitLogsAction}. */ /** Unit tests for {@link RestoreCommitLogsAction}. */
public class RestoreCommitLogsActionTest { public class RestoreCommitLogsActionTest {
private static final String GCS_BUCKET = "gcs bucket"; static final String GCS_BUCKET = "gcs bucket";
private final DateTime now = DateTime.now(UTC); private final DateTime now = DateTime.now(UTC);
private final RestoreCommitLogsAction action = new RestoreCommitLogsAction(); private final RestoreCommitLogsAction action = new RestoreCommitLogsAction();
@ -89,9 +89,10 @@ public class RestoreCommitLogsActionTest {
@Test @Test
void testRestore_multipleDiffFiles() throws Exception { void testRestore_multipleDiffFiles() throws Exception {
ofy().saveWithoutBackup().entities( ofy()
TestObject.create("previous to keep"), .saveWithoutBackup()
TestObject.create("previous to delete")).now(); .entities(TestObject.create("previous to keep"), TestObject.create("previous to delete"))
.now();
// Create 3 transactions, across two diff files. // Create 3 transactions, across two diff files.
// Before: {"previous to keep", "previous to delete"} // Before: {"previous to keep", "previous to delete"}
// 1a: Add {"a", "b"}, Delete {"previous to delete"} // 1a: Add {"a", "b"}, Delete {"previous to delete"}
@ -104,29 +105,33 @@ public class RestoreCommitLogsActionTest {
CommitLogManifest.createKey(getBucketKey(2), now.minusMinutes(2)); CommitLogManifest.createKey(getBucketKey(2), now.minusMinutes(2));
Key<CommitLogManifest> manifest2Key = Key<CommitLogManifest> manifest2Key =
CommitLogManifest.createKey(getBucketKey(1), now.minusMinutes(1)); CommitLogManifest.createKey(getBucketKey(1), now.minusMinutes(1));
saveDiffFileNotToRestore(now.minusMinutes(2)); saveDiffFileNotToRestore(gcsService, now.minusMinutes(2));
Iterable<ImmutableObject> file1CommitLogs = saveDiffFile( Iterable<ImmutableObject> file1CommitLogs =
createCheckpoint(now.minusMinutes(1)), saveDiffFile(
CommitLogManifest.create( gcsService,
getBucketKey(1), createCheckpoint(now.minusMinutes(1)),
now.minusMinutes(3), CommitLogManifest.create(
ImmutableSet.of(Key.create(TestObject.create("previous to delete")))), getBucketKey(1),
CommitLogMutation.create(manifest1aKey, TestObject.create("a")), now.minusMinutes(3),
CommitLogMutation.create(manifest1aKey, TestObject.create("b")), ImmutableSet.of(Key.create(TestObject.create("previous to delete")))),
CommitLogManifest.create( CommitLogMutation.create(manifest1aKey, TestObject.create("a")),
getBucketKey(2), CommitLogMutation.create(manifest1aKey, TestObject.create("b")),
now.minusMinutes(2), CommitLogManifest.create(
ImmutableSet.of(Key.create(TestObject.create("a")))), getBucketKey(2),
CommitLogMutation.create(manifest1bKey, TestObject.create("c")), now.minusMinutes(2),
CommitLogMutation.create(manifest1bKey, TestObject.create("d"))); ImmutableSet.of(Key.create(TestObject.create("a")))),
Iterable<ImmutableObject> file2CommitLogs = saveDiffFile( CommitLogMutation.create(manifest1bKey, TestObject.create("c")),
createCheckpoint(now), CommitLogMutation.create(manifest1bKey, TestObject.create("d")));
CommitLogManifest.create( Iterable<ImmutableObject> file2CommitLogs =
getBucketKey(1), saveDiffFile(
now.minusMinutes(1), gcsService,
ImmutableSet.of(Key.create(TestObject.create("c")))), createCheckpoint(now),
CommitLogMutation.create(manifest2Key, TestObject.create("e")), CommitLogManifest.create(
CommitLogMutation.create(manifest2Key, TestObject.create("f"))); getBucketKey(1),
now.minusMinutes(1),
ImmutableSet.of(Key.create(TestObject.create("c")))),
CommitLogMutation.create(manifest2Key, TestObject.create("e")),
CommitLogMutation.create(manifest2Key, TestObject.create("f")));
action.fromTime = now.minusMinutes(1).minusMillis(1); action.fromTime = now.minusMinutes(1).minusMillis(1);
action.run(); action.run();
ofy().clearSessionCache(); ofy().clearSessionCache();
@ -139,10 +144,9 @@ public class RestoreCommitLogsActionTest {
@Test @Test
void testRestore_noManifests() throws Exception { void testRestore_noManifests() throws Exception {
ofy().saveWithoutBackup().entity( ofy().saveWithoutBackup().entity(TestObject.create("previous to keep")).now();
TestObject.create("previous to keep")).now(); saveDiffFileNotToRestore(gcsService, now.minusMinutes(1));
saveDiffFileNotToRestore(now.minusMinutes(1)); Iterable<ImmutableObject> commitLogs = saveDiffFile(gcsService, createCheckpoint(now));
Iterable<ImmutableObject> commitLogs = saveDiffFile(createCheckpoint(now));
action.run(); action.run();
ofy().clearSessionCache(); ofy().clearSessionCache();
assertExpectedIds("previous to keep"); assertExpectedIds("previous to keep");
@ -156,32 +160,37 @@ public class RestoreCommitLogsActionTest {
ofy().saveWithoutBackup().entity(TestObject.create("previous to keep")).now(); ofy().saveWithoutBackup().entity(TestObject.create("previous to keep")).now();
Key<CommitLogBucket> bucketKey = getBucketKey(1); Key<CommitLogBucket> bucketKey = getBucketKey(1);
Key<CommitLogManifest> manifestKey = CommitLogManifest.createKey(bucketKey, now); Key<CommitLogManifest> manifestKey = CommitLogManifest.createKey(bucketKey, now);
saveDiffFileNotToRestore(now.minusMinutes(1)); saveDiffFileNotToRestore(gcsService, now.minusMinutes(1));
Iterable<ImmutableObject> commitLogs = saveDiffFile( Iterable<ImmutableObject> commitLogs =
createCheckpoint(now), saveDiffFile(
CommitLogManifest.create(bucketKey, now, null), gcsService,
CommitLogMutation.create(manifestKey, TestObject.create("a")), createCheckpoint(now),
CommitLogMutation.create(manifestKey, TestObject.create("b"))); CommitLogManifest.create(bucketKey, now, null),
CommitLogMutation.create(manifestKey, TestObject.create("a")),
CommitLogMutation.create(manifestKey, TestObject.create("b")));
action.run(); action.run();
ofy().clearSessionCache(); ofy().clearSessionCache();
assertExpectedIds("previous to keep", "a", "b"); assertExpectedIds("previous to keep", "a", "b");
assertInDatastore(commitLogs); assertInDatastore(commitLogs);
assertInDatastore(CommitLogCheckpointRoot.create(now)); assertInDatastore(CommitLogCheckpointRoot.create(now));
assertCommitLogBuckets(ImmutableMap.of(1, now)); assertCommitLogBuckets(ImmutableMap.of(1, now));
} }
@Test @Test
void testRestore_manifestWithNoMutations() throws Exception { void testRestore_manifestWithNoMutations() throws Exception {
ofy().saveWithoutBackup().entities( ofy()
TestObject.create("previous to keep"), .saveWithoutBackup()
TestObject.create("previous to delete")).now(); .entities(TestObject.create("previous to keep"), TestObject.create("previous to delete"))
saveDiffFileNotToRestore(now.minusMinutes(1)); .now();
Iterable<ImmutableObject> commitLogs = saveDiffFile( saveDiffFileNotToRestore(gcsService, now.minusMinutes(1));
createCheckpoint(now), Iterable<ImmutableObject> commitLogs =
CommitLogManifest.create( saveDiffFile(
getBucketKey(1), gcsService,
now, createCheckpoint(now),
ImmutableSet.of(Key.create(TestObject.create("previous to delete"))))); CommitLogManifest.create(
getBucketKey(1),
now,
ImmutableSet.of(Key.create(TestObject.create("previous to delete")))));
action.run(); action.run();
ofy().clearSessionCache(); ofy().clearSessionCache();
assertExpectedIds("previous to keep"); assertExpectedIds("previous to keep");
@ -193,12 +202,13 @@ public class RestoreCommitLogsActionTest {
// This is a pathological case that shouldn't be possible, but we should be robust to it. // This is a pathological case that shouldn't be possible, but we should be robust to it.
@Test @Test
void testRestore_manifestWithNoMutationsOrDeletions() throws Exception { void testRestore_manifestWithNoMutationsOrDeletions() throws Exception {
ofy().saveWithoutBackup().entities( ofy().saveWithoutBackup().entities(TestObject.create("previous to keep")).now();
TestObject.create("previous to keep")).now(); saveDiffFileNotToRestore(gcsService, now.minusMinutes(1));
saveDiffFileNotToRestore(now.minusMinutes(1)); Iterable<ImmutableObject> commitLogs =
Iterable<ImmutableObject> commitLogs = saveDiffFile( saveDiffFile(
createCheckpoint(now), gcsService,
CommitLogManifest.create(getBucketKey(1), now, null)); createCheckpoint(now),
CommitLogManifest.create(getBucketKey(1), now, null));
action.run(); action.run();
ofy().clearSessionCache(); ofy().clearSessionCache();
assertExpectedIds("previous to keep"); assertExpectedIds("previous to keep");
@ -211,11 +221,13 @@ public class RestoreCommitLogsActionTest {
void testRestore_mutateExistingEntity() throws Exception { void testRestore_mutateExistingEntity() throws Exception {
ofy().saveWithoutBackup().entity(TestObject.create("existing", "a")).now(); ofy().saveWithoutBackup().entity(TestObject.create("existing", "a")).now();
Key<CommitLogManifest> manifestKey = CommitLogManifest.createKey(getBucketKey(1), now); Key<CommitLogManifest> manifestKey = CommitLogManifest.createKey(getBucketKey(1), now);
saveDiffFileNotToRestore(now.minusMinutes(1)); saveDiffFileNotToRestore(gcsService, now.minusMinutes(1));
Iterable<ImmutableObject> commitLogs = saveDiffFile( Iterable<ImmutableObject> commitLogs =
createCheckpoint(now), saveDiffFile(
CommitLogManifest.create(getBucketKey(1), now, null), gcsService,
CommitLogMutation.create(manifestKey, TestObject.create("existing", "b"))); createCheckpoint(now),
CommitLogManifest.create(getBucketKey(1), now, null),
CommitLogMutation.create(manifestKey, TestObject.create("existing", "b")));
action.run(); action.run();
ofy().clearSessionCache(); ofy().clearSessionCache();
assertThat(ofy().load().entity(TestObject.create("existing")).now().getField()).isEqualTo("b"); assertThat(ofy().load().entity(TestObject.create("existing")).now().getField()).isEqualTo("b");
@ -228,13 +240,15 @@ public class RestoreCommitLogsActionTest {
@Test @Test
void testRestore_deleteMissingEntity() throws Exception { void testRestore_deleteMissingEntity() throws Exception {
ofy().saveWithoutBackup().entity(TestObject.create("previous to keep", "a")).now(); ofy().saveWithoutBackup().entity(TestObject.create("previous to keep", "a")).now();
saveDiffFileNotToRestore(now.minusMinutes(1)); saveDiffFileNotToRestore(gcsService, now.minusMinutes(1));
Iterable<ImmutableObject> commitLogs = saveDiffFile( Iterable<ImmutableObject> commitLogs =
createCheckpoint(now), saveDiffFile(
CommitLogManifest.create( gcsService,
getBucketKey(1), createCheckpoint(now),
now, CommitLogManifest.create(
ImmutableSet.of(Key.create(TestObject.create("previous to delete"))))); getBucketKey(1),
now,
ImmutableSet.of(Key.create(TestObject.create("previous to delete")))));
action.run(); action.run();
ofy().clearSessionCache(); ofy().clearSessionCache();
assertExpectedIds("previous to keep"); assertExpectedIds("previous to keep");
@ -243,12 +257,13 @@ public class RestoreCommitLogsActionTest {
assertInDatastore(CommitLogCheckpointRoot.create(now)); assertInDatastore(CommitLogCheckpointRoot.create(now));
} }
private CommitLogCheckpoint createCheckpoint(DateTime now) { static CommitLogCheckpoint createCheckpoint(DateTime now) {
return CommitLogCheckpoint.create(now, toMap(getBucketIds(), x -> now)); return CommitLogCheckpoint.create(now, toMap(getBucketIds(), x -> now));
} }
private Iterable<ImmutableObject> saveDiffFile( static Iterable<ImmutableObject> saveDiffFile(
CommitLogCheckpoint checkpoint, ImmutableObject... entities) throws IOException { GcsService gcsService, CommitLogCheckpoint checkpoint, ImmutableObject... entities)
throws IOException {
DateTime now = checkpoint.getCheckpointTime(); DateTime now = checkpoint.getCheckpointTime();
List<ImmutableObject> allEntities = Lists.asList(checkpoint, entities); List<ImmutableObject> allEntities = Lists.asList(checkpoint, entities);
ByteArrayOutputStream output = new ByteArrayOutputStream(); ByteArrayOutputStream output = new ByteArrayOutputStream();
@ -264,8 +279,9 @@ public class RestoreCommitLogsActionTest {
return allEntities; return allEntities;
} }
private void saveDiffFileNotToRestore(DateTime now) throws Exception { static void saveDiffFileNotToRestore(GcsService gcsService, DateTime now) throws Exception {
saveDiffFile( saveDiffFile(
gcsService,
createCheckpoint(now), createCheckpoint(now),
CommitLogManifest.create(getBucketKey(1), now, null), CommitLogManifest.create(getBucketKey(1), now, null),
CommitLogMutation.create( CommitLogMutation.create(

View file

@ -18,19 +18,20 @@ import static com.google.common.truth.Truth.assertThat;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.googlecode.objectify.Key; import com.googlecode.objectify.Key;
import google.registry.model.ofy.TransactionInfo.Delete;
import google.registry.model.registrar.Registrar; import google.registry.model.registrar.Registrar;
import google.registry.model.reporting.HistoryEntry; import google.registry.model.reporting.HistoryEntry;
import google.registry.testing.AppEngineExtension; import google.registry.testing.AppEngineExtension;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
class TransactionInfoTest { class EntityWritePrioritiesTest {
@RegisterExtension @RegisterExtension
AppEngineExtension appEngine = new AppEngineExtension.Builder().withDatastore().build(); AppEngineExtension appEngine = new AppEngineExtension.Builder().withDatastore().build();
@Test @Test
void testGetWeight() { void testGetPriority() {
// just verify that the lowest is what we expect for both save and delete and verify that the // just verify that the lowest is what we expect for both save and delete and verify that the
// Registrar class is zero. // Registrar class is zero.
ImmutableMap<Key<?>, Object> actions = ImmutableMap<Key<?>, Object> actions =
@ -39,10 +40,12 @@ class TransactionInfoTest {
Key.create(HistoryEntry.class, 200), "fake history entry", Key.create(HistoryEntry.class, 200), "fake history entry",
Key.create(Registrar.class, 300), "fake registrar"); Key.create(Registrar.class, 300), "fake registrar");
ImmutableMap<Long, Integer> expectedValues = ImmutableMap<Long, Integer> expectedValues =
ImmutableMap.of(100L, TransactionInfo.DELETE_RANGE + 1, 200L, -1, 300L, 0); ImmutableMap.of(100L, EntityWritePriorities.DELETE_RANGE + 10, 200L, -10, 300L, 0);
for (ImmutableMap.Entry<Key<?>, Object> entry : actions.entrySet()) { for (ImmutableMap.Entry<Key<?>, Object> entry : actions.entrySet()) {
assertThat(TransactionInfo.getWeight(entry)) assertThat(
EntityWritePriorities.getEntityPriority(
entry.getKey().getKind(), Delete.SENTINEL.equals(entry.getValue())))
.isEqualTo(expectedValues.get(entry.getKey().getId())); .isEqualTo(expectedValues.get(entry.getKey().getId()));
} }
} }

View file

@ -24,16 +24,19 @@ import google.registry.model.ImmutableObject;
import google.registry.model.annotations.VirtualEntity; import google.registry.model.annotations.VirtualEntity;
import google.registry.model.common.EntityGroupRoot; import google.registry.model.common.EntityGroupRoot;
import google.registry.persistence.VKey; import google.registry.persistence.VKey;
import google.registry.schema.replay.DatastoreAndSqlEntity;
import google.registry.schema.replay.EntityTest.EntityForTesting; import google.registry.schema.replay.EntityTest.EntityForTesting;
import javax.persistence.Transient;
/** A test model object that can be persisted in any entity group. */ /** A test model object that can be persisted in any entity group. */
@Entity @Entity
@javax.persistence.Entity
@EntityForTesting @EntityForTesting
public class TestObject extends ImmutableObject { public class TestObject extends ImmutableObject implements DatastoreAndSqlEntity {
@Parent Key<EntityGroupRoot> parent; @Parent @Transient Key<EntityGroupRoot> parent;
@Id String id; @Id @javax.persistence.Id String id;
String field; String field;