diff --git a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java index ccb419f31..91b269e2b 100644 --- a/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java +++ b/core/src/main/java/google/registry/model/replay/ReplicateToDatastoreAction.java @@ -22,6 +22,7 @@ import static google.registry.request.Action.Method.GET; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.flogger.FluentLogger; +import google.registry.model.UpdateAutoTimestamp; import google.registry.model.common.DatabaseMigrationStateSchedule; import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; import google.registry.model.common.DatabaseMigrationStateSchedule.ReplayDirection; @@ -64,7 +65,7 @@ public class ReplicateToDatastoreAction implements Runnable { @VisibleForTesting public List getTransactionBatch() { // Get the next batch of transactions that we haven't replicated. - LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(() -> LastSqlTransaction.load()); + LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(LastSqlTransaction::load); try { return jpaTm() .transactWithoutBackup( @@ -85,53 +86,60 @@ public class ReplicateToDatastoreAction implements Runnable { /** * Apply a transaction to Datastore, returns true if there was a fatal error and the batch should * be aborted. + * + *

TODO(gbrodman): this should throw an exception on error instead since it gives more + * information and we can't rely on the caller checking the boolean result. */ @VisibleForTesting public boolean applyTransaction(TransactionEntity txnEntity) { logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore"); - return ofyTm() - .transact( - () -> { - // Reload the last transaction id, which could possibly have changed. - LastSqlTransaction lastSqlTxn = LastSqlTransaction.load(); - long nextTxnId = lastSqlTxn.getTransactionId() + 1; - if (nextTxnId < txnEntity.getId()) { - // We're missing a transaction. This is bad. Transaction ids are supposed to - // increase monotonically, so we abort rather than applying anything out of - // order. - logger.atSevere().log( - "Missing transaction: last transaction id = %s, next available transaction " - + "= %s", - nextTxnId - 1, txnEntity.getId()); - return true; - } else if (nextTxnId > txnEntity.getId()) { - // We've already replayed this transaction. This shouldn't happen, as GAE cron - // is supposed to avoid overruns and this action shouldn't be executed from any - // other context, but it's not harmful as we can just ignore the transaction. Log - // it so that we know about it and move on. - logger.atWarning().log( - "Ignoring transaction %s, which appears to have already been applied.", - txnEntity.getId()); + try (UpdateAutoTimestamp.DisableAutoUpdateResource disabler = + UpdateAutoTimestamp.disableAutoUpdate()) { + return ofyTm() + .transact( + () -> { + // Reload the last transaction id, which could possibly have changed. + LastSqlTransaction lastSqlTxn = LastSqlTransaction.load(); + long nextTxnId = lastSqlTxn.getTransactionId() + 1; + if (nextTxnId < txnEntity.getId()) { + // We're missing a transaction. This is bad. Transaction ids are supposed to + // increase monotonically, so we abort rather than applying anything out of + // order. + logger.atSevere().log( + "Missing transaction: last transaction id = %s, next available transaction " + + "= %s", + nextTxnId - 1, txnEntity.getId()); + return true; + } else if (nextTxnId > txnEntity.getId()) { + // We've already replayed this transaction. This shouldn't happen, as GAE cron + // is supposed to avoid overruns and this action shouldn't be executed from any + // other context, but it's not harmful as we can just ignore the transaction. Log + // it so that we know about it and move on. + logger.atWarning().log( + "Ignoring transaction %s, which appears to have already been applied.", + txnEntity.getId()); + return false; + } + + logger.atInfo().log( + "Applying transaction %s to Cloud Datastore", txnEntity.getId()); + + // At this point, we know txnEntity is the correct next transaction, so write it + // to datastore. + try { + Transaction.deserialize(txnEntity.getContents()).writeToDatastore(); + } catch (IOException e) { + throw new RuntimeException("Error during transaction deserialization.", e); + } + + // Write the updated last transaction id to datastore as part of this datastore + // transaction. + auditedOfy().save().entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId)); + logger.atInfo().log( + "Finished applying single transaction Cloud SQL -> Cloud Datastore"); return false; - } - - logger.atInfo().log("Applying transaction %s to Cloud Datastore", txnEntity.getId()); - - // At this point, we know txnEntity is the correct next transaction, so write it - // to datastore. - try { - Transaction.deserialize(txnEntity.getContents()).writeToDatastore(); - } catch (IOException e) { - throw new RuntimeException("Error during transaction deserialization.", e); - } - - // Write the updated last transaction id to datastore as part of this datastore - // transaction. - auditedOfy().save().entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId)); - logger.atInfo().log( - "Finished applying single transaction Cloud SQL -> Cloud Datastore"); - return false; - }); + }); + } } @Override diff --git a/core/src/main/java/google/registry/persistence/transaction/Transaction.java b/core/src/main/java/google/registry/persistence/transaction/Transaction.java index efad3050d..c77238c97 100644 --- a/core/src/main/java/google/registry/persistence/transaction/Transaction.java +++ b/core/src/main/java/google/registry/persistence/transaction/Transaction.java @@ -21,10 +21,12 @@ import static google.registry.persistence.transaction.TransactionManagerFactory. import com.google.appengine.api.datastore.Entity; import com.google.appengine.api.datastore.EntityTranslator; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.storage.onestore.v3.OnestoreEntity.EntityProto; import google.registry.model.Buildable; import google.registry.model.ImmutableObject; +import google.registry.model.replay.SqlEntity; import google.registry.persistence.VKey; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -57,6 +59,11 @@ public class Transaction extends ImmutableObject implements Buildable { private transient ImmutableList mutations; + @VisibleForTesting + public ImmutableList getMutations() { + return mutations; + } + /** Write the entire transaction to the datastore in a datastore transaction. */ public void writeToDatastore() { ofyTm() @@ -128,7 +135,7 @@ public class Transaction extends ImmutableObject implements Buildable { * Returns true if we are serializing a transaction in the current thread. * *

This should be checked by any Ofy translators prior to making any changes to an entity's - * state representation based on the assumption that we are currently pseristing the entity to + * state representation based on the assumption that we are currently persisting the entity to * datastore. */ public static boolean inSerializationMode() { @@ -146,7 +153,7 @@ public class Transaction extends ImmutableObject implements Buildable { public static class Builder extends GenericBuilder { - ImmutableList.Builder listBuilder = new ImmutableList.Builder(); + ImmutableList.Builder listBuilder = new ImmutableList.Builder<>(); Builder() {} @@ -224,7 +231,8 @@ public class Transaction extends ImmutableObject implements Buildable { private Object entity; Update(Object entity) { - this.entity = entity; + this.entity = + (entity instanceof SqlEntity) ? ((SqlEntity) entity).toDatastoreEntity().get() : entity; } @Override @@ -241,6 +249,11 @@ public class Transaction extends ImmutableObject implements Buildable { proto.writeDelimitedTo(out); } + @VisibleForTesting + public Object getEntity() { + return entity; + } + public static Update deserializeFrom(ObjectInputStream in) throws IOException { EntityProto proto = new EntityProto(); proto.parseDelimitedFrom(in); @@ -273,9 +286,14 @@ public class Transaction extends ImmutableObject implements Buildable { out.writeObject(key); } + @VisibleForTesting + public VKey getKey() { + return key; + } + public static Delete deserializeFrom(ObjectInputStream in) throws IOException { try { - return new Delete((VKey) in.readObject()); + return new Delete((VKey) in.readObject()); } catch (ClassNotFoundException e) { throw new IllegalArgumentException(e); } diff --git a/core/src/main/java/google/registry/persistence/transaction/TransactionEntity.java b/core/src/main/java/google/registry/persistence/transaction/TransactionEntity.java index 8ed18eadd..97166d532 100644 --- a/core/src/main/java/google/registry/persistence/transaction/TransactionEntity.java +++ b/core/src/main/java/google/registry/persistence/transaction/TransactionEntity.java @@ -14,6 +14,7 @@ package google.registry.persistence.transaction; +import google.registry.model.ImmutableObject; import google.registry.model.replay.SqlOnlyEntity; import javax.persistence.Entity; import javax.persistence.GeneratedValue; @@ -28,7 +29,7 @@ import javax.persistence.Table; */ @Entity @Table(name = "Transaction") -public class TransactionEntity implements SqlOnlyEntity { +public class TransactionEntity extends ImmutableObject implements SqlOnlyEntity { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) diff --git a/core/src/test/java/google/registry/flows/host/HostCheckFlowTest.java b/core/src/test/java/google/registry/flows/host/HostCheckFlowTest.java index b2c5edeb5..451689981 100644 --- a/core/src/test/java/google/registry/flows/host/HostCheckFlowTest.java +++ b/core/src/test/java/google/registry/flows/host/HostCheckFlowTest.java @@ -36,7 +36,7 @@ class HostCheckFlowTest extends ResourceCheckFlowTestCase transactionBatch; + do { + transactionBatch = sqlToDsReplicator.getTransactionBatch(); + for (TransactionEntity txn : transactionBatch) { + if (sqlToDsReplicator.applyTransaction(txn)) { + throw new RuntimeException( + "Error when replaying to Datastore in tests; see logs for more details"); + } + if (compare) { + ofyTm().transact(() -> compareSqlTransaction(txn)); + } + clock.advanceOneMilli(); + } + } while (!transactionBatch.isEmpty()); + } + + /** Verifies that the replaying the SQL transaction created the same entities in Datastore. */ + private void compareSqlTransaction(TransactionEntity transactionEntity) { + Transaction transaction; + try { + transaction = Transaction.deserialize(transactionEntity.getContents()); + } catch (IOException e) { + throw new RuntimeException("Error during transaction deserialization.", e); + } + for (Mutation mutation : transaction.getMutations()) { + if (mutation instanceof Update) { + Update update = (Update) mutation; + ImmutableObject fromTransactionEntity = (ImmutableObject) update.getEntity(); + ImmutableObject fromDatastore = ofyTm().loadByEntity(fromTransactionEntity); + if (fromDatastore instanceof SqlEntity) { + // We store the Datastore entity in the transaction, so use that if necessary + fromDatastore = (ImmutableObject) ((SqlEntity) fromDatastore).toDatastoreEntity().get(); + } + assertAboutImmutableObjects().that(fromDatastore).hasCorrectHashValue(); + assertAboutImmutableObjects() + .that(fromDatastore) + .isEqualAcrossDatabases(fromTransactionEntity); + } else { + Delete delete = (Delete) mutation; + VKey key = delete.getKey(); + assertWithMessage(String.format("Expected key %s to not exist in Datastore", key)) + .that(ofyTm().exists(key)) + .isFalse(); } - clock.advanceOneMilli(); } } }