Write commit logs during SQL->DS replay (#1438)

* Write commit logs during SQL->DS replay

Previously, we had no way to ignore read-only mode while still writing
commit log backups. Now, we added this so we can write commit logs in
the SQL->DS replay.

Note:
- When moving to either of the DATASTORE_PRIMARY stages, one must
manually set the SqlReplayCheckpoint first. We don't write to SQL with
backup in this stage because we already wrote the transaction in
question to Datastore. The fact that we manually set the replay
checkpoint means that we'll ignore the extra commit logs that might
otherwise cause problems if we switched back and forth from
DATASTORE_PRIMARY to SQL_PRIMARY.

- The commit logs written during the SQL_PRIMARY phase will, ideally, be
unused. We write them here only so that in the event of a rollback to
Datastore, we will have them for RDE purposes.
This commit is contained in:
gbrodman 2021-12-01 11:31:03 -05:00 committed by GitHub
parent f054bb2694
commit 207fc49d64
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 106 additions and 67 deletions

View file

@ -45,7 +45,7 @@ public class BackupUtils {
* OutputStream} in delimited protocol buffer format.
*/
static void serializeEntity(ImmutableObject entity, OutputStream stream) throws IOException {
EntityTranslator.convertToPb(auditedOfy().saveIgnoringReadOnly().toEntity(entity))
EntityTranslator.convertToPb(auditedOfy().saveIgnoringReadOnlyWithoutBackup().toEntity(entity))
.writeDelimitedTo(stream);
}

View file

@ -74,7 +74,7 @@ public final class CommitLogCheckpointAction implements Runnable {
return;
}
auditedOfy()
.saveIgnoringReadOnly()
.saveIgnoringReadOnlyWithoutBackup()
.entities(
checkpoint, CommitLogCheckpointRoot.create(checkpoint.getCheckpointTime()));
// Enqueue a diff task between previous and current checkpoints.

View file

@ -264,7 +264,7 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
.ifPresent(
sqlEntity -> {
sqlEntity.beforeSqlSaveOnReplay();
jpaTm().putIgnoringReadOnly(sqlEntity);
jpaTm().putIgnoringReadOnlyWithoutBackup(sqlEntity);
});
} else {
// this should never happen, but we shouldn't fail on it
@ -297,7 +297,7 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
&& !DatastoreOnlyEntity.class.isAssignableFrom(entityClass)
&& entityClass.getAnnotation(javax.persistence.Entity.class) != null) {
ReplaySpecializer.beforeSqlDelete(entityVKey);
jpaTm().deleteIgnoringReadOnly(entityVKey);
jpaTm().deleteIgnoringReadOnlyWithoutBackup(entityVKey);
}
} catch (Throwable t) {
logger.atSevere().log("Error when deleting key %s.", entityVKey);

View file

@ -228,8 +228,8 @@ public abstract class EppResource extends BackupGroupRoot implements Buildable {
/** Used when replaying from SQL to DS to populate the Datastore indexes. */
protected void saveIndexesToDatastore() {
ofyTm().putIgnoringReadOnly(ForeignKeyIndex.create(this, getDeletionTime()));
ofyTm().putIgnoringReadOnly(EppResourceIndex.create(Key.create(this)));
ofyTm().putIgnoringReadOnlyWithBackup(ForeignKeyIndex.create(this, getDeletionTime()));
ofyTm().putIgnoringReadOnlyWithBackup(EppResourceIndex.create(Key.create(this)));
}
/** EppResources that are loaded via foreign keys should implement this marker interface. */

View file

@ -204,7 +204,7 @@ public class DatabaseMigrationStateSchedule extends CrossTldSingleton implements
MigrationState.DATASTORE_ONLY,
"migrationTransitionMap must start with DATASTORE_ONLY");
validateTransitionAtCurrentTime(transitions);
jpaTm().putIgnoringReadOnly(new DatabaseMigrationStateSchedule(transitions));
jpaTm().putIgnoringReadOnlyWithoutBackup(new DatabaseMigrationStateSchedule(transitions));
CACHE.invalidateAll();
}

View file

