Update the Datastore to SQL migration pipeline (#927)

* Update the Datastore to SQL migration pipeline

The pipeline now includes all entity types to be migrated by it, and has
completed successfully using the Sandbox data set. The running time in Sandbox
is about 3 hours, extrapolating by entity count to a 12-hour run with
production data. However, actual running time is likely to be longer since
throughput is lower with domains, which accounts for a higher percentage
of the total in production. More optimization will be needed.

The migrated data has not been validated.
This commit is contained in:
Weimin Yu 2021-01-12 18:05:46 -05:00 committed by GitHub
parent 85fa8ebbd5
commit d0831cadde
9 changed files with 1263 additions and 200 deletions

View file

@ -828,9 +828,8 @@ task buildToolImage(dependsOn: nomulus, type: Exec) {
commandLine 'docker', 'build', '-t', 'nomulus-tool', '.' commandLine 'docker', 'build', '-t', 'nomulus-tool', '.'
} }
task generateInitSqlPipelineGraph(type: Test) { task generateInitSqlPipelineGraph(type: FilteringTest) {
include "**/InitSqlPipelineGraphTest.*" tests = ['InitSqlPipelineGraphTest.createPipeline_compareGraph']
testNameIncludePatterns = ["**createPipeline_compareGraph"]
ignoreFailures = true ignoreFailures = true
} }

View file

