Always use SQL based ID allocation (#1899)

We've been using it in production for three weeks now. Everything seems
to be working fine. Removing the code related to checking the migration
state and using the override.
This commit is contained in:
Lai Jiang 2023-01-10 09:22:01 -05:00 committed by GitHub
parent 2294c77306
commit 15d0507914
4 changed files with 5 additions and 100 deletions

View file

@ -15,7 +15,6 @@
package google.registry.beam.common;
import google.registry.config.RegistryEnvironment;
import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import java.util.Objects;
@ -57,17 +56,6 @@ public interface RegistryPipelineOptions extends GcpOptions {
void setSqlWriteBatchSize(int sqlWriteBatchSize);
@DeleteAfterMigration
@Description(
"Whether to use self allocated primary IDs when building entities. This should only be used"
+ " when the IDs are not significant and the resulting entities are not persisted back to"
+ " the database. Use with caution as self allocated IDs are not unique across workers,"
+ " and persisting entities with these IDs can be dangerous.")
@Default.Boolean(false)
boolean getUseSelfAllocatedId();
void setUseSelfAllocatedId(boolean useSelfAllocatedId);
static RegistryPipelineComponent toRegistryPipelineComponent(RegistryPipelineOptions options) {
return DaggerRegistryPipelineComponent.builder()
.isolationOverride(options.getIsolationOverride())

View file

@ -21,7 +21,6 @@ import com.google.common.flogger.FluentLogger;
import dagger.Lazy;
import google.registry.config.RegistryEnvironment;
import google.registry.config.SystemPropertySetter;
import google.registry.model.IdService;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManagerFactory;
import org.apache.beam.sdk.harness.JvmInitializer;
@ -63,15 +62,5 @@ public class RegistryPipelineWorkerInitializer implements JvmInitializer {
}
TransactionManagerFactory.setJpaTmOnBeamWorker(transactionManagerLazy::get);
SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true");
// Use self-allocated IDs if requested. Note that this inevitably results in duplicate IDs from
// multiple workers, which can also collide with existing IDs in the database. So they cannot be
// dependent upon for comparison or anything significant. The resulting entities can never be
// persisted back into the database. This is a stop-gap measure that should only be used when
// you need to create Buildables in Beam, but do not have control over how the IDs are
// allocated, and you don't care about the generated IDs as long
// as you can build the entities.
if (registryOptions.getUseSelfAllocatedId()) {
IdService.setForceUseSelfAllocatedId();
}
}
}

View file

@ -687,13 +687,6 @@ public class RdePipeline implements Serializable {
RdePipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class);
// We need to self allocate the IDs because the pipeline creates EPP resources from history
// entries and projects them to watermark. These buildable entities would otherwise request an
// ID from datastore, which Beam does not have access to. The IDs are not included in the
// deposits or are these entities persisted back to the database, so it is OK to use a self
// allocated ID to get around the limitations of beam.
options.setUseSelfAllocatedId(true);
RegistryPipelineOptions.validateRegistryPipelineOptions(options);
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run();

View file

@ -14,62 +14,25 @@
//
package google.registry.model;
import static com.google.common.base.Preconditions.checkState;
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
import static org.joda.time.DateTimeZone.UTC;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.common.flogger.FluentLogger;
import google.registry.beam.common.RegistryPipelineWorkerInitializer;
import google.registry.config.RegistryEnvironment;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.joda.time.DateTime;
/**
* Allocates a {@link long} to use as a {@code @Id}, (part) of the primary SQL key for an entity.
*/
public final class IdService {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private IdService() {}
// TODO(ptkach): remove once the Cloud SQL sequence-based method is live in production
private static boolean forceUseSelfAllocateId = false;
public static void setForceUseSelfAllocatedId() {
checkState(
"true".equals(System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false")),
"Can only set ID supplier in a Beam pipeline");
logger.atWarning().log("Using ID supplier override!");
IdService.forceUseSelfAllocateId = true;
}
private static class SelfAllocatedIdSupplier implements Supplier<Long> {
private static final SelfAllocatedIdSupplier INSTANCE = new SelfAllocatedIdSupplier();
/** Counts of used ids for self allocating IDs. */
private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1); // ids cannot be zero
private static SelfAllocatedIdSupplier getInstance() {
return INSTANCE;
}
@Override
public Long get() {
return nextSelfAllocatedId.getAndIncrement();
}
}
/**
* A SQL Sequence based ID allocator that generates an ID from a monotonically increasing atomic
* {@link long}
* A SQL Sequence based ID allocator that generates an ID from a monotonically increasing {@link
* AtomicLong}
*
* <p>The generated IDs are project-wide unique
* <p>The generated IDs are project-wide unique.
*/
private static Long getSequenceBasedId() {
public static long allocateId() {
return tm().transact(
() ->
(BigInteger)
@ -78,32 +41,4 @@ public final class IdService {
.getSingleResult())
.longValue();
}
// TODO(ptkach): Remove once all instances switch to sequenceBasedId
/**
* A Datastore based ID allocator that generates an ID from a monotonically increasing atomic
* {@link long}
*
* <p>The generated IDs are project-wide unique
*/
private static Long getDatastoreBasedId() {
return DatastoreServiceFactory.getDatastoreService()
.allocateIds("common", 1)
.iterator()
.next()
.getId();
}
private IdService() {}
public static long allocateId() {
if (DatabaseMigrationStateSchedule.getValueAtTime(DateTime.now(UTC))
.equals(MigrationState.SEQUENCE_BASED_ALLOCATE_ID)
|| RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())) {
return getSequenceBasedId();
} else if (IdService.forceUseSelfAllocateId) {
return SelfAllocatedIdSupplier.getInstance().get();
}
return getDatastoreBasedId();
}
}