Preserve update_time when replicating to SQL (#1316)

* Preserve update_time when replicating to SQL

Prevent InitSqlPipeline from changing the UpdateAutoTimestamp fields in
entities.
This commit is contained in:
Weimin Yu 2021-09-13 16:55:20 -04:00 committed by GitHub
parent ac21ee4151
commit 0fe2e6c976
3 changed files with 35 additions and 4 deletions

View file

@ -22,6 +22,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import google.registry.backup.AppEngineEnvironment;
import google.registry.beam.common.RegistryQuery.CriteriaQuerySupplier;
import google.registry.model.UpdateAutoTimestamp;
import google.registry.model.UpdateAutoTimestamp.DisableAutoUpdateResource;
import google.registry.model.ofy.ObjectifyService;
import google.registry.model.replay.SqlEntity;
import google.registry.persistence.transaction.JpaTransactionManager;
@ -274,6 +276,12 @@ public final class RegistryJpaIO {
public abstract SerializableFunction<T, Object> jpaConverter();
/**
* Signal to the writer that the {@link UpdateAutoTimestamp} property should be allowed to
* manipulate its value before persistence. The default value is {@code true}.
*/
abstract boolean withUpdateAutoTimestamp();
public Write<T> withName(String name) {
return toBuilder().name(name).build();
}
@ -294,6 +302,10 @@ public final class RegistryJpaIO {
return toBuilder().jpaConverter(jpaConverter).build();
}
public Write<T> disableUpdateAutoTimestamp() {
return toBuilder().withUpdateAutoTimestamp(false).build();
}
abstract Builder<T> toBuilder();
@Override
@ -310,7 +322,7 @@ public final class RegistryJpaIO {
GroupIntoBatches.<Integer, T>ofSize(batchSize()).withShardedKey())
.apply(
"Write in batch for " + name(),
ParDo.of(new SqlBatchWriter<>(name(), jpaConverter())));
ParDo.of(new SqlBatchWriter<>(name(), jpaConverter(), withUpdateAutoTimestamp())));
}
static <T> Builder<T> builder() {
@ -318,7 +330,8 @@ public final class RegistryJpaIO {
.name(DEFAULT_NAME)
.batchSize(DEFAULT_BATCH_SIZE)
.shards(DEFAULT_SHARDS)
.jpaConverter(x -> x);
.jpaConverter(x -> x)
.withUpdateAutoTimestamp(true);
}
@AutoValue.Builder
@ -332,6 +345,8 @@ public final class RegistryJpaIO {
abstract Builder<T> jpaConverter(SerializableFunction<T, Object> jpaConverter);
abstract Builder<T> withUpdateAutoTimestamp(boolean withUpdateAutoTimestamp);
abstract Write<T> build();
}
}
@ -340,10 +355,13 @@ public final class RegistryJpaIO {
private static class SqlBatchWriter<T> extends DoFn<KV<ShardedKey<Integer>, Iterable<T>>, Void> {
private final Counter counter;
private final SerializableFunction<T, Object> jpaConverter;
private final boolean withAutoTimestamp;
SqlBatchWriter(String type, SerializableFunction<T, Object> jpaConverter) {
SqlBatchWriter(
String type, SerializableFunction<T, Object> jpaConverter, boolean withAutoTimestamp) {
counter = Metrics.counter("SQL_WRITE", type);
this.jpaConverter = jpaConverter;
this.withAutoTimestamp = withAutoTimestamp;
}
@Setup
@ -358,6 +376,16 @@ public final class RegistryJpaIO {
@ProcessElement
public void processElement(@Element KV<ShardedKey<Integer>, Iterable<T>> kv) {
if (withAutoTimestamp) {
actuallyProcessElement(kv);
return;
}
try (DisableAutoUpdateResource disable = UpdateAutoTimestamp.disableAutoUpdate()) {
actuallyProcessElement(kv);
}
}
private void actuallyProcessElement(@Element KV<ShardedKey<Integer>, Iterable<T>> kv) {
try (AppEngineEnvironment env = new AppEngineEnvironment()) {
ImmutableList<Object> entities =
Streams.stream(kv.getValue())

View file

@ -219,7 +219,8 @@ public class InitSqlPipeline implements Serializable {
.withName(transformId)
.withBatchSize(options.getSqlWriteBatchSize())
.withShards(options.getSqlWriteShards())
.withJpaConverter(Transforms::convertVersionedEntityToSqlEntity));
.withJpaConverter(Transforms::convertVersionedEntityToSqlEntity)
.disableUpdateAutoTimestamp());
}
private static ImmutableList<String> toKindStrings(Collection<Class<?>> entityClasses) {

View file

@ -306,6 +306,7 @@ class InitSqlPipelineTest {
.build());
exportDir = store.export(exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of());
commitLogDir = Files.createDirectory(tmpDir.resolve("commits")).toFile();
fakeClock.advanceOneMilli();
}
}
@ -362,6 +363,7 @@ class InitSqlPipelineTest {
.isEqualTo(expected.getAutorenewPollMessage().getOfyKey());
assertThat(actual.getDeletePollMessage().getOfyKey())
.isEqualTo(expected.getDeletePollMessage().getOfyKey());
assertThat(actual.getUpdateTimestamp()).isEqualTo(expected.getUpdateTimestamp());
// TODO(weiminyu): check gracePeriods and transferData when it is easier to do
}
}