@ -356,13 +356,25 @@ public class DatastoreTransactionManager implements TransactionManager {
}
@Override
public void putIgnoringReadOnly(Object entity) {
syncIfTransactionless(getOfy().saveIgnoringReadOnly().entities(toDatastoreEntity(entity)));
public void putIgnoringReadOnlyWithoutBackup(Object entity) {
syncIfTransactionless(
getOfy().saveIgnoringReadOnlyWithoutBackup().entities(toDatastoreEntity(entity)));
}
@Override
public void deleteIgnoringReadOnly(VKey<?> key) {
syncIfTransactionless(getOfy().deleteIgnoringReadOnly().key(key.getOfyKey()));
public void deleteIgnoringReadOnlyWithoutBackup(VKey<?> key) {
syncIfTransactionless(getOfy().deleteIgnoringReadOnlyWithoutBackup().key(key.getOfyKey()));
}
/** Performs the write ignoring read-only restrictions and also writes commit logs. */
public void putIgnoringReadOnlyWithBackup(Object entity) {
syncIfTransactionless(
getOfy().saveIgnoringReadOnlyWithBackup().entities(toDatastoreEntity(entity)));
}
/** Performs the delete ignoring read-only restrictions and also writes commit logs. */
public void deleteIgnoringReadOnlyWithBackup(VKey<?> key) {
syncIfTransactionless(getOfy().deleteIgnoringReadOnlyWithBackup().key(key.getOfyKey()));
}
/**

View file

@ -133,15 +133,7 @@ public class Ofy {
*/
public Deleter delete() {
assertNotReadOnlyMode();
return new AugmentedDeleter() {
@Override
protected void handleDeletion(Iterable<Key<?>> keys) {
assertInTransaction();
checkState(Streams.stream(keys).allMatch(Objects::nonNull), "Can't delete a null key.");
checkProhibitedAnnotations(keys, NotBackedUp.class, VirtualEntity.class);
TRANSACTION_INFO.get().putDeletes(keys);
}
};
return deleteIgnoringReadOnlyWithBackup();
}
/**
@ -151,7 +143,7 @@ public class Ofy {
*/
public Deleter deleteWithoutBackup() {
assertNotReadOnlyMode();
return deleteIgnoringReadOnly();
return deleteIgnoringReadOnlyWithoutBackup();
}
/**
@ -162,6 +154,41 @@ public class Ofy {
*/
public Saver save() {
assertNotReadOnlyMode();
return saveIgnoringReadOnlyWithBackup();
}
/**
* Save, without any augmentations except to check that we're not saving any virtual entities.
*
* <p>No backups get written.
*/
public Saver saveWithoutBackup() {
assertNotReadOnlyMode();
return saveIgnoringReadOnlyWithoutBackup();
}
/** Save, ignoring any backups or any read-only settings. */
public Saver saveIgnoringReadOnlyWithoutBackup() {
return new AugmentedSaver() {
@Override
protected void handleSave(Iterable<?> entities) {
checkProhibitedAnnotations(entities, VirtualEntity.class);
}
};
}
/** Delete, ignoring any backups or any read-only settings. */
public Deleter deleteIgnoringReadOnlyWithoutBackup() {
return new AugmentedDeleter() {
@Override
protected void handleDeletion(Iterable<Key<?>> keys) {
checkProhibitedAnnotations(keys, VirtualEntity.class);
}
};
}
/** Save, ignoring any read-only settings (but still write commit logs). */
public Saver saveIgnoringReadOnlyWithBackup() {
return new AugmentedSaver() {
@Override
protected void handleSave(Iterable<?> entities) {
@ -175,32 +202,15 @@ public class Ofy {
};
}
/**
* Save, without any augmentations except to check that we're not saving any virtual entities.
*
* <p>No backups get written.
*/
public Saver saveWithoutBackup() {
assertNotReadOnlyMode();
return saveIgnoringReadOnly();
}
/** Save, ignoring any backups or any read-only settings. */
public Saver saveIgnoringReadOnly() {
return new AugmentedSaver() {
@Override
protected void handleSave(Iterable<?> entities) {
checkProhibitedAnnotations(entities, VirtualEntity.class);
}
};
}
/** Delete, ignoring any backups or any read-only settings. */
public Deleter deleteIgnoringReadOnly() {
/** Delete, ignoring any read-only settings (but still write commit logs). */
public Deleter deleteIgnoringReadOnlyWithBackup() {
return new AugmentedDeleter() {
@Override
protected void handleDeletion(Iterable<Key<?>> keys) {
checkProhibitedAnnotations(keys, VirtualEntity.class);
assertInTransaction();
checkState(Streams.stream(keys).allMatch(Objects::nonNull), "Can't delete a null key.");
checkProhibitedAnnotations(keys, NotBackedUp.class, VirtualEntity.class);
TRANSACTION_INFO.get().putDeletes(keys);
}
};
}

View file

@ -149,7 +149,7 @@ public class ReplicateToDatastoreAction implements Runnable {
// Write the updated last transaction id to Datastore as part of this Datastore
// transaction.
auditedOfy()
.saveIgnoringReadOnly()
.saveIgnoringReadOnlyWithoutBackup()
.entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId));
logger.atInfo().log(
"Finished applying single transaction Cloud SQL -> Cloud Datastore.");

View file

@ -41,6 +41,6 @@ public class SqlReplayCheckpoint extends CrossTldSingleton implements SqlOnlyEnt
SqlReplayCheckpoint checkpoint = new SqlReplayCheckpoint();
checkpoint.lastReplayTime = lastReplayTime;
// this will overwrite the existing object due to the constant revisionId
jpaTm().putIgnoringReadOnly(checkpoint);
jpaTm().putIgnoringReadOnlyWithoutBackup(checkpoint);
}
}

View file

@ -293,7 +293,7 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
create(resourceName, scope, requestStatusChecker.getLogId(), now, leaseLength);
// Locks are not parented under an EntityGroupRoot (so as to avoid write
// contention) and don't need to be backed up.
transactionManager.putIgnoringReadOnly(newLock);
transactionManager.putIgnoringReadOnlyWithoutBackup(newLock);
return AcquireResult.create(now, lock, newLock, lockState);
};
@ -325,7 +325,7 @@ public class Lock extends ImmutableObject implements DatastoreAndSqlEntity, Seri
// Use deleteIgnoringReadOnly() so that we don't create a commit log entry for deleting
// the lock.
logger.atInfo().log("Deleting lock: %s", lockId);
transactionManager.deleteIgnoringReadOnly(key);
transactionManager.deleteIgnoringReadOnlyWithoutBackup(key);
lockMetrics.recordRelease(
resourceName,

View file

@ -637,7 +637,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
}
@Override
public void putIgnoringReadOnly(Object entity) {
public void putIgnoringReadOnlyWithoutBackup(Object entity) {
checkArgumentNotNull(entity);
if (isEntityOfIgnoredClass(entity)) {
return;
@ -652,7 +652,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
}
@Override
public void deleteIgnoringReadOnly(VKey<?> key) {
public void deleteIgnoringReadOnlyWithoutBackup(VKey<?> key) {
checkArgumentNotNull(key, "key must be specified");
assertInTransaction();
if (IGNORED_ENTITY_CLASSES.contains(key.getKind())) {

View file

@ -242,7 +242,7 @@ public class Transaction extends ImmutableObject implements Buildable {
if (entity instanceof DatastoreEntity) {
((DatastoreEntity) entity).beforeDatastoreSaveOnReplay();
}
ofyTm().putIgnoringReadOnly(entity);
ofyTm().putIgnoringReadOnlyWithBackup(entity);
}
@Override
@ -280,7 +280,7 @@ public class Transaction extends ImmutableObject implements Buildable {
@Override
public void writeToDatastore() {
ofyTm().deleteIgnoringReadOnly(key);
ofyTm().deleteIgnoringReadOnlyWithBackup(key);
}
@Override

View file

@ -312,9 +312,9 @@ public interface TransactionManager {
/** Returns true if the transaction manager is DatastoreTransactionManager, false otherwise. */
boolean isOfy();
/** Performs the given write ignoring any read-only restrictions, for use only in replay. */
void putIgnoringReadOnly(Object entity);
/** Performs the write ignoring any read-only restrictions or backup, for use only in replay. */
void putIgnoringReadOnlyWithoutBackup(Object entity);
/** Performs the given delete ignoring any read-only restrictions, for use only in replay. */
void deleteIgnoringReadOnly(VKey<?> key);
/** Performs the delete ignoring any read-only restrictions or backup, for use only in replay. */
void deleteIgnoringReadOnlyWithoutBackup(VKey<?> key);
}

View file

@ -382,10 +382,10 @@ public class ReplayCommitLogsToSqlActionTest {
// even though the domain came first in the file
// 2. that the allocation token delete occurred after the insertions
InOrder inOrder = Mockito.inOrder(spy);
inOrder.verify(spy).putIgnoringReadOnly(any(ContactResource.class));
inOrder.verify(spy).putIgnoringReadOnly(any(DomainBase.class));
inOrder.verify(spy).deleteIgnoringReadOnly(toDelete.createVKey());
inOrder.verify(spy).putIgnoringReadOnly(any(SqlReplayCheckpoint.class));
inOrder.verify(spy).putIgnoringReadOnlyWithoutBackup(any(ContactResource.class));
inOrder.verify(spy).putIgnoringReadOnlyWithoutBackup(any(DomainBase.class));
inOrder.verify(spy).deleteIgnoringReadOnlyWithoutBackup(toDelete.createVKey());
inOrder.verify(spy).putIgnoringReadOnlyWithoutBackup(any(SqlReplayCheckpoint.class));
}
@Test
@ -424,8 +424,8 @@ public class ReplayCommitLogsToSqlActionTest {
// deletes have higher weight
ArgumentCaptor<Object> putCaptor = ArgumentCaptor.forClass(Object.class);
InOrder inOrder = Mockito.inOrder(spy);
inOrder.verify(spy).deleteIgnoringReadOnly(contact.createVKey());
inOrder.verify(spy).putIgnoringReadOnly(putCaptor.capture());
inOrder.verify(spy).deleteIgnoringReadOnlyWithoutBackup(contact.createVKey());
inOrder.verify(spy).putIgnoringReadOnlyWithoutBackup(putCaptor.capture());
assertThat(putCaptor.getValue().getClass()).isEqualTo(ContactResource.class);
assertThat(jpaTm().transact(() -> jpaTm().loadByKey(contact.createVKey()).getEmailAddress()))
.isEqualTo("replay@example.tld");
@ -467,9 +467,9 @@ public class ReplayCommitLogsToSqlActionTest {
});
runAndAssertSuccess(now.minusMinutes(1), 1, 1);
// jpaTm()::putIgnoringReadOnly should only have been called with the checkpoint and the lock
verify(spy, times(2)).putIgnoringReadOnly(any(SqlReplayCheckpoint.class));
verify(spy).putIgnoringReadOnly(any(Lock.class));
verify(spy, times(3)).putIgnoringReadOnly(any());
verify(spy, times(2)).putIgnoringReadOnlyWithoutBackup(any(SqlReplayCheckpoint.class));
verify(spy).putIgnoringReadOnlyWithoutBackup(any(Lock.class));
verify(spy, times(3)).putIgnoringReadOnlyWithoutBackup(any());
}
@Test

View file

@ -14,7 +14,9 @@
package google.registry.persistence.transaction;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.model.ofy.ObjectifyService.auditedOfy;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -23,6 +25,8 @@ import com.googlecode.objectify.Key;
import com.googlecode.objectify.annotation.Entity;
import com.googlecode.objectify.annotation.Id;
import google.registry.model.ImmutableObject;
import google.registry.model.ofy.CommitLogManifest;
import google.registry.model.ofy.CommitLogMutation;
import google.registry.model.ofy.Ofy;
import google.registry.persistence.VKey;
import google.registry.testing.AppEngineExtension;
@ -33,6 +37,8 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.util.Comparator;
import java.util.List;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -41,7 +47,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
class TransactionTest {
private final FakeClock fakeClock = new FakeClock(DateTime.parse("2000-01-01TZ"));
private final FakeClock fakeClock =
new FakeClock(DateTime.parse("2000-01-01TZ")).setAutoIncrementByOneMilli();
@RegisterExtension
final AppEngineExtension appEngine =
@ -83,6 +90,16 @@ class TransactionTest {
txn = new Transaction.Builder().addDelete(barEntity.key()).build();
txn.writeToDatastore();
assertThat(ofyTm().exists(barEntity.key())).isEqualTo(false);
assertThat(
auditedOfy().load().type(CommitLogMutation.class).list().stream()
.map(clm -> auditedOfy().load().<TestEntity>fromEntity(clm.getEntity()))
.collect(toImmutableSet()))
.containsExactly(fooEntity, barEntity);
List<CommitLogManifest> manifests = auditedOfy().load().type(CommitLogManifest.class).list();
manifests.sort(Comparator.comparing(CommitLogManifest::getCommitTime));
assertThat(manifests.get(0).getDeletions()).isEmpty();
assertThat(manifests.get(1).getDeletions()).containsExactly(Key.create(barEntity));
}
@Test

View file

@ -1488,7 +1488,7 @@ public class DatabaseHelper {
.transact(
() ->
jpaTm()
.putIgnoringReadOnly(
.putIgnoringReadOnlyWithoutBackup(
new DatabaseMigrationStateSchedule(
DatabaseMigrationStateSchedule.DEFAULT_TRANSITION_MAP)));
DatabaseMigrationStateSchedule.CACHE.invalidateAll();