From ad5bde4adbcfbfd79e794282ee14eaf6da0a71d6 Mon Sep 17 00:00:00 2001 From: Pavlo Tkach <3469726+ptkach@users.noreply.github.com> Date: Fri, 18 Nov 2022 11:13:41 -0500 Subject: [PATCH] Restore SelfAllocateId for RDE pipeline (#1853) --- .../beam/common/RegistryPipelineOptions.java | 12 +++++ .../RegistryPipelineWorkerInitializer.java | 11 +++++ .../google/registry/beam/rde/RdePipeline.java | 7 +++ .../java/google/registry/model/IdService.java | 47 +++++++++++++++++-- .../google/registry/rde/RdeStagingAction.java | 4 -- 5 files changed, 72 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java index bbd3e4806..74186e2e4 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java +++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java @@ -16,6 +16,7 @@ package google.registry.beam.common; import google.registry.beam.common.RegistryJpaIO.Write; import google.registry.config.RegistryEnvironment; +import google.registry.model.annotations.DeleteAfterMigration; import google.registry.persistence.PersistenceModule.JpaTransactionManagerType; import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import java.util.Objects; @@ -65,6 +66,17 @@ public interface RegistryPipelineOptions extends GcpOptions { void setSqlWriteShards(int maxConcurrentSqlWriters); + @DeleteAfterMigration + @Description( + "Whether to use self allocated primary IDs when building entities. This should only be used" + + " when the IDs are not significant and the resulting entities are not persisted back to" + + " the database. 
Use with caution as self allocated IDs are not unique across workers," + " and persisting entities with these IDs can be dangerous.") + @Default.Boolean(false) + boolean getUseSelfAllocatedId(); + + void setUseSelfAllocatedId(boolean useSelfAllocatedId); + static RegistryPipelineComponent toRegistryPipelineComponent(RegistryPipelineOptions options) { return DaggerRegistryPipelineComponent.builder() .isolationOverride(options.getIsolationOverride()) diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java index 776be56b4..baaa861c1 100644 --- a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java +++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java @@ -22,6 +22,7 @@ import dagger.Lazy; import google.registry.config.RegistryEnvironment; import google.registry.config.SystemPropertySetter; import google.registry.model.AppEngineEnvironment; +import google.registry.model.IdService; import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.persistence.transaction.TransactionManagerFactory; import org.apache.beam.sdk.harness.JvmInitializer; @@ -67,5 +68,15 @@ public class RegistryPipelineWorkerInitializer implements JvmInitializer { new AppEngineEnvironment("s~" + registryPipelineComponent.getProjectId()) .setEnvironmentForAllThreads(); SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true"); + // Use self-allocated IDs if requested. Note that this inevitably results in duplicate IDs from + // multiple workers, which can also collide with existing IDs in the database. So they cannot be + // depended upon for comparison or anything significant. The resulting entities can never be + // persisted back into the database. 
This is a stop-gap measure that should only be used when + // you need to create Buildables in Beam, but do not have control over how the IDs are + // allocated, and you don't care about the generated IDs as long + // as you can build the entities. + if (registryOptions.getUseSelfAllocatedId()) { + IdService.setForceUseSelfAllocatedId(); + } } } diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java index 3a5e5acef..73d5bb1c9 100644 --- a/core/src/main/java/google/registry/beam/rde/RdePipeline.java +++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java @@ -690,6 +690,13 @@ public class RdePipeline implements Serializable { RdePipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class); + // We need to self allocate the IDs because the pipeline creates EPP resources from history + // entries and projects them to watermark. These buildable entities would otherwise request an + // ID from Datastore, which Beam does not have access to. The IDs are not included in the + // deposits nor are these entities persisted back to the database, so it is OK to use a self + // allocated ID to get around the limitations of Beam. 
+ options.setUseSelfAllocatedId(true); + RegistryPipelineOptions.validateRegistryPipelineOptions(options); options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED); DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run(); diff --git a/core/src/main/java/google/registry/model/IdService.java b/core/src/main/java/google/registry/model/IdService.java index b445cbb1e..a375d89fc 100644 --- a/core/src/main/java/google/registry/model/IdService.java +++ b/core/src/main/java/google/registry/model/IdService.java @@ -14,15 +14,20 @@ // package google.registry.model; +import static com.google.common.base.Preconditions.checkState; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static org.joda.time.DateTimeZone.UTC; import com.google.appengine.api.datastore.DatastoreServiceFactory; +import com.google.common.flogger.FluentLogger; +import google.registry.beam.common.RegistryPipelineWorkerInitializer; import google.registry.config.RegistryEnvironment; import google.registry.model.annotations.DeleteAfterMigration; import google.registry.model.common.DatabaseMigrationStateSchedule; import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState; import java.math.BigInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import org.joda.time.DateTime; /** @@ -31,6 +36,35 @@ import org.joda.time.DateTime; @DeleteAfterMigration public final class IdService { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + // TODO(ptkach): remove once the Cloud SQL sequence-based method is live in production + private static boolean forceUseSelfAllocateId = false; + + public static void setForceUseSelfAllocatedId() { + checkState( + "true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false")), + "Can only set ID supplier in a Beam pipeline"); + logger.atWarning().log("Using ID 
supplier override!"); + IdService.forceUseSelfAllocateId = true; + } + + private static class SelfAllocatedIdSupplier implements Supplier { + + private static final SelfAllocatedIdSupplier INSTANCE = new SelfAllocatedIdSupplier(); + + /** Counts of used ids for self allocating IDs. */ + private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero + + private static SelfAllocatedIdSupplier getInstance() { + return INSTANCE; + } + + @Override + public Long get() { + return nextSelfAllocatedId.getAndIncrement(); + } + } /** * A SQL Sequence based ID allocator that generates an ID from a monotonically increasing atomic * {@link long} @@ -67,10 +101,13 @@ public final class IdService { private IdService() {} public static long allocateId() { - return (DatabaseMigrationStateSchedule.getValueAtTime(DateTime.now(UTC)) - .equals(MigrationState.SEQUENCE_BASED_ALLOCATE_ID) - || RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())) - ? getSequenceBasedId() - : getDatastoreBasedId(); + if (DatabaseMigrationStateSchedule.getValueAtTime(DateTime.now(UTC)) + .equals(MigrationState.SEQUENCE_BASED_ALLOCATE_ID) + || RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())) { + return getSequenceBasedId(); + } else if (IdService.forceUseSelfAllocateId) { + return SelfAllocatedIdSupplier.getInstance().get(); + } + return getDatastoreBasedId(); } } diff --git a/core/src/main/java/google/registry/rde/RdeStagingAction.java b/core/src/main/java/google/registry/rde/RdeStagingAction.java index a5c0c9ed5..120126f13 100644 --- a/core/src/main/java/google/registry/rde/RdeStagingAction.java +++ b/core/src/main/java/google/registry/rde/RdeStagingAction.java @@ -50,7 +50,6 @@ import google.registry.model.domain.Domain; import google.registry.model.host.Host; import google.registry.model.rde.RdeMode; import google.registry.model.registrar.Registrar; -import google.registry.persistence.PersistenceModule.JpaTransactionManagerType; import 
google.registry.request.Action; import google.registry.request.HttpException.BadRequestException; import google.registry.request.Parameter; @@ -291,9 +290,6 @@ public final class RdeStagingAction implements Runnable { .put("registryEnvironment", RegistryEnvironment.get().name()) .put("workerMachineType", machineType) .put("numWorkers", String.valueOf(numWorkers)) - .put( - "jpaTransactionManagerType", - JpaTransactionManagerType.READ_ONLY_REPLICA.toString()) // TODO (jianglai): Investigate turning off public IPs (for which // there is a quota) in order to increase the total number of // workers allowed (also under quota).