diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java
index f4f350d6a..4384d5088 100644
--- a/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java
+++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineOptions.java
@@ -15,7 +15,6 @@
 package google.registry.beam.common;
 
 import google.registry.config.RegistryEnvironment;
-import google.registry.model.annotations.DeleteAfterMigration;
 import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
 import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
 import java.util.Objects;
@@ -57,17 +56,6 @@ public interface RegistryPipelineOptions extends GcpOptions {
 
   void setSqlWriteBatchSize(int sqlWriteBatchSize);
 
-  @DeleteAfterMigration
-  @Description(
-      "Whether to use self allocated primary IDs when building entities. This should only be used"
-          + " when the IDs are not significant and the resulting entities are not persisted back to"
-          + " the database. Use with caution as self allocated IDs are not unique across workers,"
-          + " and persisting entities with these IDs can be dangerous.")
-  @Default.Boolean(false)
-  boolean getUseSelfAllocatedId();
-
-  void setUseSelfAllocatedId(boolean useSelfAllocatedId);
-
   static RegistryPipelineComponent toRegistryPipelineComponent(RegistryPipelineOptions options) {
     return DaggerRegistryPipelineComponent.builder()
         .isolationOverride(options.getIsolationOverride())
diff --git a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java
index 0f849aa42..fe054ec6e 100644
--- a/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java
+++ b/core/src/main/java/google/registry/beam/common/RegistryPipelineWorkerInitializer.java
@@ -21,7 +21,6 @@ import com.google.common.flogger.FluentLogger;
 import dagger.Lazy;
 import google.registry.config.RegistryEnvironment;
 import google.registry.config.SystemPropertySetter;
-import google.registry.model.IdService;
 import google.registry.persistence.transaction.JpaTransactionManager;
 import google.registry.persistence.transaction.TransactionManagerFactory;
 import org.apache.beam.sdk.harness.JvmInitializer;
@@ -63,15 +62,5 @@ public class RegistryPipelineWorkerInitializer implements JvmInitializer {
     }
     TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get);
     SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true");
-    // Use self-allocated IDs if requested. Note that this inevitably results in duplicate IDs from
-    // multiple workers, which can also collide with existing IDs in the database. So they cannot be
-    // dependent upon for comparison or anything significant. The resulting entities can never be
-    // persisted back into the database. This is a stop-gap measure that should only be used when
-    // you need to create Buildables in Beam, but do not have control over how the IDs are
-    // allocated, and you don't care about the generated IDs as long
-    // as you can build the entities.
-    if (registryOptions.getUseSelfAllocatedId()) {
-      IdService.setForceUseSelfAllocatedId();
-    }
   }
 }
diff --git a/core/src/main/java/google/registry/beam/rde/RdePipeline.java b/core/src/main/java/google/registry/beam/rde/RdePipeline.java
index 247409c5a..87d3148a6 100644
--- a/core/src/main/java/google/registry/beam/rde/RdePipeline.java
+++ b/core/src/main/java/google/registry/beam/rde/RdePipeline.java
@@ -687,13 +687,6 @@ public class RdePipeline implements Serializable {
   public static void main(String[] args) {
     RdePipelineOptions options =
         PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class);
-    // We need to self allocate the IDs because the pipeline creates EPP resources from history
-    // entries and projects them to watermark. These buildable entities would otherwise request an
-    // ID from datastore, which Beam does not have access to. The IDs are not included in the
-    // deposits or are these entities persisted back to the database, so it is OK to use a self
-    // allocated ID to get around the limitations of beam.
-    options.setUseSelfAllocatedId(true);
-
     RegistryPipelineOptions.validateRegistryPipelineOptions(options);
     options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
     DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run();
diff --git a/core/src/main/java/google/registry/model/IdService.java b/core/src/main/java/google/registry/model/IdService.java
index 1e87baa19..64a233caa 100644
--- a/core/src/main/java/google/registry/model/IdService.java
+++ b/core/src/main/java/google/registry/model/IdService.java
@@ -14,62 +14,25 @@
 //
 package google.registry.model;
 
-import static com.google.common.base.Preconditions.checkState;
 import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
-import static org.joda.time.DateTimeZone.UTC;
 
-import com.google.appengine.api.datastore.DatastoreServiceFactory;
-import com.google.common.flogger.FluentLogger;
-import google.registry.beam.common.RegistryPipelineWorkerInitializer;
-import google.registry.config.RegistryEnvironment;
-import google.registry.model.common.DatabaseMigrationStateSchedule;
-import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
 import java.math.BigInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-import org.joda.time.DateTime;
 
 /**
  * Allocates a {@link long} to use as a {@code @Id}, (part) of the primary SQL key for an entity.
  */
 public final class IdService {
 
-  private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+  private IdService() {}
 
-  // TODO(ptkach): remove once the Cloud SQL sequence-based method is live in production
-  private static boolean forceUseSelfAllocateId = false;
-
-  public static void setForceUseSelfAllocatedId() {
-    checkState(
-        "true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false")),
-        "Can only set ID supplier in a Beam pipeline");
-    logger.atWarning().log("Using ID supplier override!");
-    IdService.forceUseSelfAllocateId = true;
-  }
-
-  private static class SelfAllocatedIdSupplier implements Supplier<Long> {
-
-    private static final SelfAllocatedIdSupplier INSTANCE = new SelfAllocatedIdSupplier();
-
-    /** Counts of used ids for self allocating IDs. */
-    private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero
-
-    private static SelfAllocatedIdSupplier getInstance() {
-      return INSTANCE;
-    }
-
-    @Override
-    public Long get() {
-      return nextSelfAllocatedId.getAndIncrement();
-    }
-  }
   /**
-   * A SQL Sequence based ID allocator that generates an ID from a monotonically increasing atomic
-   * {@link long}
+   * A SQL Sequence based ID allocator that generates an ID from a monotonically increasing {@link
+   * AtomicLong}
    *
-   * <p>The generated IDs are project-wide unique
+   * <p>The generated IDs are project-wide unique.
    */
-  private static Long getSequenceBasedId() {
+  public static long allocateId() {
     return tm().transact(
             () ->
                 (BigInteger)
@@ -78,32 +41,4 @@ public final class IdService {
                         .getSingleResult())
         .longValue();
   }
-
-  // TODO(ptkach): Remove once all instances switch to sequenceBasedId
-  /**
-   * A Datastore based ID allocator that generates an ID from a monotonically increasing atomic
-   * {@link long}
-   *
-   * <p>The generated IDs are project-wide unique
-   */
-  private static Long getDatastoreBasedId() {
-    return DatastoreServiceFactory.getDatastoreService()
-        .allocateIds("common", 1)
-        .iterator()
-        .next()
-        .getId();
-  }
-
-  private IdService() {}
-
-  public static long allocateId() {
-    if (DatabaseMigrationStateSchedule.getValueAtTime(DateTime.now(UTC))
-            .equals(MigrationState.SEQUENCE_BASED_ALLOCATE_ID)
-        || RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())) {
-      return getSequenceBasedId();
-    } else if (IdService.forceUseSelfAllocateId) {
-      return SelfAllocatedIdSupplier.getInstance().get();
-    }
-    return getDatastoreBasedId();
-  }
 }
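Note for reviewers: after this change the only allocation path left is the SQL-sequence-based one. IdService.allocateId() runs a native query inside tm().transact(...), casts the single BigInteger result, and narrows it with longValue(); the query string itself sits on context lines that fall between the two hunks of IdService.java and is not visible above. The sketch below is only an illustration of that technique using plain JDBC; the connection URL, credentials, and the sequence name project_wide_unique_id_seq are assumptions for the example, not taken from this diff.

// Illustration only: sequence-backed ID allocation via plain JDBC. Nomulus itself goes through
// its JpaTransactionManager (tm()), and the real sequence is defined in the SQL schema, not here.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public final class SequenceIdAllocationSketch {

  public static void main(String[] args) throws Exception {
    // Placeholder connection details; substitute whatever database holds the sequence.
    try (Connection conn =
            DriverManager.getConnection("jdbc:postgresql://localhost:5432/registry", "user", "pw");
        Statement stmt = conn.createStatement();
        // Assumed sequence name; each call hands out the next project-wide unique value.
        ResultSet rs = stmt.executeQuery("SELECT nextval('project_wide_unique_id_seq')")) {
      rs.next();
      // IdService.allocateId() does the equivalent through JPA: cast to BigInteger, then longValue().
      long id = rs.getLong(1);
      System.out.println("allocated id: " + id);
    }
  }
}

Because the allocation always goes through the database sequence, Beam pipelines no longer need the useSelfAllocatedId escape hatch: workers reach the same sequence via the JPA transaction manager that RegistryPipelineWorkerInitializer installs, so generated IDs stay unique across workers.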