Add a beforeSqlSave callback to ReplaySpecializer (#1062)

* Add a beforeSqlSave callback to ReplaySpecializer

When in the Datastore-primary and SQL-secondary stage, we will want to
save the EppResource-at-this-point-in-time field in the *History
objects so that later on we can examine the *History objects to see what
the resource looked like at that point in time.

Without this PR, the full object at that point in time would be lost
during the asynchronous replay since Datastore doesn't know about it.

In addition, we modify the HistoryEntry weight / priority so that
additions to it come after the additions to the resource off of which it
is based. As a result, we need to DEFER some foreign keys so that we can
write the billing / poll message objects before the history object that
they're referencing.
This commit is contained in:
gbrodman 2021-04-12 12:11:20 -04:00 committed by GitHub
parent 286b8dd268
commit 101c8a5db7
14 changed files with 202 additions and 93 deletions

View file

@ -154,7 +154,13 @@ public class ReplayCommitLogsToSqlAction implements Runnable {
Object ofyPojo = ofy().toPojo(entity);
if (ofyPojo instanceof DatastoreEntity) {
DatastoreEntity datastoreEntity = (DatastoreEntity) ofyPojo;
datastoreEntity.toSqlEntity().ifPresent(jpaTm()::put);
datastoreEntity
.toSqlEntity()
.ifPresent(
sqlEntity -> {
ReplaySpecializer.beforeSqlSave(sqlEntity);
jpaTm().put(sqlEntity);
});
} else {
// this should never happen, but we shouldn't fail on it
logger.atSevere().log(

View file

@ -14,6 +14,8 @@
package google.registry.model.contact;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.annotation.EntitySubclass;
import google.registry.model.ImmutableObject;
@ -114,6 +116,12 @@ public class ContactHistory extends HistoryEntry implements SqlEntity {
return Optional.of(asHistoryEntry());
}
// Used to fill out the contactBase field during asynchronous replay
public static void beforeSqlSave(ContactHistory contactHistory) {
contactHistory.contactBase =
jpaTm().loadByKey(VKey.createSql(ContactResource.class, contactHistory.getContactRepoId()));
}
/** Class to represent the composite primary key of {@link ContactHistory} entity. */
public static class ContactHistoryId extends ImmutableObject implements Serializable {

View file

@ -15,6 +15,7 @@
package google.registry.model.domain;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.util.CollectionUtils.nullToEmptyImmutableCopy;
import com.google.common.collect.ImmutableSet;
@ -263,6 +264,12 @@ public class DomainHistory extends HistoryEntry implements SqlEntity {
return Optional.of(asHistoryEntry());
}
// Used to fill out the domainContent field during asynchronous replay
public static void beforeSqlSave(DomainHistory domainHistory) {
domainHistory.domainContent =
jpaTm().loadByKey(VKey.createSql(DomainBase.class, domainHistory.getDomainRepoId()));
}
/** Class to represent the composite primary key of {@link DomainHistory} entity. */
public static class DomainHistoryId extends ImmutableObject implements Serializable {

View file

@ -14,6 +14,8 @@
package google.registry.model.host;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import com.googlecode.objectify.Key;
import com.googlecode.objectify.annotation.EntitySubclass;
import google.registry.model.ImmutableObject;
@ -115,6 +117,12 @@ public class HostHistory extends HistoryEntry implements SqlEntity {
return Optional.of(asHistoryEntry());
}
// Used to fill out the hostBase field during asynchronous replay
public static void beforeSqlSave(HostHistory hostHistory) {
hostHistory.hostBase =
jpaTm().loadByKey(VKey.createSql(HostResource.class, hostHistory.getHostRepoId()));
}
/** Class to represent the composite primary key of {@link HostHistory} entity. */
public static class HostHistoryId extends ImmutableObject implements Serializable {

View file

@ -41,10 +41,11 @@ public class EntityWritePriorities {
*/
static final ImmutableMap<String, Integer> CLASS_PRIORITIES =
ImmutableMap.of(
"ContactResource", -15,
"HistoryEntry", -10,
"AllocationToken", -9,
"DomainBase", 10);
"ContactResource", 8,
"HostResource", 9,
"DomainBase", 10,
"HistoryEntry", 20);
// The beginning of the range of priority numbers reserved for delete. This must be greater than
// any of the values in CLASS_PRIORITIES by enough overhead to accommodate any negative values in

View file

@ -26,21 +26,30 @@ import java.lang.reflect.Method;
* to invoke special class methods if they are present.
*/
public class ReplaySpecializer {
public static void beforeSqlDelete(VKey<?> key) {
invokeMethod(key.getKind(), "beforeSqlDelete", key);
}
public static void beforeSqlSave(SqlEntity sqlEntity) {
invokeMethod(sqlEntity.getClass(), "beforeSqlSave", sqlEntity);
}
private static <T> void invokeMethod(Class<T> clazz, String methodName, Object argument) {
try {
Method method = key.getKind().getMethod("beforeSqlDelete", VKey.class);
method.invoke(null, new Object[] {key});
Method method = clazz.getMethod(methodName, argument.getClass());
method.invoke(null, argument);
} catch (NoSuchMethodException e) {
// Ignore, this just means that the class doesn't need this hook.
} catch (IllegalAccessException e) {
throw new RuntimeException(
"beforeSqlDelete() method is defined for class "
+ key.getKind().getName()
+ " but is not public.",
String.format(
"%s() method is defined for class %s but is not public.",
methodName, clazz.getName()),
e);
} catch (InvocationTargetException e) {
throw new RuntimeException(
"beforeSqlDelete() method for class " + key.getKind().getName() + " threw an exception.",
String.format("%s() method for class %s threw an exception", methodName, clazz.getName()),
e);
}
}

View file

@ -125,6 +125,7 @@ public class ReplayCommitLogsToSqlActionTest {
action.diffLister.gcsBucket = GCS_BUCKET;
action.diffLister.executor = newDirectExecutorService();
RegistryConfig.overrideCloudSqlReplayCommitLogs(true);
TestObject.beforeSqlSaveCallCount = 0;
TestObject.beforeSqlDeleteCallCount = 0;
}
@ -442,6 +443,21 @@ public class ReplayCommitLogsToSqlActionTest {
.isEqualTo("Can't acquire SQL commit log replay lock, aborting.");
}
@Test
void testSuccess_beforeSqlSaveCallback() throws Exception {
DateTime now = fakeClock.nowUtc();
Key<CommitLogBucket> bucketKey = getBucketKey(1);
Key<CommitLogManifest> manifestKey = CommitLogManifest.createKey(bucketKey, now);
jpaTm().transact(() -> SqlReplayCheckpoint.set(now.minusMinutes(1).minusMillis(1)));
saveDiffFile(
gcsService,
createCheckpoint(now.minusMinutes(1)),
CommitLogManifest.create(bucketKey, now, null),
CommitLogMutation.create(manifestKey, TestObject.create("a")));
runAndAssertSuccess(now.minusMinutes(1));
assertThat(TestObject.beforeSqlSaveCallCount).isEqualTo(1);
}
@Test
void testSuccess_deleteSqlCallback() throws Exception {
DateTime now = fakeClock.nowUtc();

View file

@ -40,7 +40,7 @@ class EntityWritePrioritiesTest {
Key.create(HistoryEntry.class, 200), "fake history entry",
Key.create(Registrar.class, 300), "fake registrar");
ImmutableMap<Long, Integer> expectedValues =
ImmutableMap.of(100L, EntityWritePriorities.DELETE_RANGE + 10, 200L, -10, 300L, 0);
ImmutableMap.of(100L, EntityWritePriorities.DELETE_RANGE - 20, 200L, 20, 300L, 0);
for (ImmutableMap.Entry<Key<?>, Object> entry : actions.entrySet()) {
assertThat(

View file

@ -34,6 +34,7 @@ import javax.persistence.Transient;
@EntityForTesting
public class TestObject extends ImmutableObject implements DatastoreAndSqlEntity {
public static int beforeSqlSaveCallCount;
public static int beforeSqlDeleteCallCount;
@Parent @Transient Key<EntityGroupRoot> parent;
@ -74,6 +75,10 @@ public class TestObject extends ImmutableObject implements DatastoreAndSqlEntity
beforeSqlDeleteCallCount++;
}
public static void beforeSqlSave(TestObject testObject) {
beforeSqlSaveCallCount++;
}
/** A test @VirtualEntity model object, which should not be persisted. */
@Entity
@VirtualEntity