From 0fe2e6c97658de8afd692d7b6ee6f73e01b8ca5c Mon Sep 17 00:00:00 2001 From: Weimin Yu Date: Mon, 13 Sep 2021 16:55:20 -0400 Subject: [PATCH] Preserve update_time when replicating to SQL (#1316) * Preserve update_time when replicating to SQL Prevent InitSqlPipeline from changing the UpdateAutoTimestamp fields in entities. --- .../registry/beam/common/RegistryJpaIO.java | 34 +++++++++++++++++-- .../beam/initsql/InitSqlPipeline.java | 3 +- .../beam/initsql/InitSqlPipelineTest.java | 2 ++ 3 files changed, 35 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java index 2744195e5..aea0902ec 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java +++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java @@ -22,6 +22,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Streams; import google.registry.backup.AppEngineEnvironment; import google.registry.beam.common.RegistryQuery.CriteriaQuerySupplier; +import google.registry.model.UpdateAutoTimestamp; +import google.registry.model.UpdateAutoTimestamp.DisableAutoUpdateResource; import google.registry.model.ofy.ObjectifyService; import google.registry.model.replay.SqlEntity; import google.registry.persistence.transaction.JpaTransactionManager; @@ -274,6 +276,12 @@ public final class RegistryJpaIO { public abstract SerializableFunction jpaConverter(); + /** + * Signal to the writer that the {@link UpdateAutoTimestamp} property should be allowed to + * manipulate its value before persistence. The default value is {@code true}. + */ + abstract boolean withUpdateAutoTimestamp(); + public Write withName(String name) { return toBuilder().name(name).build(); } @@ -294,6 +302,10 @@ public final class RegistryJpaIO { return toBuilder().jpaConverter(jpaConverter).build(); } + public Write disableUpdateAutoTimestamp() { + return toBuilder().withUpdateAutoTimestamp(false).build(); + } + abstract Builder toBuilder(); @Override @@ -310,7 +322,7 @@ public final class RegistryJpaIO { GroupIntoBatches.ofSize(batchSize()).withShardedKey()) .apply( "Write in batch for " + name(), - ParDo.of(new SqlBatchWriter<>(name(), jpaConverter()))); + ParDo.of(new SqlBatchWriter<>(name(), jpaConverter(), withUpdateAutoTimestamp()))); } static Builder builder() { @@ -318,7 +330,8 @@ public final class RegistryJpaIO { .name(DEFAULT_NAME) .batchSize(DEFAULT_BATCH_SIZE) .shards(DEFAULT_SHARDS) - .jpaConverter(x -> x); + .jpaConverter(x -> x) + .withUpdateAutoTimestamp(true); } @AutoValue.Builder @@ -332,6 +345,8 @@ public final class RegistryJpaIO { abstract Builder jpaConverter(SerializableFunction jpaConverter); + abstract Builder withUpdateAutoTimestamp(boolean withUpdateAutoTimestamp); + abstract Write build(); } } @@ -340,10 +355,13 @@ public final class RegistryJpaIO { private static class SqlBatchWriter extends DoFn, Iterable>, Void> { private final Counter counter; private final SerializableFunction jpaConverter; + private final boolean withAutoTimestamp; - SqlBatchWriter(String type, SerializableFunction jpaConverter) { + SqlBatchWriter( + String type, SerializableFunction jpaConverter, boolean withAutoTimestamp) { counter = Metrics.counter("SQL_WRITE", type); this.jpaConverter = jpaConverter; + this.withAutoTimestamp = withAutoTimestamp; } @Setup @@ -358,6 +376,16 @@ public final class RegistryJpaIO { @ProcessElement public void processElement(@Element KV, Iterable> kv) { + if (withAutoTimestamp) { + actuallyProcessElement(kv); + return; + } + try (DisableAutoUpdateResource disable = UpdateAutoTimestamp.disableAutoUpdate()) { + actuallyProcessElement(kv); + } + } + + private void actuallyProcessElement(@Element KV, Iterable> kv) { try (AppEngineEnvironment env = new AppEngineEnvironment()) { ImmutableList entities = Streams.stream(kv.getValue()) diff --git a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java index c4e32fbf7..87ae6788a 100644 --- a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java +++ b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java @@ -219,7 +219,8 @@ public class InitSqlPipeline implements Serializable { .withName(transformId) .withBatchSize(options.getSqlWriteBatchSize()) .withShards(options.getSqlWriteShards()) - .withJpaConverter(Transforms::convertVersionedEntityToSqlEntity)); + .withJpaConverter(Transforms::convertVersionedEntityToSqlEntity) + .disableUpdateAutoTimestamp()); } private static ImmutableList toKindStrings(Collection> entityClasses) { diff --git a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java index c3d7e40cc..733b22ed5 100644 --- a/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java +++ b/core/src/test/java/google/registry/beam/initsql/InitSqlPipelineTest.java @@ -306,6 +306,7 @@ class InitSqlPipelineTest { .build()); exportDir = store.export(exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of()); commitLogDir = Files.createDirectory(tmpDir.resolve("commits")).toFile(); + fakeClock.advanceOneMilli(); } } @@ -362,6 +363,7 @@ class InitSqlPipelineTest { .isEqualTo(expected.getAutorenewPollMessage().getOfyKey()); assertThat(actual.getDeletePollMessage().getOfyKey()) .isEqualTo(expected.getDeletePollMessage().getOfyKey()); + assertThat(actual.getUpdateTimestamp()).isEqualTo(expected.getUpdateTimestamp()); // TODO(weiminyu): check gracePeriods and transferData when it is easier to do } }