@ -26,17 +26,15 @@ final class DomainBaseUtil {
private DomainBaseUtil() {} private DomainBaseUtil() {}
/** /**
* Removes {@link google.registry.model.billing.BillingEvent.Recurring}, {@link * Removes properties that contain foreign keys from a Datastore {@link Entity} that represents an
* google.registry.model.poll.PollMessage PollMessages} and {@link * Ofy {@link google.registry.model.domain.DomainBase}. This breaks the cycle of foreign key
* google.registry.model.host.HostResource name servers} from a Datastore {@link Entity} that * constraints between entity kinds, allowing {@code DomainBases} to be inserted into the SQL
* represents an Ofy {@link google.registry.model.domain.DomainBase}. This breaks the cycle of * database. See {@link InitSqlPipeline} for a use case, where the full {@code DomainBases} are
* foreign key constraints between these entity kinds, allowing {@code DomainBases} to be inserted * written again during the last stage of the pipeline.
* into the SQL database. See {@link InitSqlPipeline} for a use case, where the full {@code
* DomainBases} are written again during the last stage of the pipeline.
* *
* <p>The returned object may be in bad state. Specifically, {@link * <p>The returned object may be in bad state. Specifically, {@link
* google.registry.model.eppcommon.StatusValue#INACTIVE} is not added after name servers are * google.registry.model.eppcommon.StatusValue#INACTIVE} is not added after name servers are
* removed. This only impacts tests. * removed. This only impacts tests that manipulate Datastore entities directly.
* *
* <p>This operation is performed on an Datastore {@link Entity} instead of Ofy Java object * <p>This operation is performed on an Datastore {@link Entity} instead of Ofy Java object
* because Objectify requires access to a Datastore service when converting an Ofy object to a * because Objectify requires access to a Datastore service when converting an Ofy object to a
@ -70,6 +68,9 @@ final class DomainBaseUtil {
domainBase.getProperties().keySet().stream() domainBase.getProperties().keySet().stream()
.filter(s -> s.startsWith("transferData.")) .filter(s -> s.startsWith("transferData."))
.forEach(s -> clone.removeProperty(s)); .forEach(s -> clone.removeProperty(s));
domainBase.getProperties().keySet().stream()
.filter(s -> s.startsWith("gracePeriods."))
.forEach(s -> clone.removeProperty(s));
return clone; return clone;
} }
} }

View file

@ -34,6 +34,7 @@ import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.RegistrarContact; import google.registry.model.registrar.RegistrarContact;
import google.registry.model.registry.Registry; import google.registry.model.registry.Registry;
import google.registry.model.reporting.HistoryEntry; import google.registry.model.reporting.HistoryEntry;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.persistence.transaction.JpaTransactionManager;
import java.io.Serializable; import java.io.Serializable;
import java.util.Collection; import java.util.Collection;
@ -77,14 +78,22 @@ import org.joda.time.DateTime;
* HistoryEntry}. * HistoryEntry}.
* <li>{@link BillingEvent.OneTime}: references {@code Registrar}, {@code DomainBase}, {@code * <li>{@link BillingEvent.OneTime}: references {@code Registrar}, {@code DomainBase}, {@code
* BillingEvent.Recurring}, {@code HistoryEntry} and {@code AllocationToken}. * BillingEvent.Recurring}, {@code HistoryEntry} and {@code AllocationToken}.
* <li>{@link BillingEvent.Modification}: SQL model TBD. Will reference {@code Registrar}, {@code
* DomainBase} and {@code BillingEvent.OneTime}.
* <li>{@link BillingEvent.Cancellation}: references {@code Registrar}, {@code DomainBase}, {@code * <li>{@link BillingEvent.Cancellation}: references {@code Registrar}, {@code DomainBase}, {@code
* BillingEvent.Recurring}, {@code BillingEvent.OneTime}, and {@code HistoryEntry}. * BillingEvent.Recurring}, {@code BillingEvent.OneTime}, and {@code HistoryEntry}.
* <li>{@link PollMessage}: references {@code Registrar}, {@code DomainBase}, {@code * <li>{@link PollMessage}: references {@code Registrar}, {@code DomainBase}, {@code
* ContactResource}, {@code HostResource}, and {@code HistoryEntry}. * ContactResource}, {@code HostResource}, and {@code HistoryEntry}.
* <li>{@link DomainBase}, original copy from Datastore. * <li>{@link DomainBase}, original copy from Datastore.
* </ol> * </ol>
*
* <p>This pipeline expects that the source Datastore has at least one entity in each of the types
* above. This assumption allows us to construct a simpler pipeline graph that can be visually
* examined, and is true in all intended use cases. However, tests must not violate this assumption
* when setting up data, otherwise they may run into foreign key constraint violations. The reason
* is that this pipeline uses the {@link Wait} transform to order the persistence by entity type.
* However, the wait is skipped if the target type has no data, resulting in subsequent entity types
* starting prematurely. E.g., if a Datastore has no {@code RegistrarContact} entities, the pipeline
* may start writing {@code DomainBase} entities before all {@code Registry}, {@code Registrar} and
* {@code ContactResource} entities have been persisted.
*/ */
public class InitSqlPipeline implements Serializable { public class InitSqlPipeline implements Serializable {
@ -93,24 +102,23 @@ public class InitSqlPipeline implements Serializable {
* DomainBase}. * DomainBase}.
*/ */
private static final ImmutableList<Class<?>> PHASE_ONE_ORDERED = private static final ImmutableList<Class<?>> PHASE_ONE_ORDERED =
ImmutableList.of(Registry.class, Registrar.class, ContactResource.class); ImmutableList.of(
Registry.class, Registrar.class, ContactResource.class, RegistrarContact.class);
/** /**
* Datastore kinds to be written to the SQL database after the cleansed version of {@link * Datastore kinds to be written to the SQL database after the cleansed version of {@link
* DomainBase}. * DomainBase}.
*
* <p>The following entities are missing from the list:
*
* <ul>
* <li>Those not modeled in JPA yet, e.g., {@code BillingEvent.Modification}.
* <li>Those waiting for sanitation, e.g., {@code HistoryEntry}, which would have duplicate keys
* after converting to SQL model.
* <li>Those that have foreign key constraints on the above.
* </ul>
*/ */
// TODO(weiminyu): add more entities when available.
private static final ImmutableList<Class<?>> PHASE_TWO_ORDERED = private static final ImmutableList<Class<?>> PHASE_TWO_ORDERED =
ImmutableList.of(HostResource.class); ImmutableList.of(
HostResource.class,
HistoryEntry.class,
AllocationToken.class,
BillingEvent.Recurring.class,
BillingEvent.OneTime.class,
BillingEvent.Cancellation.class,
PollMessage.class,
DomainBase.class);
private final InitSqlPipelineOptions options; private final InitSqlPipelineOptions options;
@ -226,7 +234,11 @@ public class InitSqlPipeline implements Serializable {
transformId, transformId,
options.getMaxConcurrentSqlWriters(), options.getMaxConcurrentSqlWriters(),
options.getSqlWriteBatchSize(), options.getSqlWriteBatchSize(),
new JpaSupplierFactory(credentialFileUrl, options.getCloudKmsProjectId(), jpaGetter))); new JpaSupplierFactory(
credentialFileUrl,
options.getCloudKmsProjectId(),
jpaGetter,
TransactionIsolationLevel.TRANSACTION_READ_UNCOMMITTED)));
} }
private static ImmutableList<String> toKindStrings(Collection<Class<?>> entityClasses) { private static ImmutableList<String> toKindStrings(Collection<Class<?>> entityClasses) {

View file

@ -16,6 +16,7 @@ package google.registry.beam.initsql;
import google.registry.beam.initsql.BeamJpaModule.JpaTransactionManagerComponent; import google.registry.beam.initsql.BeamJpaModule.JpaTransactionManagerComponent;
import google.registry.beam.initsql.Transforms.SerializableSupplier; import google.registry.beam.initsql.Transforms.SerializableSupplier;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.persistence.transaction.JpaTransactionManager;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunction;
@ -28,21 +29,32 @@ public class JpaSupplierFactory implements SerializableSupplier<JpaTransactionMa
@Nullable private final String cloudKmsProjectId; @Nullable private final String cloudKmsProjectId;
private final SerializableFunction<JpaTransactionManagerComponent, JpaTransactionManager> private final SerializableFunction<JpaTransactionManagerComponent, JpaTransactionManager>
jpaGetter; jpaGetter;
@Nullable private final TransactionIsolationLevel isolationLevelOverride;
public JpaSupplierFactory( public JpaSupplierFactory(
String credentialFileUrl, String credentialFileUrl,
@Nullable String cloudKmsProjectId, @Nullable String cloudKmsProjectId,
SerializableFunction<JpaTransactionManagerComponent, JpaTransactionManager> jpaGetter) { SerializableFunction<JpaTransactionManagerComponent, JpaTransactionManager> jpaGetter) {
this(credentialFileUrl, cloudKmsProjectId, jpaGetter, null);
}
public JpaSupplierFactory(
String credentialFileUrl,
@Nullable String cloudKmsProjectId,
SerializableFunction<JpaTransactionManagerComponent, JpaTransactionManager> jpaGetter,
@Nullable TransactionIsolationLevel isolationLevelOverride) {
this.credentialFileUrl = credentialFileUrl; this.credentialFileUrl = credentialFileUrl;
this.cloudKmsProjectId = cloudKmsProjectId; this.cloudKmsProjectId = cloudKmsProjectId;
this.jpaGetter = jpaGetter; this.jpaGetter = jpaGetter;
this.isolationLevelOverride = isolationLevelOverride;
} }
@Override @Override
public JpaTransactionManager get() { public JpaTransactionManager get() {
return jpaGetter.apply( return jpaGetter.apply(
DaggerBeamJpaModule_JpaTransactionManagerComponent.builder() DaggerBeamJpaModule_JpaTransactionManagerComponent.builder()
.beamJpaModule(new BeamJpaModule(credentialFileUrl, cloudKmsProjectId)) .beamJpaModule(
new BeamJpaModule(credentialFileUrl, cloudKmsProjectId, isolationLevelOverride))
.build()); .build());
} }
} }

