Implement a persistable Transaction object (#614)

* Implement a persistable Transaction object

Implement Transaction, which encapsulates a sequence of datastore mutations
that can be serialized and written to the Cloud SQL Transaction table and
subsequently replayed to Datastore from a backend cron job.

* Changes requested in review

* Add a mujtation count to the persisted format
This commit is contained in:
Michael Muller 2020-06-17 14:16:48 -04:00 committed by GitHub
parent 37797a230d
commit d19ed3ed09
4 changed files with 386 additions and 1 deletions

View file

@ -23,6 +23,7 @@ import static google.registry.util.CollectionUtils.union;
import com.google.appengine.api.datastore.DatastoreFailureException; import com.google.appengine.api.datastore.DatastoreFailureException;
import com.google.appengine.api.datastore.DatastoreTimeoutException; import com.google.appengine.api.datastore.DatastoreTimeoutException;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.taskqueue.TransientFailureException; import com.google.appengine.api.taskqueue.TransientFailureException;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
@ -365,6 +366,16 @@ public class Ofy {
return Key.create(info.bucketKey, CommitLogManifest.class, info.transactionTime.getMillis()); return Key.create(info.bucketKey, CommitLogManifest.class, info.transactionTime.getMillis());
} }
/** Convert an entity POJO to a datastore Entity. */
public Entity toEntity(Object pojo) {
return ofy().save().toEntity(pojo);
}
/** Convert a datastore entity to a POJO. */
public Object toPojo(Entity entity) {
return ofy().load().fromEntity(entity);
}
/** /**
* Returns the @Entity-annotated base class for an object that is either an {@code Key<?>} or an * Returns the @Entity-annotated base class for an object that is either an {@code Key<?>} or an
* object of an entity class registered with Objectify. * object of an entity class registered with Objectify.

View file

@ -19,6 +19,7 @@ import static google.registry.util.PreconditionsUtils.checkArgumentNotNull;
import com.googlecode.objectify.Key; import com.googlecode.objectify.Key;
import google.registry.model.ImmutableObject; import google.registry.model.ImmutableObject;
import java.io.Serializable;
import java.util.Optional; import java.util.Optional;
/** /**
@ -27,7 +28,9 @@ import java.util.Optional;
* <p>A VKey instance must contain both the JPA primary key for the referenced entity class and the * <p>A VKey instance must contain both the JPA primary key for the referenced entity class and the
* objectify key for the object. * objectify key for the object.
*/ */
public class VKey<T> extends ImmutableObject { public class VKey<T> extends ImmutableObject implements Serializable {
private static final long serialVersionUID = -5291472863840231240L;
// The primary key for the referenced entity. // The primary key for the referenced entity.
private final Object primaryKey; private final Object primaryKey;

View file

@ -0,0 +1,254 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.persistence.transaction;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityTranslator;
import com.google.common.collect.ImmutableList;
import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
import google.registry.model.Buildable;
import google.registry.model.ImmutableObject;
import google.registry.model.ofy.ObjectifyService;
import google.registry.persistence.VKey;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
* A SQL transaction that can be serialized and stored in its own table.
*
* <p>Transaction is used to store transactions committed to Cloud SQL in a Transaction table during
* the second phase of our migration, during which time we will be asynchronously replaying Cloud
* SQL transactions to datastore.
*
* <p>TODO(mmuller): Use these from {@link TransactionManager} to store the contents of an SQL
* transaction for asynchronous propagation to datastore. Implement a cron endpoint that reads them
* from the Transaction table and calls writeToDatastore().
*/
public class Transaction extends ImmutableObject implements Buildable {
// Version id for persisted objects. Use the creation date for the value, as it's reasonably
// unique and inherently informative.
private static final int VERSION_ID = 20200604;
private transient ImmutableList<Mutation> mutations;
/** Write the entire transaction to the datastore in a datastore transaction. */
public void writeToDatastore() {
tm().transact(
() -> {
for (Mutation mutation : mutations) {
mutation.writeToDatastore();
}
});
}
/** Serialize a transaction to a byte array. */
public byte[] serialize() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(baos);
// Write the transaction version id. This serves as both a version id and a "magic number" to
// protect us against trying to deserialize some random byte array.
out.writeInt(VERSION_ID);
// Write all of the mutations, preceded by their count.
out.writeInt(mutations.size());
for (Mutation mutation : mutations) {
mutation.serializeTo(out);
}
out.close();
return baos.toByteArray();
} catch (IOException e) {
throw new IllegalArgumentException();
}
}
static Transaction deserialize(byte[] serializedTransaction) throws IOException {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedTransaction));
// Verify that the data is what we expect.
int version = in.readInt();
checkArgument(
version == VERSION_ID, "Invalid version id. Expected %s but got %s", VERSION_ID, version);
Transaction.Builder builder = new Transaction.Builder();
int mutationCount = in.readInt();
for (int i = 0; i < mutationCount; ++i) {
try {
builder.add(Mutation.deserializeFrom(in));
} catch (EOFException e) {
throw new RuntimeException("Serialized transaction terminated prematurely", e);
}
}
if (in.read() != -1) {
throw new RuntimeException("Unread data at the end of a serialized transaction.");
}
return builder.build();
}
@Override
public Builder asBuilder() {
return new Builder(clone(this));
}
public static class Builder extends GenericBuilder<Transaction, Builder> {
ImmutableList.Builder listBuilder = new ImmutableList.Builder();
Builder() {}
protected Builder(Transaction instance) {
super(instance);
}
public Builder addUpdate(Object entity) {
checkNotNull(entity);
listBuilder.add(new Update(entity));
return thisCastToDerived();
}
public Builder addDelete(VKey<?> key) {
checkNotNull(key);
listBuilder.add(new Delete(key));
return thisCastToDerived();
}
/** Adds a mutation (mainly intended for serialization) */
Builder add(Mutation mutation) {
checkNotNull(mutation);
listBuilder.add(mutation);
return thisCastToDerived();
}
@Override
public Transaction build() {
getInstance().mutations = listBuilder.build();
return super.build();
}
}
/** Base class for database record mutations. */
public abstract static class Mutation {
enum Type {
UPDATE,
DELETE
};
/** Write the changes in the mutation to the datastore. */
public abstract void writeToDatastore();
/** Serialize the mutation to the output stream. */
public abstract void serializeTo(ObjectOutputStream out) throws IOException;
/** Deserialize a mutation from the input stream. */
public static Mutation deserializeFrom(ObjectInputStream in) throws IOException {
try {
Type type = (Type) in.readObject();
switch (type) {
case UPDATE:
return Update.deserializeFrom(in);
case DELETE:
return Delete.deserializeFrom(in);
default:
throw new IllegalArgumentException("Unknown enum value: " + type);
}
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(e);
}
}
}
/**
* Record update.
*
* <p>Note that we don't have to distinguish between add and update, since this is for replay into
* the datastore which makes no such distinction.
*
* <p>Update serializes its entity using Objectify serialization.
*/
public static class Update extends Mutation {
private Object entity;
Update(Object entity) {
this.entity = entity;
}
@Override
public void writeToDatastore() {
ofyTm().saveNewOrUpdate(entity);
}
@Override
public void serializeTo(ObjectOutputStream out) throws IOException {
out.writeObject(Type.UPDATE);
Entity realEntity = ObjectifyService.ofy().toEntity(entity);
EntityProto proto = EntityTranslator.convertToPb(realEntity);
out.write(VERSION_ID);
proto.writeDelimitedTo(out);
}
public static Update deserializeFrom(ObjectInputStream in) throws IOException {
EntityProto proto = new EntityProto();
proto.parseDelimitedFrom(in);
return new Update(ObjectifyService.ofy().toPojo(EntityTranslator.createFromPb(proto)));
}
}
/**
* Record deletion.
*
* <p>Delete serializes its VKey using Java native serialization.
*/
public static class Delete extends Mutation {
private final VKey<?> key;
Delete(VKey<?> key) {
this.key = key;
}
@Override
public void writeToDatastore() {
ofyTm().delete(key);
}
@Override
public void serializeTo(ObjectOutputStream out) throws IOException {
out.writeObject(Type.DELETE);
// Java object serialization works for this.
out.writeObject(key);
}
public static Delete deserializeFrom(ObjectInputStream in) throws IOException {
try {
return new Delete((VKey) in.readObject());
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(e);
}
}
}
}

