From d19ed3ed09ef08c62888fa89d15d7edb7fd6821d Mon Sep 17 00:00:00 2001 From: Michael Muller Date: Wed, 17 Jun 2020 14:16:48 -0400 Subject: [PATCH] Implement a persistable Transaction object (#614) * Implement a persistable Transaction object Implement Transaction, which encapsulates a sequence of datastore mutations that can be serialized and written to the Cloud SQL Transaction table and subsequently replayed to Datastore from a backend cron job. * Changes requested in review * Add a mujtation count to the persisted format --- .../java/google/registry/model/ofy/Ofy.java | 11 + .../google/registry/persistence/VKey.java | 5 +- .../persistence/transaction/Transaction.java | 254 ++++++++++++++++++ .../transaction/TransactionTest.java | 117 ++++++++ 4 files changed, 386 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/google/registry/persistence/transaction/Transaction.java create mode 100644 core/src/test/java/google/registry/persistence/transaction/TransactionTest.java diff --git a/core/src/main/java/google/registry/model/ofy/Ofy.java b/core/src/main/java/google/registry/model/ofy/Ofy.java index 58c4263d5..0b86bf957 100644 --- a/core/src/main/java/google/registry/model/ofy/Ofy.java +++ b/core/src/main/java/google/registry/model/ofy/Ofy.java @@ -23,6 +23,7 @@ import static google.registry.util.CollectionUtils.union; import com.google.appengine.api.datastore.DatastoreFailureException; import com.google.appengine.api.datastore.DatastoreTimeoutException; +import com.google.appengine.api.datastore.Entity; import com.google.appengine.api.taskqueue.TransientFailureException; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; @@ -365,6 +366,16 @@ public class Ofy { return Key.create(info.bucketKey, CommitLogManifest.class, info.transactionTime.getMillis()); } + /** Convert an entity POJO to a datastore Entity. */ + public Entity toEntity(Object pojo) { + return ofy().save().toEntity(pojo); + } + + /** Convert a datastore entity to a POJO. */ + public Object toPojo(Entity entity) { + return ofy().load().fromEntity(entity); + } + /** * Returns the @Entity-annotated base class for an object that is either an {@code Key} or an * object of an entity class registered with Objectify. diff --git a/core/src/main/java/google/registry/persistence/VKey.java b/core/src/main/java/google/registry/persistence/VKey.java index 06404fb66..62624fc17 100644 --- a/core/src/main/java/google/registry/persistence/VKey.java +++ b/core/src/main/java/google/registry/persistence/VKey.java @@ -19,6 +19,7 @@ import static google.registry.util.PreconditionsUtils.checkArgumentNotNull; import com.googlecode.objectify.Key; import google.registry.model.ImmutableObject; +import java.io.Serializable; import java.util.Optional; /** @@ -27,7 +28,9 @@ import java.util.Optional; *

A VKey instance must contain both the JPA primary key for the referenced entity class and the * objectify key for the object. */ -public class VKey extends ImmutableObject { +public class VKey extends ImmutableObject implements Serializable { + + private static final long serialVersionUID = -5291472863840231240L; // The primary key for the referenced entity. private final Object primaryKey; diff --git a/core/src/main/java/google/registry/persistence/transaction/Transaction.java b/core/src/main/java/google/registry/persistence/transaction/Transaction.java new file mode 100644 index 000000000..176ae6610 --- /dev/null +++ b/core/src/main/java/google/registry/persistence/transaction/Transaction.java @@ -0,0 +1,254 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.persistence.transaction; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; +import static google.registry.persistence.transaction.TransactionManagerFactory.tm; + +import com.google.appengine.api.datastore.Entity; +import com.google.appengine.api.datastore.EntityTranslator; +import com.google.common.collect.ImmutableList; +import com.google.storage.onestore.v3.OnestoreEntity.EntityProto; +import google.registry.model.Buildable; +import google.registry.model.ImmutableObject; +import google.registry.model.ofy.ObjectifyService; +import google.registry.persistence.VKey; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +/** + * A SQL transaction that can be serialized and stored in its own table. + * + *

Transaction is used to store transactions committed to Cloud SQL in a Transaction table during + * the second phase of our migration, during which time we will be asynchronously replaying Cloud + * SQL transactions to datastore. + * + *

TODO(mmuller): Use these from {@link TransactionManager} to store the contents of an SQL + * transaction for asynchronous propagation to datastore. Implement a cron endpoint that reads them + * from the Transaction table and calls writeToDatastore(). + */ +public class Transaction extends ImmutableObject implements Buildable { + + // Version id for persisted objects. Use the creation date for the value, as it's reasonably + // unique and inherently informative. + private static final int VERSION_ID = 20200604; + + private transient ImmutableList mutations; + + /** Write the entire transaction to the datastore in a datastore transaction. */ + public void writeToDatastore() { + tm().transact( + () -> { + for (Mutation mutation : mutations) { + mutation.writeToDatastore(); + } + }); + } + + /** Serialize a transaction to a byte array. */ + public byte[] serialize() { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(baos); + + // Write the transaction version id. This serves as both a version id and a "magic number" to + // protect us against trying to deserialize some random byte array. + out.writeInt(VERSION_ID); + + // Write all of the mutations, preceded by their count. + out.writeInt(mutations.size()); + for (Mutation mutation : mutations) { + mutation.serializeTo(out); + } + + out.close(); + return baos.toByteArray(); + } catch (IOException e) { + throw new IllegalArgumentException(); + } + } + + static Transaction deserialize(byte[] serializedTransaction) throws IOException { + ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedTransaction)); + + // Verify that the data is what we expect. + int version = in.readInt(); + checkArgument( + version == VERSION_ID, "Invalid version id. Expected %s but got %s", VERSION_ID, version); + + Transaction.Builder builder = new Transaction.Builder(); + int mutationCount = in.readInt(); + for (int i = 0; i < mutationCount; ++i) { + try { + builder.add(Mutation.deserializeFrom(in)); + } catch (EOFException e) { + throw new RuntimeException("Serialized transaction terminated prematurely", e); + } + } + if (in.read() != -1) { + throw new RuntimeException("Unread data at the end of a serialized transaction."); + } + return builder.build(); + } + + @Override + public Builder asBuilder() { + return new Builder(clone(this)); + } + + public static class Builder extends GenericBuilder { + + ImmutableList.Builder listBuilder = new ImmutableList.Builder(); + + Builder() {} + + protected Builder(Transaction instance) { + super(instance); + } + + public Builder addUpdate(Object entity) { + checkNotNull(entity); + listBuilder.add(new Update(entity)); + return thisCastToDerived(); + } + + public Builder addDelete(VKey key) { + checkNotNull(key); + listBuilder.add(new Delete(key)); + return thisCastToDerived(); + } + + /** Adds a mutation (mainly intended for serialization) */ + Builder add(Mutation mutation) { + checkNotNull(mutation); + listBuilder.add(mutation); + return thisCastToDerived(); + } + + @Override + public Transaction build() { + getInstance().mutations = listBuilder.build(); + return super.build(); + } + } + + /** Base class for database record mutations. */ + public abstract static class Mutation { + + enum Type { + UPDATE, + DELETE + }; + + /** Write the changes in the mutation to the datastore. */ + public abstract void writeToDatastore(); + + /** Serialize the mutation to the output stream. */ + public abstract void serializeTo(ObjectOutputStream out) throws IOException; + + /** Deserialize a mutation from the input stream. */ + public static Mutation deserializeFrom(ObjectInputStream in) throws IOException { + try { + Type type = (Type) in.readObject(); + switch (type) { + case UPDATE: + return Update.deserializeFrom(in); + case DELETE: + return Delete.deserializeFrom(in); + default: + throw new IllegalArgumentException("Unknown enum value: " + type); + } + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + } + + /** + * Record update. + * + *

Note that we don't have to distinguish between add and update, since this is for replay into + * the datastore which makes no such distinction. + * + *

Update serializes its entity using Objectify serialization. + */ + public static class Update extends Mutation { + private Object entity; + + Update(Object entity) { + this.entity = entity; + } + + @Override + public void writeToDatastore() { + ofyTm().saveNewOrUpdate(entity); + } + + @Override + public void serializeTo(ObjectOutputStream out) throws IOException { + out.writeObject(Type.UPDATE); + Entity realEntity = ObjectifyService.ofy().toEntity(entity); + EntityProto proto = EntityTranslator.convertToPb(realEntity); + out.write(VERSION_ID); + proto.writeDelimitedTo(out); + } + + public static Update deserializeFrom(ObjectInputStream in) throws IOException { + EntityProto proto = new EntityProto(); + proto.parseDelimitedFrom(in); + return new Update(ObjectifyService.ofy().toPojo(EntityTranslator.createFromPb(proto))); + } + } + + /** + * Record deletion. + * + *

Delete serializes its VKey using Java native serialization. + */ + public static class Delete extends Mutation { + private final VKey key; + + Delete(VKey key) { + this.key = key; + } + + @Override + public void writeToDatastore() { + ofyTm().delete(key); + } + + @Override + public void serializeTo(ObjectOutputStream out) throws IOException { + out.writeObject(Type.DELETE); + + // Java object serialization works for this. + out.writeObject(key); + } + + public static Delete deserializeFrom(ObjectInputStream in) throws IOException { + try { + return new Delete((VKey) in.readObject()); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + } +} diff --git a/core/src/test/java/google/registry/persistence/transaction/TransactionTest.java b/core/src/test/java/google/registry/persistence/transaction/TransactionTest.java new file mode 100644 index 000000000..b59098032 --- /dev/null +++ b/core/src/test/java/google/registry/persistence/transaction/TransactionTest.java @@ -0,0 +1,117 @@ +// Copyright 2020 The Nomulus Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package google.registry.persistence.transaction; + +import static com.google.common.truth.Truth.assertThat; +import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm; +import static org.junit.Assert.assertThrows; + +import com.googlecode.objectify.Key; +import com.googlecode.objectify.annotation.Entity; +import com.googlecode.objectify.annotation.Id; +import google.registry.model.ImmutableObject; +import google.registry.persistence.VKey; +import google.registry.testing.AppEngineRule; +import java.io.ByteArrayOutputStream; +import java.io.ObjectOutputStream; +import java.io.StreamCorruptedException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class TransactionTest { + + @RegisterExtension + public final AppEngineRule appEngine = + AppEngineRule.builder() + .withDatastoreAndCloudSql() + .withOfyTestEntities(TestEntity.class) + .withJpaUnitTestEntities(TestEntity.class) + .build(); + + TestEntity fooEntity, barEntity; + + public TransactionTest() {} + + @BeforeEach + public void setUp() { + fooEntity = new TestEntity("foo"); + barEntity = new TestEntity("bar"); + } + + @Test + public void testTransactionReplay() { + Transaction txn = new Transaction.Builder().addUpdate(fooEntity).addUpdate(barEntity).build(); + txn.writeToDatastore(); + + ofyTm() + .transact( + () -> { + assertThat(ofyTm().load(fooEntity.key())).isEqualTo(fooEntity); + assertThat(ofyTm().load(barEntity.key())).isEqualTo(barEntity); + }); + + txn = new Transaction.Builder().addDelete(barEntity.key()).build(); + txn.writeToDatastore(); + assertThat(ofyTm().checkExists(barEntity.key())).isEqualTo(false); + } + + @Test + public void testSerialization() throws Exception { + Transaction txn = new Transaction.Builder().addUpdate(barEntity).build(); + txn.writeToDatastore(); + + txn = new Transaction.Builder().addUpdate(fooEntity).addDelete(barEntity.key()).build(); + txn = Transaction.deserialize(txn.serialize()); + + txn.writeToDatastore(); + + ofyTm() + .transact( + () -> { + assertThat(ofyTm().load(fooEntity.key())).isEqualTo(fooEntity); + assertThat(ofyTm().checkExists(barEntity.key())).isEqualTo(false); + }); + } + + @Test + public void testDeserializationErrors() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(baos); + out.writeInt(12345); + out.close(); + assertThrows(IllegalArgumentException.class, () -> Transaction.deserialize(baos.toByteArray())); + + // Test with a short byte array. + assertThrows( + StreamCorruptedException.class, () -> Transaction.deserialize(new byte[] {1, 2, 3, 4})); + } + + @Entity(name = "TestEntity") + @javax.persistence.Entity(name = "TestEntity") + private static class TestEntity extends ImmutableObject { + @Id @javax.persistence.Id private String name; + + private TestEntity() {} + + private TestEntity(String name) { + this.name = name; + } + + public VKey key() { + return VKey.create(TestEntity.class, name, Key.create(this)); + } + } +}