View file

@ -17,11 +17,9 @@ package google.registry.beam.initsql;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp; import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp;
import static google.registry.beam.initsql.BackupPaths.getExportFilePatterns; import static google.registry.beam.initsql.BackupPaths.getExportFilePatterns;
import static google.registry.model.ofy.ObjectifyService.ofy; import static google.registry.model.ofy.ObjectifyService.ofy;
import static google.registry.persistence.JpaRetries.isFailedTxnRetriable;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.persistence.transaction.TransactionManagerFactory.setJpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.setJpaTm;
import static google.registry.util.DateTimeUtils.START_OF_TIME; import static google.registry.util.DateTimeUtils.START_OF_TIME;
@ -38,27 +36,34 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams; import com.google.common.collect.Streams;
import com.googlecode.objectify.Key;
import google.registry.backup.AppEngineEnvironment; import google.registry.backup.AppEngineEnvironment;
import google.registry.backup.CommitLogImports; import google.registry.backup.CommitLogImports;
import google.registry.backup.VersionedEntity; import google.registry.backup.VersionedEntity;
import google.registry.model.domain.DomainBase; import google.registry.model.domain.DomainBase;
import google.registry.model.ofy.ObjectifyService; import google.registry.model.ofy.ObjectifyService;
import google.registry.model.reporting.HistoryEntry;
import google.registry.persistence.transaction.JpaTransactionManager; import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.schema.replay.DatastoreAndSqlEntity;
import google.registry.schema.replay.SqlEntity;
import google.registry.tools.LevelDbLogReader; import google.registry.tools.LevelDbLogReader;
import google.registry.util.SystemSleeper;
import java.io.Serializable; import java.io.Serializable;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier; import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO; import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Flatten;
@ -78,7 +83,6 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration;
/** /**
* {@link PTransform Pipeline transforms} used in pipelines that load from both Datastore export * {@link PTransform Pipeline transforms} used in pipelines that load from both Datastore export
@ -288,7 +292,7 @@ public final class Transforms {
maxWriters, maxWriters,
batchSize, batchSize,
jpaSupplier, jpaSupplier,
(e) -> ofy().toPojo(e.getEntity().get()), Transforms::convertVersionedEntityToSqlEntity,
TypeDescriptor.of(VersionedEntity.class)); TypeDescriptor.of(VersionedEntity.class));
} }
@ -329,11 +333,50 @@ public final class Transforms {
.apply("Batch output by shard " + transformId, GroupIntoBatches.ofSize(batchSize)) .apply("Batch output by shard " + transformId, GroupIntoBatches.ofSize(batchSize))
.apply( .apply(
"Write in batch for " + transformId, "Write in batch for " + transformId,
ParDo.of(new SqlBatchWriter<T>(jpaSupplier, jpaConverter))); ParDo.of(new SqlBatchWriter<T>(transformId, jpaSupplier, jpaConverter)));
} }
}; };
} }
private static Key toOfyKey(Object ofyEntity) {
return Key.create(ofyEntity);
}
private static boolean isMigratable(Entity entity) {
if (entity.getKind().equals("HistoryEntry")) {
// DOMAIN_APPLICATION_CREATE is deprecated type and should not be migrated.
// The Enum name DOMAIN_APPLICATION_CREATE no longer exists in Java and cannot
// be deserialized.
return !Objects.equals(entity.getProperty("type"), "DOMAIN_APPLICATION_CREATE");
}
return true;
}
private static SqlEntity toSqlEntity(Object ofyEntity) {
if (ofyEntity instanceof HistoryEntry) {
HistoryEntry ofyHistory = (HistoryEntry) ofyEntity;
return (SqlEntity) ofyHistory.toChildHistoryEntity();
}
return ((DatastoreAndSqlEntity) ofyEntity).toSqlEntity().get();
}
/**
* Converts a {@link VersionedEntity} to an JPA entity for persistence.
*
* @return An object to be persisted to SQL, or null if the input is not to be migrated. (Not
* using Optional in return because as a one-use method, we do not want to invest the effort
* to make Optional work with BEAM)
*/
@Nullable
private static Object convertVersionedEntityToSqlEntity(VersionedEntity dsEntity) {
return dsEntity
.getEntity()
.filter(Transforms::isMigratable)
.map(e -> ofy().toPojo(e))
.map(Transforms::toSqlEntity)
.orElse(null);
}
/** Interface for serializable {@link Supplier suppliers}. */ /** Interface for serializable {@link Supplier suppliers}. */
public interface SerializableSupplier<T> extends Supplier<T>, Serializable {} public interface SerializableSupplier<T> extends Supplier<T>, Serializable {}
@ -428,22 +471,22 @@ public final class Transforms {
private static int instanceCount = 0; private static int instanceCount = 0;
private static JpaTransactionManager originalJpa; private static JpaTransactionManager originalJpa;
private Counter counter;
private final SerializableSupplier<JpaTransactionManager> jpaSupplier; private final SerializableSupplier<JpaTransactionManager> jpaSupplier;
private final SerializableFunction<T, Object> jpaConverter; private final SerializableFunction<T, Object> jpaConverter;
private transient SystemSleeper sleeper;
SqlBatchWriter( SqlBatchWriter(
String type,
SerializableSupplier<JpaTransactionManager> jpaSupplier, SerializableSupplier<JpaTransactionManager> jpaSupplier,
SerializableFunction<T, Object> jpaConverter) { SerializableFunction<T, Object> jpaConverter) {
counter = Metrics.counter("SQL_WRITE", type);
this.jpaSupplier = jpaSupplier; this.jpaSupplier = jpaSupplier;
this.jpaConverter = jpaConverter; this.jpaConverter = jpaConverter;
} }
@Setup @Setup
public void setup() { public void setup() {
sleeper = new SystemSleeper();
try (AppEngineEnvironment env = new AppEngineEnvironment()) { try (AppEngineEnvironment env = new AppEngineEnvironment()) {
ObjectifyService.initOfy(); ObjectifyService.initOfy();
} }
@ -474,31 +517,29 @@ public final class Transforms {
ImmutableList<Object> ofyEntities = ImmutableList<Object> ofyEntities =
Streams.stream(kv.getValue()) Streams.stream(kv.getValue())
.map(this.jpaConverter::apply) .map(this.jpaConverter::apply)
// TODO(b/177340730): post migration delete the line below.
.filter(Objects::nonNull)
.collect(ImmutableList.toImmutableList()); .collect(ImmutableList.toImmutableList());
retry(() -> jpaTm().transact(() -> jpaTm().putAll(ofyEntities))); try {
jpaTm().transact(() -> jpaTm().putAll(ofyEntities));
counter.inc(ofyEntities.size());
} catch (RuntimeException e) {
processSingly(ofyEntities);
}
} }
} }
// TODO(b/160632289): Enhance Retrier and use it here. /**
private void retry(Runnable runnable) { * Writes entities in a failed batch one by one to identify the first bad entity and throws a
int maxAttempts = 5; * {@link RuntimeException} on it.
int initialDelayMillis = 100; */
double jitterRatio = 0.2; private void processSingly(ImmutableList<Object> ofyEntities) {
for (Object ofyEntity : ofyEntities) {
for (int attempt = 0; attempt < maxAttempts; attempt++) {
try { try {
runnable.run(); jpaTm().transact(() -> jpaTm().put(ofyEntity));
return; counter.inc();
} catch (Throwable throwable) { } catch (RuntimeException e) {
if (!isFailedTxnRetriable(throwable)) { throw new RuntimeException(toOfyKey(ofyEntity).toString(), e);
throwIfUnchecked(throwable);
throw new RuntimeException(throwable);
}
int sleepMillis = (1 << attempt) * initialDelayMillis;
int jitter =
ThreadLocalRandom.current().nextInt((int) (sleepMillis * jitterRatio))
- (int) (sleepMillis * jitterRatio / 2);
sleeper.sleepUninterruptibly(Duration.millis(sleepMillis + jitter));
} }
} }
} }