View file

@ -0,0 +1,117 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.persistence.transaction;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static org.junit.Assert.assertThrows;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.annotation.Entity;
import com.googlecode.objectify.annotation.Id;
import google.registry.model.ImmutableObject;
import google.registry.persistence.VKey;
import google.registry.testing.AppEngineRule;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
public class TransactionTest {
@RegisterExtension
public final AppEngineRule appEngine =
AppEngineRule.builder()
.withDatastoreAndCloudSql()
.withOfyTestEntities(TestEntity.class)
.withJpaUnitTestEntities(TestEntity.class)
.build();
TestEntity fooEntity, barEntity;
public TransactionTest() {}
@BeforeEach
public void setUp() {
fooEntity = new TestEntity("foo");
barEntity = new TestEntity("bar");
}
@Test
public void testTransactionReplay() {
Transaction txn = new Transaction.Builder().addUpdate(fooEntity).addUpdate(barEntity).build();
txn.writeToDatastore();
ofyTm()
.transact(
() -> {
assertThat(ofyTm().load(fooEntity.key())).isEqualTo(fooEntity);
assertThat(ofyTm().load(barEntity.key())).isEqualTo(barEntity);
});
txn = new Transaction.Builder().addDelete(barEntity.key()).build();
txn.writeToDatastore();
assertThat(ofyTm().checkExists(barEntity.key())).isEqualTo(false);
}
@Test
public void testSerialization() throws Exception {
Transaction txn = new Transaction.Builder().addUpdate(barEntity).build();
txn.writeToDatastore();
txn = new Transaction.Builder().addUpdate(fooEntity).addDelete(barEntity.key()).build();
txn = Transaction.deserialize(txn.serialize());
txn.writeToDatastore();
ofyTm()
.transact(
() -> {
assertThat(ofyTm().load(fooEntity.key())).isEqualTo(fooEntity);
assertThat(ofyTm().checkExists(barEntity.key())).isEqualTo(false);
});
}
@Test
public void testDeserializationErrors() throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(baos);
out.writeInt(12345);
out.close();
assertThrows(IllegalArgumentException.class, () -> Transaction.deserialize(baos.toByteArray()));
// Test with a short byte array.
assertThrows(
StreamCorruptedException.class, () -> Transaction.deserialize(new byte[] {1, 2, 3, 4}));
}
@Entity(name = "TestEntity")
@javax.persistence.Entity(name = "TestEntity")
private static class TestEntity extends ImmutableObject {
@Id @javax.persistence.Id private String name;
private TestEntity() {}
private TestEntity(String name) {
this.name = name;
}
public VKey<TestEntity> key() {
return VKey.create(TestEntity.class, name, Key.create(this));
}
}
}