diff --git a/core/src/main/java/google/registry/model/EntityClasses.java b/core/src/main/java/google/registry/model/EntityClasses.java index e495adc16..009c96ffb 100644 --- a/core/src/main/java/google/registry/model/EntityClasses.java +++ b/core/src/main/java/google/registry/model/EntityClasses.java @@ -52,6 +52,7 @@ import google.registry.model.tmch.ClaimsListShard; import google.registry.model.tmch.ClaimsListShard.ClaimsListRevision; import google.registry.model.tmch.ClaimsListShard.ClaimsListSingleton; import google.registry.model.tmch.TmchCrl; +import google.registry.schema.replay.LastSqlTransaction; /** Sets of classes of the Objectify-registered entities in use throughout the model. */ public final class EntityClasses { @@ -90,6 +91,7 @@ public final class EntityClasses { HostResource.class, KmsSecret.class, KmsSecretRevision.class, + LastSqlTransaction.class, Lock.class, PollMessage.class, PollMessage.Autorenew.class, diff --git a/core/src/main/java/google/registry/model/ofy/Ofy.java b/core/src/main/java/google/registry/model/ofy/Ofy.java index 0b86bf957..d7257f5f2 100644 --- a/core/src/main/java/google/registry/model/ofy/Ofy.java +++ b/core/src/main/java/google/registry/model/ofy/Ofy.java @@ -116,7 +116,7 @@ public class Ofy { return ofy().getTransaction() != null; } - void assertInTransaction() { + public void assertInTransaction() { checkState(inTransaction(), "Must be called in a transaction"); } diff --git a/core/src/main/java/google/registry/persistence/transaction/Transaction.java b/core/src/main/java/google/registry/persistence/transaction/Transaction.java index 01a6f32e5..7bc223464 100644 --- a/core/src/main/java/google/registry/persistence/transaction/Transaction.java +++ b/core/src/main/java/google/registry/persistence/transaction/Transaction.java @@ -86,7 +86,7 @@ public class Transaction extends ImmutableObject implements Buildable { } } - static Transaction deserialize(byte[] serializedTransaction) throws IOException { + public static Transaction deserialize(byte[] serializedTransaction) throws IOException { ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedTransaction)); // Verify that the data is what we expect. diff --git a/core/src/main/java/google/registry/persistence/transaction/TransactionEntity.java b/core/src/main/java/google/registry/persistence/transaction/TransactionEntity.java index 483af8343..a36756fbd 100644 --- a/core/src/main/java/google/registry/persistence/transaction/TransactionEntity.java +++ b/core/src/main/java/google/registry/persistence/transaction/TransactionEntity.java @@ -34,9 +34,9 @@ public class TransactionEntity implements SqlEntity { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) - long id; + private long id; - byte[] contents; + private byte[] contents; TransactionEntity() {} @@ -48,4 +48,12 @@ public class TransactionEntity implements SqlEntity { public Optional toDatastoreEntity() { return Optional.empty(); // Not persisted in Datastore per se } + + public long getId() { + return id; + } + + public byte[] getContents() { + return contents; + } } diff --git a/core/src/main/java/google/registry/schema/replay/LastSqlTransaction.java b/core/src/main/java/google/registry/schema/replay/LastSqlTransaction.java new file mode 100644 index 000000000..ce124a076 --- /dev/null +++ b/core/src/main/java/google/registry/schema/replay/LastSqlTransaction.java @@ -0,0 +1,71 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.schema.replay; + +import static com.google.common.base.Preconditions.checkArgument; +import static google.registry.model.ofy.ObjectifyService.ofy; + +import com.google.common.annotations.VisibleForTesting; +import com.googlecode.objectify.Key; +import com.googlecode.objectify.annotation.Entity; +import com.googlecode.objectify.annotation.Id; +import google.registry.model.ImmutableObject; + +/** Datastore entity to keep track of the last SQL transaction imported into the datastore. */ +@Entity +public class LastSqlTransaction extends ImmutableObject implements DatastoreOnlyEntity { + + /** The key for this singleton. */ + public static final Key KEY = Key.create(LastSqlTransaction.class, 1); + + @SuppressWarnings("unused") + @Id + private long id = 1; + + private long transactionId; + + LastSqlTransaction() {} + + @VisibleForTesting + LastSqlTransaction(long newTransactionId) { + transactionId = newTransactionId; + } + + LastSqlTransaction cloneWithNewTransactionId(long transactionId) { + checkArgument( + transactionId > this.transactionId, + "New transaction id (%s) must be greater than the current transaction id (%s)", + transactionId, + this.transactionId); + return new LastSqlTransaction(transactionId); + } + + long getTransactionId() { + return transactionId; + } + + /** + * Loads the instance. + * + *

Must be called within an Ofy transaction. + * + *

Creates a new instance of the singleton if it is not already present in Cloud Datastore, + */ + static LastSqlTransaction load() { + ofy().assertInTransaction(); + LastSqlTransaction result = ofy().load().key(KEY).now(); + return result == null ? new LastSqlTransaction() : result; + } +} diff --git a/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java b/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java new file mode 100644 index 000000000..604c03925 --- /dev/null +++ b/core/src/main/java/google/registry/schema/replay/ReplicateToDatastoreAction.java @@ -0,0 +1,136 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.schema.replay; + +import static google.registry.model.ofy.ObjectifyService.ofy; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; +import static google.registry.request.Action.Method.GET; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.flogger.FluentLogger; +import google.registry.persistence.transaction.Transaction; +import google.registry.persistence.transaction.TransactionEntity; +import google.registry.request.Action; +import google.registry.request.auth.Auth; +import java.io.IOException; +import java.util.List; +import javax.persistence.NoResultException; + +/** Cron task to replicate from Cloud SQL to datastore. */ +@Action( + service = Action.Service.BACKEND, + path = ReplicateToDatastoreAction.PATH, + method = GET, + automaticallyPrintOk = true, + auth = Auth.AUTH_INTERNAL_OR_ADMIN) +class ReplicateToDatastoreAction implements Runnable { + public static final String PATH = "/_dr/cron/replicateToDatastore"; + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + /** + * Number of transactions to fetch from SQL. The rationale for 200 is that we're processing these + * every minute and our production instance currently does about 2 mutations per second, so this + * should generally be enough to scoop up all of the transactions for the past minute. + */ + public static final int BATCH_SIZE = 200; + + @VisibleForTesting + List getTransactionBatch() { + // Get the next batch of transactions that we haven't replicated. + LastSqlTransaction lastSqlTxnBeforeBatch = ofyTm().transact(() -> LastSqlTransaction.load()); + try { + return jpaTm() + .transact( + () -> + jpaTm() + .getEntityManager() + .createQuery( + "SELECT txn FROM TransactionEntity txn WHERE id >" + + " :lastId ORDER BY id") + .setParameter("lastId", lastSqlTxnBeforeBatch.getTransactionId()) + .setMaxResults(BATCH_SIZE) + .getResultList()); + } catch (NoResultException e) { + return ImmutableList.of(); + } + } + + /** + * Apply a transaction to datastore, returns true if there was a fatal error and the batch should + * be aborted. + */ + @VisibleForTesting + boolean applyTransaction(TransactionEntity txnEntity) { + logger.atInfo().log("Applying a single transaction Cloud SQL -> Cloud Datastore"); + return ofyTm() + .transact( + () -> { + // Reload the last transaction id, which could possibly have changed. + LastSqlTransaction lastSqlTxn = LastSqlTransaction.load(); + long nextTxnId = lastSqlTxn.getTransactionId() + 1; + if (nextTxnId < txnEntity.getId()) { + // We're missing a transaction. This is bad. Transaction ids are supposed to + // increase monotonically, so we abort rather than applying anything out of + // order. + logger.atSevere().log( + "Missing transaction: last transaction id = %s, next available transaction " + + "= %s", + nextTxnId - 1, txnEntity.getId()); + return true; + } else if (nextTxnId > txnEntity.getId()) { + // We've already replayed this transaction. This shouldn't happen, as GAE cron + // is supposed to avoid overruns and this action shouldn't be executed from any + // other context, but it's not harmful as we can just ignore the transaction. Log + // it so that we know about it and move on. + logger.atWarning().log( + "Ignoring transaction %s, which appears to have already been applied.", + txnEntity.getId()); + return false; + } + + logger.atInfo().log("Applying transaction %s to Cloud Datastore", txnEntity.getId()); + + // At this point, we know txnEntity is the correct next transaction, so write it + // to datastore. + try { + Transaction.deserialize(txnEntity.getContents()).writeToDatastore(); + } catch (IOException e) { + throw new RuntimeException("Error during transaction deserialization.", e); + } + + // Write the updated last transaction id to datastore as part of this datastore + // transaction. + ofy().save().entity(lastSqlTxn.cloneWithNewTransactionId(nextTxnId)); + logger.atInfo().log( + "Finished applying single transaction Cloud SQL -> Cloud Datastore"); + return false; + }); + } + + @Override + public void run() { + // TODO(b/181758163): Deal with objects that don't exist in Cloud SQL, e.g. ForeignKeyIndex, + // EppResourceIndex. + logger.atInfo().log("Processing transaction replay batch Cloud SQL -> Cloud Datastore"); + for (TransactionEntity txnEntity : getTransactionBatch()) { + if (applyTransaction(txnEntity)) { + break; + } + } + logger.atInfo().log("Done processing transaction replay batch Cloud SQL -> Cloud Datastore"); + } +} diff --git a/core/src/test/java/google/registry/persistence/transaction/TransactionTest.java b/core/src/test/java/google/registry/persistence/transaction/TransactionTest.java index e954bb2db..889f8b73a 100644 --- a/core/src/test/java/google/registry/persistence/transaction/TransactionTest.java +++ b/core/src/test/java/google/registry/persistence/transaction/TransactionTest.java @@ -112,7 +112,7 @@ class TransactionTest { }); TransactionEntity txnEnt = jpaTm().transact(() -> jpaTm().loadByKey(VKey.createSql(TransactionEntity.class, 1L))); - Transaction txn = Transaction.deserialize(txnEnt.contents); + Transaction txn = Transaction.deserialize(txnEnt.getContents()); txn.writeToDatastore(); ofyTm() .transact( diff --git a/core/src/test/java/google/registry/schema/replay/ReplicateToDatastoreActionTest.java b/core/src/test/java/google/registry/schema/replay/ReplicateToDatastoreActionTest.java new file mode 100644 index 000000000..cc01b7249 --- /dev/null +++ b/core/src/test/java/google/registry/schema/replay/ReplicateToDatastoreActionTest.java @@ -0,0 +1,188 @@ +// Copyright 2021 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.schema.replay; + +import static com.google.common.truth.Truth.assertThat; +import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; +import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; +import static google.registry.testing.LogsSubject.assertAboutLogs; + +import com.google.common.testing.TestLogHandler; +import com.googlecode.objectify.Key; +import com.googlecode.objectify.annotation.Entity; +import com.googlecode.objectify.annotation.Id; +import google.registry.config.RegistryConfig; +import google.registry.model.ImmutableObject; +import google.registry.persistence.VKey; +import google.registry.persistence.transaction.TransactionEntity; +import google.registry.testing.AppEngineExtension; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class ReplicateToDatastoreActionTest { + + @RegisterExtension + public final AppEngineExtension appEngine = + AppEngineExtension.builder() + .withDatastoreAndCloudSql() + .withOfyTestEntities(TestEntity.class) + .withJpaUnitTestEntities(TestEntity.class) + .build(); + + ReplicateToDatastoreAction task = new ReplicateToDatastoreAction(); + + TestLogHandler logHandler; + + public ReplicateToDatastoreActionTest() {} + + @BeforeEach + public void setUp() { + RegistryConfig.overrideCloudSqlReplicateTransactions(true); + logHandler = new TestLogHandler(); + Logger.getLogger(ReplicateToDatastoreAction.class.getCanonicalName()).addHandler(logHandler); + } + + @AfterEach + public void tearDown() { + RegistryConfig.overrideCloudSqlReplicateTransactions(false); + } + + @Test + public void testReplication() { + TestEntity foo = new TestEntity("foo"); + TestEntity bar = new TestEntity("bar"); + TestEntity baz = new TestEntity("baz"); + + jpaTm() + .transact( + () -> { + jpaTm().insert(foo); + jpaTm().insert(bar); + }); + task.run(); + + assertThat(ofyTm().transact(() -> ofyTm().loadByKey(foo.key()))).isEqualTo(foo); + assertThat(ofyTm().transact(() -> ofyTm().loadByKey(bar.key()))).isEqualTo(bar); + assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(baz.key())).isPresent()).isFalse(); + + jpaTm() + .transact( + () -> { + jpaTm().delete(bar.key()); + jpaTm().insert(baz); + }); + task.run(); + + assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(bar.key()).isPresent())).isFalse(); + assertThat(ofyTm().transact(() -> ofyTm().loadByKey(baz.key()))).isEqualTo(baz); + } + + @Test + public void testReplayFromLastTxn() { + TestEntity foo = new TestEntity("foo"); + TestEntity bar = new TestEntity("bar"); + + // Write a transaction containing "foo". + jpaTm().transact(() -> jpaTm().insert(foo)); + task.run(); + + // Verify that it propagated to datastore, then remove "foo" directly from datastore. + assertThat(ofyTm().transact(() -> ofyTm().loadByKey(foo.key()))).isEqualTo(foo); + ofyTm().transact(() -> ofyTm().delete(foo.key())); + + // Write "bar" + jpaTm().transact(() -> jpaTm().insert(bar)); + task.run(); + + // If we replayed only the most recent transaction, we should have "bar" but not "foo". + assertThat(ofyTm().transact(() -> ofyTm().loadByKey(bar.key()))).isEqualTo(bar); + assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(foo.key()).isPresent())).isFalse(); + } + + @Test + public void testUnintentionalConcurrency() { + TestEntity foo = new TestEntity("foo"); + TestEntity bar = new TestEntity("bar"); + + // Write a transaction and run just the batch fetch. + jpaTm().transact(() -> jpaTm().insert(foo)); + List txns1 = task.getTransactionBatch(); + assertThat(txns1).hasSize(1); + + // Write a second transaction and do another batch fetch. + jpaTm().transact(() -> jpaTm().insert(bar)); + List txns2 = task.getTransactionBatch(); + assertThat(txns2).hasSize(2); + + // Apply the first batch. + assertThat(task.applyTransaction(txns1.get(0))).isFalse(); + + // Remove the foo record so we can ensure that this transaction doesn't get doublle-played. + ofyTm().transact(() -> ofyTm().delete(foo.key())); + + // Apply the second batch. + for (TransactionEntity txn : txns2) { + assertThat(task.applyTransaction(txn)).isFalse(); + } + + // Verify that the first transaction didn't get replayed but the second one did. + assertThat(ofyTm().transact(() -> ofyTm().loadByKeyIfPresent(foo.key()).isPresent())).isFalse(); + assertThat(ofyTm().transact(() -> ofyTm().loadByKey(bar.key()))).isEqualTo(bar); + assertAboutLogs() + .that(logHandler) + .hasLogAtLevelWithMessage( + Level.WARNING, "Ignoring transaction 1, which appears to have already been applied."); + } + + @Test + public void testMissingTransactions() { + // Write a transaction (should have a transaction id of 1). + TestEntity foo = new TestEntity("foo"); + jpaTm().transact(() -> jpaTm().insert(foo)); + + // Force the last transaction id back to -1 so that we look for transaction 0. + ofyTm().transact(() -> ofyTm().insert(new LastSqlTransaction(-1))); + + List txns = task.getTransactionBatch(); + assertThat(txns).hasSize(1); + assertThat(task.applyTransaction(txns.get(0))).isTrue(); + assertAboutLogs() + .that(logHandler) + .hasLogAtLevelWithMessage( + Level.SEVERE, + "Missing transaction: last transaction id = -1, next available transaction = 1"); + } + + @Entity(name = "ReplicationTestEntity") + @javax.persistence.Entity(name = "TestEntity") + private static class TestEntity extends ImmutableObject { + @Id @javax.persistence.Id private String name; + + private TestEntity() {} + + private TestEntity(String name) { + this.name = name; + } + + public VKey key() { + return VKey.create(TestEntity.class, name, Key.create(this)); + } + } +} diff --git a/core/src/test/resources/google/registry/export/backup_kinds.txt b/core/src/test/resources/google/registry/export/backup_kinds.txt index 0fe660183..ca5d12ad2 100644 --- a/core/src/test/resources/google/registry/export/backup_kinds.txt +++ b/core/src/test/resources/google/registry/export/backup_kinds.txt @@ -13,6 +13,7 @@ HistoryEntry HostResource KmsSecret KmsSecretRevision +LastSqlTransaction Modification OneTime PollMessage diff --git a/core/src/test/resources/google/registry/model/schema.txt b/core/src/test/resources/google/registry/model/schema.txt index fcdecfaf9..fb8edf116 100644 --- a/core/src/test/resources/google/registry/model/schema.txt +++ b/core/src/test/resources/google/registry/model/schema.txt @@ -916,3 +916,7 @@ class google.registry.persistence.DomainHistoryVKey { java.lang.Long historyRevisionId; java.lang.String repoId; } +class google.registry.schema.replay.LastSqlTransaction { + @Id long id; + long transactionId; +}