View file

@ -179,6 +179,7 @@ public class DomainBaseUtilTest {
.setNameservers(ImmutableSet.of()) .setNameservers(ImmutableSet.of())
.setDeletePollMessage(null) .setDeletePollMessage(null)
.setTransferData(null) .setTransferData(null)
.setGracePeriods(ImmutableSet.of())
.build(); .build();
DomainBase domainTransformedByUtil = DomainBase domainTransformedByUtil =
(DomainBase) ofy().toPojo(DomainBaseUtil.removeBillingAndPollAndHosts(domainEntity)); (DomainBase) ofy().toPojo(DomainBaseUtil.removeBillingAndPollAndHosts(domainEntity));
@ -199,6 +200,7 @@ public class DomainBaseUtilTest {
.setNameservers(ImmutableSet.of()) .setNameservers(ImmutableSet.of())
.setDeletePollMessage(null) .setDeletePollMessage(null)
.setTransferData(null) .setTransferData(null)
.setGracePeriods(ImmutableSet.of())
.build(); .build();
Entity entityWithoutFkeys = tm().transact(() -> ofy().toEntity(domainWithoutFKeys)); Entity entityWithoutFkeys = tm().transact(() -> ofy().toEntity(domainWithoutFKeys));
DomainBase domainTransformedByUtil = DomainBase domainTransformedByUtil =

View file

@ -17,9 +17,12 @@ package google.registry.beam.initsql;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects; import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects;
import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence; import static google.registry.model.ImmutableObjectSubject.immutableObjectCorrespondence;
import static google.registry.model.domain.token.AllocationToken.TokenType.SINGLE_USE;
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm; import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
import static google.registry.testing.DatabaseHelper.newRegistry; import static google.registry.testing.DatabaseHelper.newRegistry;
import static google.registry.testing.DatabaseHelper.persistResource; import static google.registry.testing.DatabaseHelper.persistResource;
import static google.registry.testing.DatabaseHelper.persistSimpleResource;
import static google.registry.util.DateTimeUtils.END_OF_TIME;
import static google.registry.util.DateTimeUtils.START_OF_TIME; import static google.registry.util.DateTimeUtils.START_OF_TIME;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
@ -27,7 +30,10 @@ import com.google.common.collect.ImmutableSet;
import com.googlecode.objectify.Key; import com.googlecode.objectify.Key;
import google.registry.backup.AppEngineEnvironment; import google.registry.backup.AppEngineEnvironment;
import google.registry.beam.TestPipelineExtension; import google.registry.beam.TestPipelineExtension;
import google.registry.flows.domain.DomainFlowUtils;
import google.registry.model.billing.BillingEvent; import google.registry.model.billing.BillingEvent;
import google.registry.model.billing.BillingEvent.Flag;
import google.registry.model.billing.BillingEvent.Reason;
import google.registry.model.contact.ContactResource; import google.registry.model.contact.ContactResource;
import google.registry.model.domain.DesignatedContact; import google.registry.model.domain.DesignatedContact;
import google.registry.model.domain.DomainAuthInfo; import google.registry.model.domain.DomainAuthInfo;
@ -36,6 +42,7 @@ import google.registry.model.domain.GracePeriod;
import google.registry.model.domain.launch.LaunchNotice; import google.registry.model.domain.launch.LaunchNotice;
import google.registry.model.domain.rgp.GracePeriodStatus; import google.registry.model.domain.rgp.GracePeriodStatus;
import google.registry.model.domain.secdns.DelegationSignerData; import google.registry.model.domain.secdns.DelegationSignerData;
import google.registry.model.domain.token.AllocationToken;
import google.registry.model.eppcommon.AuthInfo.PasswordAuth; import google.registry.model.eppcommon.AuthInfo.PasswordAuth;
import google.registry.model.eppcommon.StatusValue; import google.registry.model.eppcommon.StatusValue;
import google.registry.model.eppcommon.Trid; import google.registry.model.eppcommon.Trid;
@ -43,6 +50,7 @@ import google.registry.model.host.HostResource;
import google.registry.model.ofy.Ofy; import google.registry.model.ofy.Ofy;
import google.registry.model.poll.PollMessage; import google.registry.model.poll.PollMessage;
import google.registry.model.registrar.Registrar; import google.registry.model.registrar.Registrar;
import google.registry.model.registrar.RegistrarContact;
import google.registry.model.registry.Registry; import google.registry.model.registry.Registry;
import google.registry.model.reporting.HistoryEntry; import google.registry.model.reporting.HistoryEntry;
import google.registry.model.transfer.DomainTransferData; import google.registry.model.transfer.DomainTransferData;
@ -58,6 +66,7 @@ import java.io.File;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.money.Money;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Order;
@ -69,14 +78,24 @@ import org.junit.jupiter.api.io.TempDir;
class InitSqlPipelineTest { class InitSqlPipelineTest {
private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z"); private static final DateTime START_TIME = DateTime.parse("2000-01-01T00:00:00.0Z");
/**
* All kinds of entities to be set up in the Datastore. Must contain all kinds known to {@link
* InitSqlPipeline}.
*/
private static final ImmutableList<Class<?>> ALL_KINDS = private static final ImmutableList<Class<?>> ALL_KINDS =
ImmutableList.of( ImmutableList.of(
Registry.class, Registry.class,
Registrar.class, Registrar.class,
ContactResource.class, ContactResource.class,
HostResource.class, RegistrarContact.class,
DomainBase.class, DomainBase.class,
HistoryEntry.class); HostResource.class,
HistoryEntry.class,
AllocationToken.class,
BillingEvent.Recurring.class,
BillingEvent.OneTime.class,
BillingEvent.Cancellation.class,
PollMessage.class);
private transient FakeClock fakeClock = new FakeClock(START_TIME); private transient FakeClock fakeClock = new FakeClock(START_TIME);
@ -152,16 +171,75 @@ class InitSqlPipelineTest {
.setCreationClientId(registrar1.getClientId()) .setCreationClientId(registrar1.getClientId())
.setPersistedCurrentSponsorClientId(registrar1.getClientId()) .setPersistedCurrentSponsorClientId(registrar1.getClientId())
.build()); .build());
historyEntry = persistResource(new HistoryEntry.Builder().setParent(domainKey).build()); persistSimpleResource(
new RegistrarContact.Builder()
.setParent(registrar1)
.setName("John Abused")
.setEmailAddress("johnabuse@example.com")
.setVisibleInWhoisAsAdmin(true)
.setVisibleInWhoisAsTech(false)
.setPhoneNumber("+1.2125551213")
.setFaxNumber("+1.2125551213")
.setTypes(ImmutableSet.of(RegistrarContact.Type.ABUSE, RegistrarContact.Type.ADMIN))
.build());
historyEntry =
persistResource(
new HistoryEntry.Builder()
.setParent(domainKey)
.setModificationTime(fakeClock.nowUtc())
.setType(HistoryEntry.Type.DOMAIN_CREATE)
.build());
persistResource(
new AllocationToken.Builder().setToken("abc123").setTokenType(SINGLE_USE).build());
Key<HistoryEntry> historyEntryKey = Key.create(historyEntry); Key<HistoryEntry> historyEntryKey = Key.create(historyEntry);
Key<BillingEvent.OneTime> oneTimeBillKey = BillingEvent.OneTime onetimeBillEvent =
Key.create(historyEntryKey, BillingEvent.OneTime.class, 1); new BillingEvent.OneTime.Builder()
VKey<BillingEvent.Recurring> recurringBillKey = .setId(1)
VKey.from(Key.create(historyEntryKey, BillingEvent.Recurring.class, 2)); .setReason(Reason.RENEW)
VKey<PollMessage.Autorenew> autorenewPollKey = .setTargetId("example.com")
VKey.from(Key.create(historyEntryKey, PollMessage.Autorenew.class, 3)); .setClientId("TheRegistrar")
VKey<PollMessage.OneTime> onetimePollKey = .setCost(Money.parse("USD 44.00"))
VKey.from(Key.create(historyEntryKey, PollMessage.OneTime.class, 1)); .setPeriodYears(4)
.setEventTime(fakeClock.nowUtc())
.setBillingTime(fakeClock.nowUtc())
.setParent(historyEntryKey)
.build();
persistResource(onetimeBillEvent);
Key<BillingEvent.OneTime> oneTimeBillKey = Key.create(onetimeBillEvent);
BillingEvent.Recurring recurringBillEvent =
new BillingEvent.Recurring.Builder()
.setId(2)
.setReason(Reason.RENEW)
.setFlags(ImmutableSet.of(Flag.AUTO_RENEW))
.setTargetId("example.com")
.setClientId("TheRegistrar")
.setEventTime(fakeClock.nowUtc())
.setRecurrenceEndTime(END_OF_TIME)
.setParent(historyEntryKey)
.build();
persistResource(recurringBillEvent);
VKey<BillingEvent.Recurring> recurringBillKey = recurringBillEvent.createVKey();
PollMessage.Autorenew autorenewPollMessage =
new PollMessage.Autorenew.Builder()
.setId(3L)
.setTargetId("example.com")
.setClientId("TheRegistrar")
.setEventTime(fakeClock.nowUtc())
.setMsg("Domain was auto-renewed.")
.setParent(historyEntry)
.build();
persistResource(autorenewPollMessage);
VKey<PollMessage.Autorenew> autorenewPollKey = autorenewPollMessage.createVKey();
PollMessage.OneTime oneTimePollMessage =
new PollMessage.OneTime.Builder()
.setId(1L)
.setParent(historyEntry)
.setEventTime(fakeClock.nowUtc())
.setClientId("TheRegistrar")
.setMsg(DomainFlowUtils.COLLISION_MESSAGE)
.build();
persistResource(oneTimePollMessage);
VKey<PollMessage.OneTime> onetimePollKey = oneTimePollMessage.createVKey();
domain = domain =
persistResource( persistResource(
new DomainBase.Builder() new DomainBase.Builder()
@ -220,6 +298,16 @@ class InitSqlPipelineTest {
"TheRegistrar", "TheRegistrar",
null)) null))
.build()); .build());
persistResource(
new BillingEvent.Cancellation.Builder()
.setReason(Reason.RENEW)
.setTargetId(domain.getDomainName())
.setClientId(domain.getCurrentSponsorClientId())
.setEventTime(fakeClock.nowUtc())
.setBillingTime(fakeClock.nowUtc())
.setRecurringEventKey(recurringBillEvent.createVKey())
.setParent(historyEntryKey)
.build());
exportDir = store.export(exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of()); exportDir = store.export(exportRootDir.getAbsolutePath(), ALL_KINDS, ImmutableSet.of());
commitLogDir = Files.createDirectory(tmpDir.resolve("commits")).toFile(); commitLogDir = Files.createDirectory(tmpDir.resolve("commits")).toFile();
} }
@ -248,8 +336,7 @@ class InitSqlPipelineTest {
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class))) assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(ContactResource.class)))
.comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp")) .comparingElementsUsing(immutableObjectCorrespondence("revisions", "updateTimestamp"))
.containsExactly(contact1, contact2); .containsExactly(contact1, contact2);
assertCleansedDomainEquals( assertDomainEquals(jpaTm().transact(() -> jpaTm().loadByKey(domain.createVKey())), domain);
jpaTm().transact(() -> jpaTm().loadByKey(domain.createVKey())), domain);
} }
} }
@ -261,29 +348,26 @@ class InitSqlPipelineTest {
.isEqualTo(expected.getSuperordinateDomain().getSqlKey()); .isEqualTo(expected.getSuperordinateDomain().getSqlKey());
} }
private static void assertCleansedDomainEquals(DomainBase actual, DomainBase expected) { private static void assertDomainEquals(DomainBase actual, DomainBase expected) {
assertAboutImmutableObjects() assertAboutImmutableObjects()
.that(actual) .that(actual)
.isEqualExceptFields( .isEqualExceptFields(
expected, expected,
"adminContact",
"registrantContact",
"gracePeriods",
"dsData",
"allContacts",
"revisions", "revisions",
"updateTimestamp", "updateTimestamp",
"autorenewBillingEvent",
"autorenewBillingEventHistoryId",
"autorenewPollMessage", "autorenewPollMessage",
"autorenewPollMessageHistoryId",
"deletePollMessage", "deletePollMessage",
"deletePollMessageHistoryId",
"nsHosts", "nsHosts",
"gracePeriods",
"transferData"); "transferData");
assertThat(actual.getAdminContact().getSqlKey()) assertThat(actual.getAdminContact().getSqlKey())
.isEqualTo(expected.getAdminContact().getSqlKey()); .isEqualTo(expected.getAdminContact().getSqlKey());
assertThat(actual.getRegistrant().getSqlKey()).isEqualTo(expected.getRegistrant().getSqlKey()); assertThat(actual.getRegistrant().getSqlKey()).isEqualTo(expected.getRegistrant().getSqlKey());
// TODO(weiminyu): compare gracePeriods, allContacts and dsData, when SQL model supports them. assertThat(actual.getNsHosts()).isEqualTo(expected.getNsHosts());
assertThat(actual.getAutorenewPollMessage().getOfyKey())
.isEqualTo(expected.getAutorenewPollMessage().getOfyKey());
assertThat(actual.getDeletePollMessage().getOfyKey())
.isEqualTo(expected.getDeletePollMessage().getOfyKey());
// TODO(weiminyu): check gracePeriods and transferData when it is easier to do
} }
} }

Binary file not shown.

Before

Width:  |  Height:  |  Size: 929 KiB

After

Width:  |  Height:  |  Size: 1.1 MiB

Before After
Before After