Restore SelfAllocateId for RDE pipeline (#1853)

This commit is contained in:
Pavlo Tkach 2022-11-18 11:13:41 -05:00 committed by GitHub
parent 22ef38fc3f
commit ad5bde4adb
5 changed files with 72 additions and 9 deletions

View file

@ -16,6 +16,7 @@ package google.registry.beam.common;
import google.registry.beam.common.RegistryJpaIO.Write;
import google.registry.config.RegistryEnvironment;
import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import java.util.Objects;
@ -65,6 +66,17 @@ public interface RegistryPipelineOptions extends GcpOptions {
void setSqlWriteShards(int maxConcurrentSqlWriters);
@DeleteAfterMigration
@Description(
"Whether to use self allocated primary IDs when building entities. This should only be used"
+ " when the IDs are not significant and the resulting entities are not persisted back to"
+ " the database. Use with caution as self allocated IDs are not unique across workers,"
+ " and persisting entities with these IDs can be dangerous.")
@Default.Boolean(false)
boolean getUseSelfAllocatedId();
void setUseSelfAllocatedId(boolean useSelfAllocatedId);
static RegistryPipelineComponent toRegistryPipelineComponent(RegistryPipelineOptions options) {
return DaggerRegistryPipelineComponent.builder()
.isolationOverride(options.getIsolationOverride())

View file

@ -22,6 +22,7 @@ import dagger.Lazy;
import google.registry.config.RegistryEnvironment;
import google.registry.config.SystemPropertySetter;
import google.registry.model.AppEngineEnvironment;
import google.registry.model.IdService;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManagerFactory;
import org.apache.beam.sdk.harness.JvmInitializer;
@ -67,5 +68,15 @@ public class RegistryPipelineWorkerInitializer implements JvmInitializer {
new AppEngineEnvironment("s~" + registryPipelineComponent.getProjectId())
.setEnvironmentForAllThreads();
SystemPropertySetter.PRODUCTION_IMPL.setProperty(PROPERTY, "true");
// Use self-allocated IDs if requested. Note that this inevitably results in duplicate IDs from
// multiple workers, which can also collide with existing IDs in the database. So they cannot be
// depended upon for comparison or anything significant. The resulting entities can never be
// persisted back into the database. This is a stop-gap measure that should only be used when
// you need to create Buildables in Beam, but do not have control over how the IDs are
// allocated, and you don't care about the generated IDs as long as you can build the entities.
if (registryOptions.getUseSelfAllocatedId()) {
IdService.setForceUseSelfAllocatedId();
}
}
}

View file

@ -690,6 +690,13 @@ public class RdePipeline implements Serializable {
RdePipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(RdePipelineOptions.class);
// We need to self allocate the IDs because the pipeline creates EPP resources from history
// entries and projects them to watermark. These buildable entities would otherwise request an
// ID from Datastore, which Beam does not have access to. The IDs are not included in the
// deposits, nor are these entities persisted back to the database, so it is OK to use a self
// allocated ID to get around the limitations of Beam.
options.setUseSelfAllocatedId(true);
RegistryPipelineOptions.validateRegistryPipelineOptions(options);
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
DaggerRdePipeline_RdePipelineComponent.builder().options(options).build().rdePipeline().run();

View file

@ -14,15 +14,20 @@
//
package google.registry.model;
import static com.google.common.base.Preconditions.checkState;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static org.joda.time.DateTimeZone.UTC;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.common.flogger.FluentLogger;
import google.registry.beam.common.RegistryPipelineWorkerInitializer;
import google.registry.config.RegistryEnvironment;
import google.registry.model.annotations.DeleteAfterMigration;
import google.registry.model.common.DatabaseMigrationStateSchedule;
import google.registry.model.common.DatabaseMigrationStateSchedule.MigrationState;
import java.math.BigInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.joda.time.DateTime;
/**
@ -31,6 +36,35 @@ import org.joda.time.DateTime;
@DeleteAfterMigration
public final class IdService {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
// TODO(ptkach): remove once the Cloud SQL sequence-based method is live in production
private static boolean forceUseSelfAllocateId = false;
/**
 * Turns on the self-allocated ID override that {@link #allocateId} consults.
 *
 * <p>The call is only accepted when the system property named by {@link
 * RegistryPipelineWorkerInitializer#PROPERTY} is set to {@code "true"}; a warning is logged every
 * time the override is enabled.
 *
 * @throws IllegalStateException if that system property is absent or not equal to {@code "true"}
 */
public static void setForceUseSelfAllocatedId() {
  String beamMarker = System.getProperty(RegistryPipelineWorkerInitializer.PROPERTY, "false");
  boolean runningInBeam = "true".equals(beamMarker);
  checkState(runningInBeam, "Can only set ID supplier in a Beam pipeline");
  logger.atWarning().log("Using ID supplier override!");
  forceUseSelfAllocateId = true;
}
/**
 * Hands out IDs from a static in-memory counter instead of asking an external service.
 *
 * <p>All callers share one instance, obtained through {@link #getInstance()}.
 */
private static class SelfAllocatedIdSupplier implements Supplier<Long> {

  /** The next ID to hand out; counting starts at 1 because ids cannot be zero. */
  private static final AtomicLong nextSelfAllocatedId = new AtomicLong(1);

  /** The single shared supplier. */
  private static final SelfAllocatedIdSupplier INSTANCE = new SelfAllocatedIdSupplier();

  /** Returns the shared supplier instance. */
  private static SelfAllocatedIdSupplier getInstance() {
    return INSTANCE;
  }

  /** Returns the counter's current value and atomically advances it by one. */
  @Override
  public Long get() {
    long allocated = nextSelfAllocatedId.getAndIncrement();
    return allocated;
  }
}
/**
* A SQL Sequence based ID allocator that generates an ID from a monotonically increasing atomic
* {@code long}
@ -67,10 +101,13 @@ public final class IdService {
private IdService() {}
/**
 * Allocates an ID by choosing one of the allocation strategies available to this class.
 *
 * <p>NOTE(review): this span is rendered from a diff whose +/- markers were stripped, so the body
 * appears twice. The ternary {@code return} looks like the version this commit removes and the
 * {@code if}/{@code else if} chain looks like the version it adds -- confirm against the real
 * file before editing.
 */
public static long allocateId() {
// Apparent pre-commit body: sequence-based ID when the migration schedule is at
// SEQUENCE_BASED_ALLOCATE_ID or the environment is UNITTEST, otherwise getDatastoreBasedId().
return (DatabaseMigrationStateSchedule.getValueAtTime(DateTime.now(UTC))
.equals(MigrationState.SEQUENCE_BASED_ALLOCATE_ID)
|| RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get()))
? getSequenceBasedId()
: getDatastoreBasedId();
// Apparent post-commit body. The sequence-based check is evaluated first, so the
// self-allocated override below has no effect in that migration state or in UNITTEST.
if (DatabaseMigrationStateSchedule.getValueAtTime(DateTime.now(UTC))
.equals(MigrationState.SEQUENCE_BASED_ALLOCATE_ID)
|| RegistryEnvironment.UNITTEST.equals(RegistryEnvironment.get())) {
return getSequenceBasedId();
} else if (IdService.forceUseSelfAllocateId) {
// The flag is set only by setForceUseSelfAllocatedId(); take the next value from the
// in-memory counter held by SelfAllocatedIdSupplier.
return SelfAllocatedIdSupplier.getInstance().get();
}
// Neither condition applied: fall back to getDatastoreBasedId().
return getDatastoreBasedId();
}
}

View file

@ -50,7 +50,6 @@ import google.registry.model.domain.Domain;
import google.registry.model.host.Host;
import google.registry.model.rde.RdeMode;
import google.registry.model.registrar.Registrar;
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
import google.registry.request.Action;
import google.registry.request.HttpException.BadRequestException;
import google.registry.request.Parameter;
@ -291,9 +290,6 @@ public final class RdeStagingAction implements Runnable {
.put("registryEnvironment", RegistryEnvironment.get().name())
.put("workerMachineType", machineType)
.put("numWorkers", String.valueOf(numWorkers))
.put(
"jpaTransactionManagerType",
JpaTransactionManagerType.READ_ONLY_REPLICA.toString())
// TODO (jianglai): Investigate turning off public IPs (for which
// there is a quota) in order to increase the total number of
// workers allowed (also under quota).