Integrate transaction persistence into JpaTM (#717)

* Integrate transaction persistence into JpaTM

Store the serialized transaction whenever we commit from the JPA transaction
manager.  This change also adds:

-   The Transaction table.
-   The TransactionEntity which is stored in it.
-   Changes to the test infrastructure to register the TransactionEntity for
    tests where we don't load the nomulus schema.
-   A new configuration variable to allow us to turn the transaction
    persistence functionality on and off (default is "off").

* Changes for review.

* Incremented sequence number of flyway file
This commit is contained in:
Michael Muller 2020-07-28 19:23:44 -04:00 committed by GitHub
parent a56713e4be
commit a802be2a9b
13 changed files with 250 additions and 16 deletions

View file

@ -1527,6 +1527,21 @@ public final class RegistryConfig {
return CONFIG_SETTINGS.get().hibernate.hikariIdleTimeout;
}
/**
* Returns whether to replicate cloud SQL transactions to datastore.
*
* <p>If true, all cloud SQL transactions will be persisted as TransactionEntity objects in the
* Transaction table and replayed against datastore in a cron job.
*/
public static boolean getCloudSqlReplicateTransactions() {
return CONFIG_SETTINGS.get().cloudSql.replicateTransactions;
}
@VisibleForTesting
public static void overrideCloudSqlReplicateTransactions(boolean replicateTransactions) {
CONFIG_SETTINGS.get().cloudSql.replicateTransactions = replicateTransactions;
}
/** Returns the roid suffix to be used for the roids of all contacts and hosts. */
public static String getContactAndHostRoidSuffix() {
return CONFIG_SETTINGS.get().registryPolicy.contactAndHostRoidSuffix;

View file

@ -122,6 +122,7 @@ public class RegistryConfigSettings {
public String jdbcUrl;
public String username;
public String instanceConnectionName;
public boolean replicateTransactions;
}
/** Configuration for Apache Beam (Cloud Dataflow). */

View file

@ -230,6 +230,9 @@ cloudSql:
username: username
# This name is used by Cloud SQL when connecting to the database.
instanceConnectionName: project-id:region:instance-id
# Set this to true to replicate cloud SQL transactions to datastore in the
# background.
replicateTransactions: false
cloudDns:
# Set both properties to null in Production.

View file

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
import google.registry.config.RegistryConfig;
import google.registry.persistence.VKey;
import google.registry.util.Clock;
import java.lang.reflect.Field;
@ -101,9 +102,9 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
EntityTransaction txn = txnInfo.entityManager.getTransaction();
try {
txn.begin();
txnInfo.inTransaction = true;
txnInfo.transactionTime = clock.nowUtc();
txnInfo.start(clock);
T result = work.get();
txnInfo.recordTransaction();
txn.commit();
return result;
} catch (RuntimeException | Error e) {
@ -177,6 +178,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
checkArgumentNotNull(entity, "entity must be specified");
assertInTransaction();
getEntityManager().persist(entity);
transactionInfo.get().addUpdate(entity);
}
@Override
@ -191,6 +193,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
checkArgumentNotNull(entity, "entity must be specified");
assertInTransaction();
getEntityManager().merge(entity);
transactionInfo.get().addUpdate(entity);
}
@Override
@ -206,6 +209,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
assertInTransaction();
checkArgument(checkExists(entity), "Given entity does not exist");
getEntityManager().merge(entity);
transactionInfo.get().addUpdate(entity);
}
@Override
@ -297,6 +301,7 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
String.format("DELETE FROM %s WHERE %s", entityType.getName(), getAndClause(entityIds));
Query query = getEntityManager().createQuery(sql);
entityIds.forEach(entityId -> query.setParameter(entityId.name, entityId.value));
transactionInfo.get().addDelete(key);
return query.executeUpdate();
}
@ -387,9 +392,23 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
boolean inTransaction = false;
DateTime transactionTime;
// Serializable representation of the transaction to be persisted in the Transaction table.
Transaction.Builder contentsBuilder;
/** Start a new transaction. */
private void start(Clock clock) {
checkArgumentNotNull(clock);
inTransaction = true;
transactionTime = clock.nowUtc();
if (RegistryConfig.getCloudSqlReplicateTransactions()) {
contentsBuilder = new Transaction.Builder();
}
}
private void clear() {
inTransaction = false;
transactionTime = null;
contentsBuilder = null;
if (entityManager != null) {
// Close this EntityManager just let the connection pool be able to reuse it, it doesn't
// close the underlying database connection.
@ -397,5 +416,26 @@ public class JpaTransactionManagerImpl implements JpaTransactionManager {
entityManager = null;
}
}
private void addUpdate(Object entity) {
if (contentsBuilder != null) {
contentsBuilder.addUpdate(entity);
}
}
private void addDelete(VKey<?> key) {
if (contentsBuilder != null) {
contentsBuilder.addDelete(key);
}
}
private void recordTransaction() {
if (contentsBuilder != null) {
Transaction persistedTxn = contentsBuilder.build();
if (!persistedTxn.isEmpty()) {
entityManager.persist(persistedTxn.toEntity());
}
}
}
}
}

