Compare SQL and Datastore objects in SQL->DS replay testing (#1291)

Add double-replay to the Host*Flow tests to show how this works. The
only change to the double replay itself is that now we store the
Datastore entity in the TransactionEntity object -- this is because we
use Objectify to serialize the objects into bytes and we need it to know
about the entity in question.
This commit is contained in:
gbrodman 2021-08-23 09:05:14 -06:00 committed by GitHub
parent 603a95d719
commit 5c33286056
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 131 additions and 57 deletions

View file

@ -22,6 +22,7 @@ import static google.registry.request.Action.Method.GET;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.FluentLogger;
import google.registry.model.UpdateAutoTimestamp;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection;
@ -64,7 +65,7 @@ public class ReplicateToDatastoreAction implements Runnable {
@VisibleForTesting
public List<TransactionEntity> getTransactionBatch() {
// Get the next batch of transactions that we haven't replicated.
LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(() -> LastSqlTransaction.load());
LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(LastSqlTransaction::load);
try {
return jpaTm()
.transactWithoutBackup(
@ -85,53 +86,60 @@ public class ReplicateToDatastoreAction implements Runnable {
/**
* Apply a transaction to Datastore, returns true if there was a fatal error and the batch should
* be aborted.
*
* <p>TODO(gbrodman): this should throw an exception on error instead since it gives more
* information and we can't rely on the caller checking the boolean result.
*/
@VisibleForTesting
public boolean applyTransaction(TransactionEntity txnEntity) {
logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore");
return ofyTm()
.transact(
() -> {
// Reload the last transaction id, which could possibly have changed.
LastSqlTransaction lastSqlTxn = LastSqlTransaction.load();
long nextTxnId = lastSqlTxn.getTransactionId() + 1;
if (nextTxnId < txnEntity.getId()) {
// We're missing a transaction. This is bad. Transaction ids are supposed to
// increase monotonically, so we abort rather than applying anything out of
// order.
logger.atSevere().log(
"Missing transaction: last transaction id = %s, next available transaction "
+ "= %s",
nextTxnId - 1, txnEntity.getId());
return true;
} else if (nextTxnId > txnEntity.getId()) {
// We've already replayed this transaction. This shouldn't happen, as GAE cron
// is supposed to avoid overruns and this action shouldn't be executed from any
// other context, but it's not harmful as we can just ignore the transaction. Log
// it so that we know about it and move on.
logger.atWarning().log(
"Ignoring transaction %s, which appears to have already been applied.",
txnEntity.getId());
try (UpdateAutoTimestamp.DisableAutoUpdateResource disabler =
UpdateAutoTimestamp.disableAutoUpdate()) {
return ofyTm()
.transact(
() -> {
// Reload the last transaction id, which could possibly have changed.
LastSqlTransaction lastSqlTxn = LastSqlTransaction.load();
long nextTxnId = lastSqlTxn.getTransactionId() + 1;
if (nextTxnId < txnEntity.getId()) {
// We're missing a transaction. This is bad. Transaction ids are supposed to
// increase monotonically, so we abort rather than applying anything out of
// order.
logger.atSevere().log(
"Missing transaction: last transaction id = %s, next available transaction "
+ "= %s",
nextTxnId - 1, txnEntity.getId());
return true;
} else if (nextTxnId > txnEntity.getId()) {
// We've already replayed this transaction. This shouldn't happen, as GAE cron
// is supposed to avoid overruns and this action shouldn't be executed from any
// other context, but it's not harmful as we can just ignore the transaction. Log
// it so that we know about it and move on.
logger.atWarning().log(
"Ignoring transaction %s, which appears to have already been applied.",
txnEntity.getId());
return false;
}
logger.atInfo().log(
"Applying transaction %s to Cloud Datastore", txnEntity.getId());
// At this point, we know txnEntity is the correct next transaction, so write it
// to datastore.
try {
Transaction.deserialize(txnEntity.getContents()).writeToDatastore();
} catch (IOException e) {
throw new RuntimeException("Error during transaction deserialization.", e);
}
// Write the updated last transaction id to datastore as part of this datastore
// transaction.
auditedOfy().save().entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId));
logger.atInfo().log(
"Finished applying single transaction Cloud SQL -> Cloud Datastore");
return false;
}
logger.atInfo().log("Applying transaction %s to Cloud Datastore", txnEntity.getId());
// At this point, we know txnEntity is the correct next transaction, so write it
// to datastore.
try {
Transaction.deserialize(txnEntity.getContents()).writeToDatastore();
} catch (IOException e) {
throw new RuntimeException("Error during transaction deserialization.", e);
}
// Write the updated last transaction id to datastore as part of this datastore
// transaction.
auditedOfy().save().entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId));
logger.atInfo().log(
"Finished applying single transaction Cloud SQL -> Cloud Datastore");
return false;
});
});
}
}
@Override

View file

@ -21,10 +21,12 @@ import static google.registry.persistence.transaction.TransactionManagerFactory.
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityTranslator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
import google.registry.model.Buildable;
import google.registry.model.ImmutableObject;
import google.registry.model.replay.SqlEntity;
import google.registry.persistence.VKey;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -57,6 +59,11 @@ public class Transaction extends ImmutableObject implements Buildable {
private transient ImmutableList<Mutation> mutations;
@VisibleForTesting
public ImmutableList<Mutation> getMutations() {
return mutations;
}
/** Write the entire transaction to the datastore in a datastore transaction. */
public void writeToDatastore() {
ofyTm()
@ -128,7 +135,7 @@ public class Transaction extends ImmutableObject implements Buildable {
* Returns true if we are serializing a transaction in the current thread.
*
* <p>This should be checked by any Ofy translators prior to making any changes to an entity's
* state representation based on the assumption that we are currently pseristing the entity to
* state representation based on the assumption that we are currently persisting the entity to
* datastore.
*/
public static boolean inSerializationMode() {
@ -146,7 +153,7 @@ public class Transaction extends ImmutableObject implements Buildable {
public static class Builder extends GenericBuilder<Transaction, Builder> {
ImmutableList.Builder listBuilder = new ImmutableList.Builder();
ImmutableList.Builder<Mutation> listBuilder = new ImmutableList.Builder<>();
Builder() {}
@ -224,7 +231,8 @@ public class Transaction extends ImmutableObject implements Buildable {
private Object entity;
Update(Object entity) {
this.entity = entity;
this.entity =
(entity instanceof SqlEntity) ? ((SqlEntity) entity).toDatastoreEntity().get() : entity;
}
@Override
@ -241,6 +249,11 @@ public class Transaction extends ImmutableObject implements Buildable {
proto.writeDelimitedTo(out);
}
@VisibleForTesting
public Object getEntity() {
return entity;
}
public static Update deserializeFrom(ObjectInputStream in) throws IOException {
EntityProto proto = new EntityProto();
proto.parseDelimitedFrom(in);
@ -273,9 +286,14 @@ public class Transaction extends ImmutableObject implements Buildable {
out.writeObject(key);
}
@VisibleForTesting
public VKey<?> getKey() {
return key;
}
public static Delete deserializeFrom(ObjectInputStream in) throws IOException {
try {
return new Delete((VKey) in.readObject());
return new Delete((VKey<?>) in.readObject());
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(e);
}

View file

@ -14,6 +14,7 @@
package google.registry.persistence.transaction;
import google.registry.model.ImmutableObject;
import google.registry.model.replay.SqlOnlyEntity;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
@ -28,7 +29,7 @@ import javax.persistence.Table;
*/
@Entity
@Table(name = "Transaction")
public class TransactionEntity implements SqlOnlyEntity {
public class TransactionEntity extends ImmutableObject implements SqlOnlyEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)

View file

@ -36,7 +36,7 @@ class HostCheckFlowTest extends ResourceCheckFlowTestCase<HostCheckFlow, HostRes
@Order(value = Order.DEFAULT - 2)
@RegisterExtension
final ReplayExtension replayExtension = ReplayExtension.createWithCompare(clock);
final ReplayExtension replayExtension = ReplayExtension.createWithDoubleReplay(clock);
HostCheckFlowTest() {
setEppInput("host_check.xml");

View file

@ -67,7 +67,7 @@ class HostCreateFlowTest extends ResourceFlowTestCase<HostCreateFlow, HostResour
@Order(value = Order.DEFAULT - 2)
@RegisterExtension
final ReplayExtension replayExtension = ReplayExtension.createWithCompare(clock);
final ReplayExtension replayExtension = ReplayExtension.createWithDoubleReplay(clock);
private void setEppHostCreateInput(String hostName, String hostAddrs) {
setEppInput(

View file

@ -65,7 +65,7 @@ class HostDeleteFlowTest extends ResourceFlowTestCase<HostDeleteFlow, HostResour
@Order(value = Order.DEFAULT - 2)
@RegisterExtension
final ReplayExtension replayExtension = ReplayExtension.createWithCompare(clock);
final ReplayExtension replayExtension = ReplayExtension.createWithDoubleReplay(clock);
@BeforeEach
void initFlowTest() {

View file

@ -93,7 +93,7 @@ class HostUpdateFlowTest extends ResourceFlowTestCase<HostUpdateFlow, HostResour
@Order(value = Order.DEFAULT - 2)
@RegisterExtension
final ReplayExtension replayExtension = ReplayExtension.createWithCompare(clock);
final ReplayExtension replayExtension = ReplayExtension.createWithDoubleReplay(clock);
private void setEppHostUpdateInput(
String oldHostName, String newHostName, String ipOrStatusToAdd, String ipOrStatusToRem) {

View file

@ -15,8 +15,10 @@
package google.registry.testing;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
@ -28,9 +30,16 @@ import google.registry.model.ofy.ReplayQueue;
import google.registry.model.ofy.TransactionInfo;
import google.registry.model.replay.DatastoreEntity;
import google.registry.model.replay.ReplicateToDatastoreAction;
import google.registry.model.replay.SqlEntity;
import google.registry.persistence.VKey;
import google.registry.persistence.transaction.JpaTransactionManagerImpl;
import google.registry.persistence.transaction.Transaction;
import google.registry.persistence.transaction.Transaction.Delete;
import google.registry.persistence.transaction.Transaction.Mutation;
import google.registry.persistence.transaction.Transaction.Update;
import google.registry.persistence.transaction.TransactionEntity;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.junit.jupiter.api.extension.AfterEachCallback;
@ -186,12 +195,50 @@ public class ReplayExtension implements BeforeEachCallback, AfterEachCallback {
return;
}
// TODO(mmuller): Verify that all entities are the same across both databases.
for (TransactionEntity txn : sqlToDsReplicator.getTransactionBatch()) {
if (sqlToDsReplicator.applyTransaction(txn)) {
break;
List<TransactionEntity> transactionBatch;
do {
transactionBatch = sqlToDsReplicator.getTransactionBatch();
for (TransactionEntity txn : transactionBatch) {
if (sqlToDsReplicator.applyTransaction(txn)) {
throw new RuntimeException(
"Error when replaying to Datastore in tests; see logs for more details");
}
if (compare) {
ofyTm().transact(() -> compareSqlTransaction(txn));
}
clock.advanceOneMilli();
}
} while (!transactionBatch.isEmpty());
}
/** Verifies that the replaying the SQL transaction created the same entities in Datastore. */
private void compareSqlTransaction(TransactionEntity transactionEntity) {
Transaction transaction;
try {
transaction = Transaction.deserialize(transactionEntity.getContents());
} catch (IOException e) {
throw new RuntimeException("Error during transaction deserialization.", e);
}
for (Mutation mutation : transaction.getMutations()) {
if (mutation instanceof Update) {
Update update = (Update) mutation;
ImmutableObject fromTransactionEntity = (ImmutableObject) update.getEntity();
ImmutableObject fromDatastore = ofyTm().loadByEntity(fromTransactionEntity);
if (fromDatastore instanceof SqlEntity) {
// We store the Datastore entity in the transaction, so use that if necessary
fromDatastore = (ImmutableObject) ((SqlEntity) fromDatastore).toDatastoreEntity().get();
}
assertAboutImmutableObjects().that(fromDatastore).hasCorrectHashValue();
assertAboutImmutableObjects()
.that(fromDatastore)
.isEqualAcrossDatabases(fromTransactionEntity);
} else {
Delete delete = (Delete) mutation;
VKey<?> key = delete.getKey();
assertWithMessage(String.format("Expected key %s to not exist in Datastore", key))
.that(ofyTm().exists(key))
.isFalse();
}
clock.advanceOneMilli();
}
}
}