View file

@ -109,11 +109,20 @@ public class Transaction extends ImmutableObject implements Buildable {
return builder.build();
}
/** Returns true if the transaction contains no mutations. */
public boolean isEmpty() {
return mutations.isEmpty();
}
@Override
public Builder asBuilder() {
return new Builder(clone(this));
}
public final TransactionEntity toEntity() {
return new TransactionEntity(serialize());
}
public static class Builder extends GenericBuilder<Transaction, Builder> {
ImmutableList.Builder listBuilder = new ImmutableList.Builder();

View file

@ -0,0 +1,43 @@
// Copyright 2020 The Nomulus Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package google.registry.persistence.transaction;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
/**
* Object to be stored in the transaction table.
*
* <p>This consists of a sequential identifier and a serialized {@code Tranaction} object.
*/
@Entity
@Table(name = "Transaction")
public class TransactionEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
long id;
byte[] contents;
TransactionEntity() {}
TransactionEntity(byte[] contents) {
this.contents = contents;
}
}

View file

@ -31,6 +31,7 @@
<class>google.registry.model.registrar.RegistrarContact</class>
<class>google.registry.model.registry.label.PremiumList</class>
<class>google.registry.model.reporting.Spec11ThreatMatch</class>
<class>google.registry.persistence.transaction.TransactionEntity</class>
<class>google.registry.schema.domain.RegistryLock</class>
<class>google.registry.schema.tmch.ClaimsList</class>
<class>google.registry.schema.cursor.Cursor</class>

View file

@ -42,7 +42,13 @@ public class JpaEntityCoverageExtension implements BeforeEachCallback, AfterEach
// TODO(weiminyu): update this set when entities written to Cloud SQL and tests are added.
private static final ImmutableSet<String> IGNORE_ENTITIES =
ImmutableSet.of(
"DelegationSignerData", "DesignatedContact", "GracePeriod", "RegistrarContact");
"DelegationSignerData",
"DesignatedContact",
"GracePeriod",
"RegistrarContact",
// TransactionEntity is trivial, its persistence is tested in TransactionTest.
"TransactionEntity");
private static final ImmutableSet<Class> ALL_JPA_ENTITIES =
PersistenceXmlUtility.getManagedClasses().stream()

View file

@ -165,10 +165,10 @@ abstract class JpaTransactionManagerExtension implements BeforeEachCallback, Aft
}
executeSql(readSqlInClassPath(DB_CLEANUP_SQL_PATH));
initScriptPath.ifPresent(path -> executeSql(readSqlInClassPath(path)));
if (!extraEntityClasses.isEmpty()) {
if (!includeNomulusSchema) {
File tempSqlFile = File.createTempFile("tempSqlFile", ".sql");
tempSqlFile.deleteOnExit();
exporter.export(extraEntityClasses, tempSqlFile);
exporter.export(getTestEntities(), tempSqlFile);
executeSql(new String(Files.readAllBytes(tempSqlFile.toPath()), StandardCharsets.UTF_8));
}
@ -187,11 +187,7 @@ abstract class JpaTransactionManagerExtension implements BeforeEachCallback, Aft
assertReasonableNumDbConnections();
emf =
createEntityManagerFactory(
getJdbcUrl(),
database.getUsername(),
database.getPassword(),
properties,
extraEntityClasses);
getJdbcUrl(), database.getUsername(), database.getPassword(), properties);
emfEntityHash = entityHash;
}
@ -309,11 +305,7 @@ abstract class JpaTransactionManagerExtension implements BeforeEachCallback, Aft
/** Constructs the {@link EntityManagerFactory} instance. */
private EntityManagerFactory createEntityManagerFactory(
String jdbcUrl,
String username,
String password,
ImmutableMap<String, String> configs,
ImmutableList<Class> extraEntityClasses) {
String jdbcUrl, String username, String password, ImmutableMap<String, String> configs) {
HashMap<String, String> properties = Maps.newHashMap(configs);
properties.put(Environment.URL, jdbcUrl);
properties.put(Environment.USER, username);
@ -342,7 +334,14 @@ abstract class JpaTransactionManagerExtension implements BeforeEachCallback, Aft
descriptor.getManagedClassNames().addAll(nonEntityClasses);
}
extraEntityClasses.stream().map(Class::getName).forEach(descriptor::addClasses);
getTestEntities().stream().map(Class::getName).forEach(descriptor::addClasses);
return Bootstrap.getEntityManagerFactoryBuilder(descriptor, properties).build();
}
private ImmutableList<Class> getTestEntities() {
// We have to add the TransactionEntity to extra entities, as this is required by the
// transaction replication mechanism.
return Stream.concat(extraEntityClasses.stream(), Stream.of(TransactionEntity.class))
.collect(toImmutableList());
}
}

View file

@ -15,16 +15,19 @@
package google.registry.persistence.transaction;
import static com.google.common.truth.Truth.assertThat;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.ofyTm;
import static org.junit.Assert.assertThrows;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.annotation.Entity;
import com.googlecode.objectify.annotation.Id;
import google.registry.config.RegistryConfig;
import google.registry.model.ImmutableObject;
import google.registry.persistence.VKey;
import google.registry.testing.AppEngineExtension;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import org.junit.jupiter.api.BeforeEach;
@ -99,6 +102,51 @@ public class TransactionTest {
StreamCorruptedException.class, () -> Transaction.deserialize(new byte[] {1, 2, 3, 4}));
}
@Test
public void testTransactionSerialization() throws IOException {
RegistryConfig.overrideCloudSqlReplicateTransactions(true);
try {
jpaTm()
.transact(
() -> {
jpaTm().saveNew(fooEntity);
jpaTm().saveNew(barEntity);
});
TransactionEntity txnEnt =
jpaTm().transact(() -> jpaTm().load(VKey.createSql(TransactionEntity.class, 1L)));
Transaction txn = Transaction.deserialize(txnEnt.contents);
txn.writeToDatastore();
ofyTm()
.transact(
() -> {
assertThat(ofyTm().load(fooEntity.key())).isEqualTo(fooEntity);
assertThat(ofyTm().load(barEntity.key())).isEqualTo(barEntity);
});
// Verify that no transaction was persisted for the load transaction.
assertThat(
jpaTm()
.transact(() -> jpaTm().checkExists(VKey.createSql(TransactionEntity.class, 2L))))
.isFalse();
} finally {
RegistryConfig.overrideCloudSqlReplicateTransactions(false);
}
}
@Test
public void testTransactionSerializationDisabledByDefault() {
jpaTm()
.transact(
() -> {
jpaTm().saveNew(fooEntity);
jpaTm().saveNew(barEntity);
});
assertThat(
jpaTm()
.transact(() -> jpaTm().checkExists(VKey.createSql(TransactionEntity.class, 1L))))
.isFalse();
}
@Entity(name = "TxnTestEntity")
@javax.persistence.Entity(name = "TestEntity")
private static class TestEntity extends ImmutableObject {

View file

@ -0,0 +1,19 @@
-- Copyright 2020 The Nomulus Authors. All Rights Reserved.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
create table "Transaction" (
id bigserial not null,
contents bytea,
primary key (id)
);

View file

@ -494,6 +494,12 @@ create sequence history_id_sequence start 1 increment 1;
tld text not null,
primary key (id)
);
create table "Transaction" (
id bigserial not null,
contents bytea,
primary key (id)
);
create index IDXih4b2tea127p5rb61gje6e1y2 on "BillingCancellation" (registrar_id);
create index IDX2exdfbx6oiiwnhr8j6gjpqt2j on "BillingCancellation" (event_time);
create index IDXqa3g92jc17e8dtiaviy4fet4x on "BillingCancellation" (billing_time);

View file

@ -770,6 +770,35 @@ CREATE SEQUENCE public."SafeBrowsingThreat_id_seq"
ALTER SEQUENCE public."SafeBrowsingThreat_id_seq" OWNED BY public."Spec11ThreatMatch".id;
--
-- Name: Transaction; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public."Transaction" (
id bigint NOT NULL,
contents bytea
);
--
-- Name: Transaction_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE public."Transaction_id_seq"
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: Transaction_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE public."Transaction_id_seq" OWNED BY public."Transaction".id;
--
-- Name: BillingCancellation billing_cancellation_id; Type: DEFAULT; Schema: public; Owner: -
--
@ -833,6 +862,13 @@ ALTER TABLE ONLY public."ReservedList" ALTER COLUMN revision_id SET DEFAULT next
ALTER TABLE ONLY public."Spec11ThreatMatch" ALTER COLUMN id SET DEFAULT nextval('public."SafeBrowsingThreat_id_seq"'::regclass);
--
-- Name: Transaction id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY public."Transaction" ALTER COLUMN id SET DEFAULT nextval('public."Transaction_id_seq"'::regclass);
--
-- Name: BillingCancellation BillingCancellation_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
@ -1001,6 +1037,14 @@ ALTER TABLE ONLY public."Spec11ThreatMatch"
ADD CONSTRAINT "SafeBrowsingThreat_pkey" PRIMARY KEY (id);
--
-- Name: Transaction Transaction_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public."Transaction"
ADD CONSTRAINT "Transaction_pkey" PRIMARY KEY (id);
--
-- Name: RegistryLock idx_registry_lock_repo_id_revision_id; Type: CONSTRAINT; Schema: public; Owner: -
--