diff --git a/core/build.gradle b/core/build.gradle
index f918c353d..79f65dc0e 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -765,11 +765,6 @@ createUberJar(
// User should install gcloud and login to GCP before invoking these tasks.
if (environment == 'alpha') {
def pipelines = [
- initSql :
- [
- mainClass: 'google.registry.beam.initsql.InitSqlPipeline',
- metaData : 'google/registry/beam/init_sql_pipeline_metadata.json'
- ],
bulkDeleteDatastore:
[
mainClass: 'google.registry.beam.datastore.BulkDeleteDatastorePipeline',
@@ -888,48 +883,6 @@ task buildToolImage(dependsOn: nomulus, type: Exec) {
commandLine 'docker', 'build', '-t', 'nomulus-tool', '.'
}
-task generateInitSqlPipelineGraph(type: FilteringTest) {
- tests = ['InitSqlPipelineGraphTest.createPipeline_compareGraph']
- ignoreFailures = true
-}
-
-task updateInitSqlPipelineGraph(type: Copy) {
- def graphRelativePath = 'google/registry/beam/initsql/'
- from ("${projectDir}/build/resources/test/${graphRelativePath}") {
- include 'pipeline_curr.dot'
- rename 'curr', 'golden'
- }
- into "src/test/resources/${graphRelativePath}"
-
- dependsOn generateInitSqlPipelineGraph
-
- doLast {
- if (com.google.common.base.Strings.isNullOrEmpty(project.dot_path)) {
- getLogger().info('Property dot_path is null. Not creating image for pipeline graph.')
- }
- def dotPath = project.dot_path
- if (!new File(dotPath).exists()) {
- throw new RuntimeException(
- """\
- ${dotPath} not found. Make sure graphviz is installed
- and the dot_path property is set correctly."""
- .stripIndent())
- }
- def goldenGraph = "src/test/resources/${graphRelativePath}/pipeline_golden.dot"
- def goldenImage = "src/test/resources/${graphRelativePath}/pipeline_golden.png"
- def cmd = "${dotPath} -Tpng -o \"${goldenImage}\" \"${goldenGraph}\""
- try {
- rootProject.ext.execInBash(cmd, projectDir)
- } catch (Throwable throwable) {
- throw new RuntimeException(
- """\
- Failed to generate golden image with command ${cmd}
- Error: ${throwable.getMessage()}
- """)
- }
- }
-}
-
// Build the devtool jar.
createUberJar(
'devtool',
diff --git a/core/src/main/java/google/registry/backup/CommitLogImports.java b/core/src/main/java/google/registry/backup/CommitLogImports.java
deleted file mode 100644
index 9c98cdee6..000000000
--- a/core/src/main/java/google/registry/backup/CommitLogImports.java
+++ /dev/null
@@ -1,140 +0,0 @@
-// Copyright 2020 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.backup;
-
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static google.registry.backup.BackupUtils.createDeserializingIterator;
-
-import com.google.common.collect.ImmutableList;
-import google.registry.model.ImmutableObject;
-import google.registry.model.annotations.DeleteAfterMigration;
-import google.registry.model.ofy.CommitLogCheckpoint;
-import google.registry.model.ofy.CommitLogManifest;
-import google.registry.model.ofy.CommitLogMutation;
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.Iterator;
-
-/**
- * Helpers for reading CommitLog records from a file.
- *
- * <p>This class is adapted from {@link RestoreCommitLogsAction}, and will be used in the initial
- * population of the Cloud SQL database.
- */
-@DeleteAfterMigration
-public final class CommitLogImports {
-
- private CommitLogImports() {}
-
- /**
- * Returns entities in an {@code inputStream} (from a single CommitLog file) as an {@link
- * ImmutableList} of {@link ImmutableList}s of {@link VersionedEntity} records where the inner
- * lists each consist of one transaction. Upon completion the {@code inputStream} is closed.
- *
- * <p>The returned list may be empty, since CommitLogs are written at fixed intervals regardless
- * of whether actual changes exist. Each sublist, however, will not be empty.
- *
- * <p>A CommitLog file starts with a {@link CommitLogCheckpoint}, followed by (repeated)
- * subsequences of [{@link CommitLogManifest}, [{@link CommitLogMutation}] ...]. Each subsequence
- * represents the changes in one transaction. The {@code CommitLogManifest} contains deleted
- * entity keys, whereas each {@code CommitLogMutation} contains one whole entity.
- */
- static ImmutableList<ImmutableList<VersionedEntity>> loadEntitiesByTransaction(
- InputStream inputStream) {
- try (InputStream input = new BufferedInputStream(inputStream)) {
- Iterator<ImmutableObject> commitLogs = createDeserializingIterator(input, false);
- checkState(commitLogs.hasNext());
- checkState(commitLogs.next() instanceof CommitLogCheckpoint);
-
- ImmutableList.Builder<ImmutableList<VersionedEntity>> resultBuilder =
- new ImmutableList.Builder<>();
- ImmutableList.Builder<VersionedEntity> currentTransactionBuilder =
- new ImmutableList.Builder<>();
-
- while (commitLogs.hasNext()) {
- ImmutableObject currentObject = commitLogs.next();
- if (currentObject instanceof CommitLogManifest) {
- // CommitLogManifest means we are starting a new transaction
- addIfNonempty(resultBuilder, currentTransactionBuilder);
- currentTransactionBuilder = new ImmutableList.Builder<>();
- VersionedEntity.fromManifest((CommitLogManifest) currentObject)
- .forEach(currentTransactionBuilder::add);
- } else if (currentObject instanceof CommitLogMutation) {
- currentTransactionBuilder.add(
- VersionedEntity.fromMutation((CommitLogMutation) currentObject));
- } else {
- throw new IllegalStateException(
- String.format("Unknown entity type %s in commit logs", currentObject.getClass()));
- }
- }
- // Add the last transaction in (if it's not empty)
- addIfNonempty(resultBuilder, currentTransactionBuilder);
- return resultBuilder.build();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Returns entities in an {@code inputStream} (from a single CommitLog file) as an {@link
- * ImmutableList} of {@link VersionedEntity} records. Upon completion the {@code inputStream} is
- * closed.
- *
- * <p>The returned list may be empty, since CommitLogs are written at fixed intervals regardless
- * of whether actual changes exist.
- *
- * <p>A CommitLog file starts with a {@link CommitLogCheckpoint}, followed by (repeated)
- * subsequences of [{@link CommitLogManifest}, [{@link CommitLogMutation}] ...]. Each subsequence
- * represents the changes in one transaction. The {@code CommitLogManifest} contains deleted
- * entity keys, whereas each {@code CommitLogMutation} contains one whole entity.
- */
- static ImmutableList<VersionedEntity> loadEntities(InputStream inputStream) {
- return loadEntitiesByTransaction(inputStream).stream()
- .flatMap(ImmutableList::stream)
- .collect(toImmutableList());
- }
-
- /** Convenience method that adapts {@link #loadEntities(InputStream)} to a {@link File}. */
- public static ImmutableList<VersionedEntity> loadEntities(File commitLogFile) {
- try {
- return loadEntities(new FileInputStream(commitLogFile));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Convenience method that adapts {@link #loadEntities(InputStream)} to a {@link
- * ReadableByteChannel}.
- */
- public static ImmutableList<VersionedEntity> loadEntities(ReadableByteChannel channel) {
- return loadEntities(Channels.newInputStream(channel));
- }
-
- private static void addIfNonempty(
- ImmutableList.Builder<ImmutableList<VersionedEntity>> resultBuilder,
- ImmutableList.Builder<VersionedEntity> currentTransactionBuilder) {
- ImmutableList<VersionedEntity> currentTransaction = currentTransactionBuilder.build();
- if (!currentTransaction.isEmpty()) {
- resultBuilder.add(currentTransaction);
- }
- }
-}
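For reference, a minimal sketch of how callers consumed the deleted reader; the file path below is hypothetical, and `loadEntities` flattens the per-transaction sublists described in the Javadoc above:

```java
import com.google.common.collect.ImmutableList;
import google.registry.backup.CommitLogImports;
import google.registry.backup.VersionedEntity;
import java.io.File;

class CommitLogDump {
  public static void main(String[] args) {
    // One CommitLog file: a checkpoint followed by [manifest, mutation...] subsequences.
    // loadEntities() flattens all transactions into a single list of records.
    ImmutableList<VersionedEntity> entities =
        CommitLogImports.loadEntities(
            new File("/backups/commit_diff_until_2020-06-01T00:00:00.000Z"));
    entities.forEach(e -> System.out.println(e.key()));
  }
}
```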
diff --git a/core/src/main/java/google/registry/batch/RefreshDnsOnHostRenameAction.java b/core/src/main/java/google/registry/batch/RefreshDnsOnHostRenameAction.java
deleted file mode 100644
index 947250076..000000000
--- a/core/src/main/java/google/registry/batch/RefreshDnsOnHostRenameAction.java
+++ /dev/null
@@ -1,379 +0,0 @@
-// Copyright 2017 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.batch;
-
-import static com.google.appengine.api.taskqueue.QueueConstants.maxLeaseCount;
-import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static google.registry.batch.AsyncTaskEnqueuer.PARAM_HOST_KEY;
-import static google.registry.batch.AsyncTaskEnqueuer.PARAM_REQUESTED_TIME;
-import static google.registry.batch.AsyncTaskEnqueuer.QUEUE_ASYNC_HOST_RENAME;
-import static google.registry.batch.AsyncTaskMetrics.OperationType.DNS_REFRESH;
-import static google.registry.mapreduce.inputs.EppResourceInputs.createEntityInput;
-import static google.registry.model.EppResourceUtils.getLinkedDomainKeys;
-import static google.registry.model.EppResourceUtils.isActive;
-import static google.registry.model.EppResourceUtils.isDeleted;
-import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
-import static google.registry.util.DateTimeUtils.latestOf;
-import static java.util.concurrent.TimeUnit.DAYS;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static java.util.logging.Level.INFO;
-import static java.util.logging.Level.SEVERE;
-import static org.joda.time.Duration.standardHours;
-
-import com.google.appengine.api.taskqueue.LeaseOptions;
-import com.google.appengine.api.taskqueue.Queue;
-import com.google.appengine.api.taskqueue.TaskHandle;
-import com.google.appengine.api.taskqueue.TransientFailureException;
-import com.google.appengine.tools.mapreduce.Mapper;
-import com.google.appengine.tools.mapreduce.Reducer;
-import com.google.appengine.tools.mapreduce.ReducerInput;
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.flogger.FluentLogger;
-import google.registry.batch.AsyncTaskMetrics.OperationResult;
-import google.registry.dns.DnsQueue;
-import google.registry.mapreduce.MapreduceRunner;
-import google.registry.mapreduce.inputs.NullInput;
-import google.registry.model.annotations.DeleteAfterMigration;
-import google.registry.model.domain.DomainBase;
-import google.registry.model.host.HostResource;
-import google.registry.model.server.Lock;
-import google.registry.persistence.VKey;
-import google.registry.request.Action;
-import google.registry.request.Response;
-import google.registry.request.auth.Auth;
-import google.registry.util.Clock;
-import google.registry.util.NonFinalForTesting;
-import google.registry.util.RequestStatusChecker;
-import google.registry.util.Retrier;
-import google.registry.util.SystemClock;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-import java.util.logging.Level;
-import javax.annotation.Nullable;
-import javax.inject.Inject;
-import javax.inject.Named;
-import org.apache.http.HttpStatus;
-import org.joda.time.DateTime;
-import org.joda.time.Duration;
-
-/** Performs batched DNS refreshes for applicable domains following a host rename. */
-@Action(
- service = Action.Service.BACKEND,
- path = "/_dr/task/refreshDnsOnHostRename",
- auth = Auth.AUTH_INTERNAL_OR_ADMIN)
-@DeleteAfterMigration
-@Deprecated
-public class RefreshDnsOnHostRenameAction implements Runnable {
-
- private static final FluentLogger logger = FluentLogger.forEnclosingClass();
- private static final Duration LEASE_LENGTH = standardHours(4);
-
- @Inject AsyncTaskMetrics asyncTaskMetrics;
- @Inject Clock clock;
- @Inject MapreduceRunner mrRunner;
- @Inject @Named(QUEUE_ASYNC_HOST_RENAME) Queue pullQueue;
-
- @Inject DnsQueue dnsQueue;
- @Inject RequestStatusChecker requestStatusChecker;
- @Inject Response response;
- @Inject Retrier retrier;
- @Inject RefreshDnsOnHostRenameAction() {}
-
- @Override
- public void run() {
- // Check if the lock can be acquired, and if not, a previous run of this mapreduce is still
- // executing, so return early.
- Optional<Lock> lock =
- Lock.acquire(
- RefreshDnsOnHostRenameAction.class.getSimpleName(),
- null,
- LEASE_LENGTH,
- requestStatusChecker,
- false);
-
- if (!lock.isPresent()) {
- logRespondAndUnlock(INFO, "Can't acquire lock; aborting.", lock);
- return;
- }
-
- // Lease the async tasks to process.
- LeaseOptions options =
- LeaseOptions.Builder.withCountLimit(maxLeaseCount())
- .leasePeriod(LEASE_LENGTH.getStandardSeconds(), SECONDS);
- List<TaskHandle> tasks = pullQueue.leaseTasks(options);
- asyncTaskMetrics.recordDnsRefreshBatchSize(tasks.size());
-
- // Check if there are no tasks to process, and if so, return early.
- if (tasks.isEmpty()) {
- logRespondAndUnlock(
- INFO, "No DNS refresh on host rename tasks to process in pull queue; finishing.", lock);
- return;
- }
-
- ImmutableList.Builder<DnsRefreshRequest> requestsBuilder = new ImmutableList.Builder<>();
- ImmutableList.Builder<VKey<HostResource>> hostKeys = new ImmutableList.Builder<>();
- final List<DnsRefreshRequest> requestsToDelete = new ArrayList<>();
-
- for (TaskHandle task : tasks) {
- try {
- DnsRefreshRequest request = DnsRefreshRequest.createFromTask(task, clock.nowUtc());
- if (request.isRefreshNeeded()) {
- requestsBuilder.add(request);
- hostKeys.add(request.hostKey());
- } else {
- // Skip hosts that are deleted.
- requestsToDelete.add(request);
- }
- } catch (Exception e) {
- logger.atSevere().withCause(e).log(
- "Could not parse DNS refresh for host request, delaying task for a day: %s", task);
- // Grab the lease for a whole day, so it won't continue throwing errors every five minutes.
- pullQueue.modifyTaskLease(task, 1L, DAYS);
- }
- }
-
- deleteTasksWithRetry(
- requestsToDelete, pullQueue, asyncTaskMetrics, retrier, OperationResult.STALE);
- ImmutableList<DnsRefreshRequest> refreshRequests = requestsBuilder.build();
- if (refreshRequests.isEmpty()) {
- logRespondAndUnlock(
- INFO, "No async DNS refreshes to process because all renamed hosts are deleted.", lock);
- } else {
- logger.atInfo().log(
- "Processing asynchronous DNS refresh for renamed hosts: %s", hostKeys.build());
- if (tm().isOfy()) {
- runMapreduce(refreshRequests, lock);
- } else {
- try {
- refreshRequests.stream()
- .flatMap(
- request ->
- getLinkedDomainKeys(request.hostKey(), request.lastUpdateTime(), null)
- .stream())
- .distinct()
- .map(domainKey -> tm().transact(() -> tm().loadByKey(domainKey).getDomainName()))
- .forEach(
- domainName -> {
- retrier.callWithRetry(
- () -> dnsQueue.addDomainRefreshTask(domainName),
- TransientFailureException.class);
- logger.atInfo().log("Enqueued DNS refresh for domain '%s'.", domainName);
- });
- deleteTasksWithRetry(
- refreshRequests,
- getQueue(QUEUE_ASYNC_HOST_RENAME),
- asyncTaskMetrics,
- retrier,
- OperationResult.SUCCESS);
- } catch (Throwable t) {
- String message = "Error refreshing DNS on host rename.";
- logger.atSevere().withCause(t).log(message);
- response.setPayload(message);
- response.setStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR);
- } finally {
- lock.get().release();
- }
- }
- }
- }
-
- private void runMapreduce(ImmutableList<DnsRefreshRequest> refreshRequests, Optional<Lock> lock) {
- try {
- mrRunner
- .setJobName("Enqueue DNS refreshes for domains referencing renamed hosts")
- .setModuleName("backend")
- .setDefaultReduceShards(1)
- .runMapreduce(
- new RefreshDnsOnHostRenameMapper(refreshRequests, retrier),
- new RefreshDnsOnHostRenameReducer(refreshRequests, lock.get(), retrier),
- // Add an extra NullInput so that the reducer always fires exactly once.
- ImmutableList.of(new NullInput<>(), createEntityInput(DomainBase.class)))
- .sendLinkToMapreduceConsole(response);
- } catch (Throwable t) {
- logRespondAndUnlock(
- SEVERE, "Error starting mapreduce to refresh DNS for renamed hosts.", lock);
- }
- }
-
- private void logRespondAndUnlock(Level level, String message, Optional<Lock> lock) {
- logger.at(level).log(message);
- response.setPayload(message);
- lock.ifPresent(Lock::release);
- }
-
- /** Map over domains and refresh the DNS of those that reference the renamed hosts. */
- public static class RefreshDnsOnHostRenameMapper
- extends Mapper<DomainBase, Boolean, Boolean> {
-
- private static final long serialVersionUID = -5261698524424335531L;
- private static final DnsQueue dnsQueue = DnsQueue.create();
-
- private final ImmutableList<DnsRefreshRequest> refreshRequests;
- private final Retrier retrier;
-
- RefreshDnsOnHostRenameMapper(
- ImmutableList<DnsRefreshRequest> refreshRequests, Retrier retrier) {
- this.refreshRequests = refreshRequests;
- this.retrier = retrier;
- }
-
- @Override
- public final void map(@Nullable final DomainBase domain) {
- if (domain == null) {
- // Emit a single value so that the reducer always runs. The key and value don't matter.
- emit(true, true);
- return;
- }
- VKey<HostResource> referencingHostKey = null;
- for (DnsRefreshRequest request : refreshRequests) {
- if (isActive(domain, request.lastUpdateTime())
- && domain.getNameservers().contains(request.hostKey())) {
- referencingHostKey = request.hostKey();
- break;
- }
- }
- if (referencingHostKey != null) {
- retrier.callWithRetry(
- () -> dnsQueue.addDomainRefreshTask(domain.getDomainName()),
- TransientFailureException.class);
- logger.atInfo().log(
- "Enqueued DNS refresh for domain %s referenced by host %s.",
- domain.getDomainName(), referencingHostKey);
- getContext().incrementCounter("domains refreshed");
- } else {
- getContext().incrementCounter("domains not refreshed");
- }
-
- // Don't catch errors -- we allow the mapreduce to terminate on any errors that can't be
- // resolved by retrying the transaction. The reducer only fires if the mapper completes
- // without errors, meaning that it is acceptable to delete all tasks.
- }
- }
-
- /**
- * A reducer that always fires exactly once.
- *
- * <p>This is really a reducer in name only; what it's really doing is waiting for all of the
- * mapper tasks to finish, and then deleting the pull queue tasks. Note that this only happens if
- * the mapper completes execution without errors.
- */
- public static class RefreshDnsOnHostRenameReducer extends Reducer<Boolean, Boolean, Void> {
-
- private static final long serialVersionUID = 9077366205249562118L;
-
- @NonFinalForTesting
- private static AsyncTaskMetrics asyncTaskMetrics = new AsyncTaskMetrics(new SystemClock());
-
- private final Lock lock;
- private final Retrier retrier;
- private final List<DnsRefreshRequest> refreshRequests;
-
- RefreshDnsOnHostRenameReducer(
- List<DnsRefreshRequest> refreshRequests, Lock lock, Retrier retrier) {
- this.refreshRequests = refreshRequests;
- this.lock = lock;
- this.retrier = retrier;
- }
-
- @Override
- public void reduce(Boolean key, ReducerInput<Boolean> values) {
- // The reduce() method is run precisely once, because the NullInput caused the mapper to emit
- // a dummy value once.
- deleteTasksWithRetry(
- refreshRequests,
- getQueue(QUEUE_ASYNC_HOST_RENAME),
- asyncTaskMetrics,
- retrier,
- OperationResult.SUCCESS);
-
- lock.release();
- }
- }
-
- /** Deletes a list of tasks from the given queue using a retrier. */
- private static void deleteTasksWithRetry(
- final List<DnsRefreshRequest> refreshRequests,
- final Queue queue,
- AsyncTaskMetrics asyncTaskMetrics,
- Retrier retrier,
- OperationResult result) {
- if (refreshRequests.isEmpty()) {
- return;
- }
- final List<TaskHandle> tasks =
- refreshRequests.stream().map(DnsRefreshRequest::task).collect(toImmutableList());
- retrier.callWithRetry(() -> queue.deleteTask(tasks), TransientFailureException.class);
- refreshRequests.forEach(
- r -> asyncTaskMetrics.recordAsyncFlowResult(DNS_REFRESH, result, r.requestedTime()));
- }
-
- /** A class that encapsulates the values of a request to refresh DNS for a renamed host. */
- @AutoValue
- abstract static class DnsRefreshRequest implements Serializable {
-
- private static final long serialVersionUID = 1772812852271288622L;
-
- abstract VKey<HostResource> hostKey();
-
- abstract DateTime lastUpdateTime();
- abstract DateTime requestedTime();
- abstract boolean isRefreshNeeded();
- abstract TaskHandle task();
-
- @AutoValue.Builder
- abstract static class Builder {
- abstract Builder setHostKey(VKey<HostResource> hostKey);
-
- abstract Builder setLastUpdateTime(DateTime lastUpdateTime);
- abstract Builder setRequestedTime(DateTime requestedTime);
- abstract Builder setIsRefreshNeeded(boolean isRefreshNeeded);
- abstract Builder setTask(TaskHandle task);
- abstract DnsRefreshRequest build();
- }
-
- /**
- * Returns a packaged-up {@link DnsRefreshRequest} parsed from a task queue task.
- */
- static DnsRefreshRequest createFromTask(TaskHandle task, DateTime now) throws Exception {
- ImmutableMap<String, String> params = ImmutableMap.copyOf(task.extractParams());
- VKey<HostResource> hostKey =
- VKey.create(checkNotNull(params.get(PARAM_HOST_KEY), "Host to refresh not specified"));
- HostResource host =
- tm().transact(() -> tm().loadByKeyIfPresent(hostKey))
- .orElseThrow(() -> new NoSuchElementException("Host to refresh doesn't exist"));
- boolean isHostDeleted =
- isDeleted(host, latestOf(now, host.getUpdateTimestamp().getTimestamp()));
- if (isHostDeleted) {
- logger.atInfo().log("Host %s is already deleted, not refreshing DNS.", hostKey);
- }
- return new AutoValue_RefreshDnsOnHostRenameAction_DnsRefreshRequest.Builder()
- .setHostKey(hostKey)
- .setLastUpdateTime(host.getUpdateTimestamp().getTimestamp())
- .setRequestedTime(
- DateTime.parse(
- checkNotNull(params.get(PARAM_REQUESTED_TIME), "Requested time not specified")))
- .setIsRefreshNeeded(!isHostDeleted)
- .setTask(task)
- .build();
- }
- }
-}
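The deleted action follows the standard App Engine pull-queue cycle (lease, process, delete). A stripped-down sketch of that cycle, with a hypothetical queue name and processing step:

```java
import com.google.appengine.api.taskqueue.LeaseOptions;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskHandle;
import java.util.List;
import java.util.concurrent.TimeUnit;

class PullQueueCycle {
  static void drainOnce() {
    Queue queue = QueueFactory.getQueue("async-host-rename"); // hypothetical queue name
    // Lease a batch so concurrent workers don't see the same tasks.
    List<TaskHandle> tasks =
        queue.leaseTasks(
            LeaseOptions.Builder.withCountLimit(100).leasePeriod(30, TimeUnit.SECONDS));
    for (TaskHandle task : tasks) {
      // ... process the task's params, e.g. enqueue a DNS refresh ...
      queue.deleteTask(task); // only delete once processing has succeeded
    }
  }
}
```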
diff --git a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java
index d170bf543..03b3501a0 100644
--- a/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java
+++ b/core/src/main/java/google/registry/beam/common/RegistryJpaIO.java
@@ -23,8 +23,6 @@ import com.google.common.collect.Streams;
import google.registry.beam.common.RegistryQuery.CriteriaQuerySupplier;
import google.registry.model.UpdateAutoTimestamp;
import google.registry.model.UpdateAutoTimestamp.DisableAutoUpdateResource;
-import google.registry.model.common.DatabaseMigrationStateSchedule;
-import google.registry.model.replay.SqlEntity;
import google.registry.persistence.transaction.JpaTransactionManager;
import google.registry.persistence.transaction.TransactionManagerFactory;
import java.io.Serializable;
@@ -235,10 +233,6 @@ public final class RegistryJpaIO {
@ProcessElement
public void processElement(OutputReceiver<T> outputReceiver) {
- // Preload the migration schedule into cache, otherwise the cache loading query may happen
- // before the setDatabaseSnapshot call in the transaction below, causing it to fail.
- DatabaseMigrationStateSchedule.get();
-
jpaTm()
.transactNoRetry(
() -> {
@@ -433,11 +427,23 @@ public final class RegistryJpaIO {
}
}
+ /** Returns this entity's primary key field(s) as a string. */
private String toEntityKeyString(Object entity) {
- if (entity instanceof SqlEntity) {
- return ((SqlEntity) entity).getPrimaryKeyString();
+ try {
+ return jpaTm()
+ .transact(
+ () ->
+ String.format(
+ "%s_%s",
+ entity.getClass().getSimpleName(),
+ jpaTm()
+ .getEntityManager()
+ .getEntityManagerFactory()
+ .getPersistenceUnitUtil()
+ .getIdentifier(entity)));
+ } catch (IllegalArgumentException e) {
+ return "Non-SqlEntity: " + entity;
}
- return "Non-SqlEntity: " + String.valueOf(entity);
}
}
}
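The replacement `toEntityKeyString` derives the key via JPA metadata rather than the removed `SqlEntity.getPrimaryKeyString()`. A minimal standalone sketch of the same lookup, assuming an `EntityManagerFactory` is available (the helper class name is ours):

```java
import javax.persistence.EntityManagerFactory;
import javax.persistence.PersistenceUnitUtil;

final class EntityKeyStrings {

  private EntityKeyStrings() {}

  /** Formats an entity as "SimpleClassName_id", e.g. "Registrar_TheRegistrar". */
  static String keyString(EntityManagerFactory emf, Object entity) {
    PersistenceUnitUtil util = emf.getPersistenceUnitUtil();
    try {
      // For mapped entities this returns the @Id value (or the composite key object).
      return String.format(
          "%s_%s", entity.getClass().getSimpleName(), util.getIdentifier(entity));
    } catch (IllegalArgumentException e) {
      // getIdentifier() rejects objects that are not managed entity types, which is how
      // the new code falls back to the "Non-SqlEntity" branch.
      return "Non-SqlEntity: " + entity;
    }
  }
}
```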
diff --git a/core/src/main/java/google/registry/beam/initsql/BackupPaths.java b/core/src/main/java/google/registry/beam/initsql/BackupPaths.java
deleted file mode 100644
index ea5b37f62..000000000
--- a/core/src/main/java/google/registry/beam/initsql/BackupPaths.java
+++ /dev/null
@@ -1,119 +0,0 @@
-// Copyright 2020 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam.initsql;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Strings.isNullOrEmpty;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Streams;
-import google.registry.model.annotations.DeleteAfterMigration;
-import org.joda.time.DateTime;
-
-/**
- * Helpers for determining the fully qualified paths to Nomulus backup files. A backup consists of a
- * Datastore export and Nomulus CommitLogs that overlap with the export.
- */
-@DeleteAfterMigration
-public final class BackupPaths {
-
- private BackupPaths() {}
-
- private static final String WILDCARD_CHAR = "*";
- private static final String EXPORT_PATTERN_TEMPLATE = "%s/all_namespaces/kind_%s/output-%s";
-
- public static final String COMMIT_LOG_NAME_PREFIX = "commit_diff_until_";
- private static final String COMMIT_LOG_PATTERN_TEMPLATE = "%s/" + COMMIT_LOG_NAME_PREFIX + "*";
-
- /**
- * Pattern of the per-project file with Cloud SQL connection information. To get a concrete path,
- * user needs to provide the name of the environment, alpha, crash, sandbox, or production. This
- * file is meant for applications without access to secrets stored in Datastore.
- *
- * <p>In production, this is a base-64 encoded encrypted file with one line, which contains
- * space-separated values of Cloud SQL instance name, login, and password.
- *
- * <p>A plain text file may be used for tests against a local database. Replace the Cloud SQL
- * instance name with a JDBC URL.
- */
- private static final String SQL_CONN_INFO_FILE_PATTERN =
- "gs://domain-registry-dev-deploy/cloudsql-credentials/%s/admin_credential.enc";
-
- private static final ImmutableSet<String> ALLOWED_ENV =
- ImmutableSet.of("alpha", "crash", "sandbox", "production");
-
- /**
- * Returns a regex pattern that matches all Datastore export files of a given {@code kind}.
- *
- * @param exportDir path to the top directory of a Datastore export
- * @param kind the 'kind' of the Datastore entity
- */
- public static String getExportFileNamePattern(String exportDir, String kind) {
- checkArgument(!isNullOrEmpty(exportDir), "Null or empty exportDir.");
- checkArgument(!isNullOrEmpty(kind), "Null or empty kind.");
- return String.format(EXPORT_PATTERN_TEMPLATE, exportDir, kind, WILDCARD_CHAR);
- }
-
- /**
- * Returns an {@link ImmutableList} of regex patterns that match all Datastore export files of the
- * given {@code kinds}.
- *
- * @param exportDir path to the top directory of a Datastore export
- * @param kinds all entity 'kinds' to be matched
- */
- public static ImmutableList<String> getExportFilePatterns(
- String exportDir, Iterable<String> kinds) {
- checkNotNull(kinds, "kinds");
- return Streams.stream(kinds)
- .map(kind -> getExportFileNamePattern(exportDir, kind))
- .collect(ImmutableList.toImmutableList());
- }
-
- /**
- * Returns the fully qualified path of a Datastore export file with the given {@code kind} and
- * {@code shard}.
- *
- * @param exportDir path to the top directory of a Datastore export
- * @param kind the 'kind' of the Datastore entity
- * @param shard an integer suffix of the file name
- */
- public static String getExportFileNameByShard(String exportDir, String kind, int shard) {
- checkArgument(!isNullOrEmpty(exportDir), "Null or empty exportDir.");
- checkArgument(!isNullOrEmpty(kind), "Null or empty kind.");
- checkArgument(shard >= 0, "Negative shard %s not allowed.", shard);
- return String.format(EXPORT_PATTERN_TEMPLATE, exportDir, kind, shard);
- }
-
- /** Returns an {@link ImmutableList} of regex patterns that match all CommitLog files. */
- public static ImmutableList<String> getCommitLogFilePatterns(String commitLogDir) {
- return ImmutableList.of(String.format(COMMIT_LOG_PATTERN_TEMPLATE, commitLogDir));
- }
-
- /** Gets the Commit timestamp from a CommitLog file name. */
- public static DateTime getCommitLogTimestamp(String fileName) {
- checkArgument(!isNullOrEmpty(fileName), "Null or empty fileName.");
- int start = fileName.lastIndexOf(COMMIT_LOG_NAME_PREFIX);
- checkArgument(start >= 0, "Illegal file name %s.", fileName);
- return DateTime.parse(fileName.substring(start + COMMIT_LOG_NAME_PREFIX.length()));
- }
-
- public static ImmutableList<String> getCloudSQLCredentialFilePatterns(String environmentName) {
- checkArgument(
- ALLOWED_ENV.contains(environmentName), "Invalid environment name %s", environmentName);
- return ImmutableList.of(String.format(SQL_CONN_INFO_FILE_PATTERN, environmentName));
- }
-}
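For reference, the naming conventions the deleted helpers encoded, shown with a hypothetical bucket path:

```java
import google.registry.beam.initsql.BackupPaths;
import org.joda.time.DateTime;

class BackupPathsDemo {
  public static void main(String[] args) {
    // Export files: <exportDir>/all_namespaces/kind_<kind>/output-<shard or wildcard>.
    String pattern =
        BackupPaths.getExportFileNamePattern("gs://backup-bucket/2020-06-01", "Registrar");
    // -> gs://backup-bucket/2020-06-01/all_namespaces/kind_Registrar/output-*

    // CommitLog file names embed the commit timestamp after the "commit_diff_until_" prefix.
    DateTime ts =
        BackupPaths.getCommitLogTimestamp(
            "gs://backup-bucket/commit_logs/commit_diff_until_2020-06-01T00:00:00.000Z");
    System.out.println(pattern + " @ " + ts); // ... @ 2020-06-01T00:00:00.000Z
  }
}
```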
diff --git a/core/src/main/java/google/registry/beam/initsql/DomainBaseUtil.java b/core/src/main/java/google/registry/beam/initsql/DomainBaseUtil.java
deleted file mode 100644
index a646a3d50..000000000
--- a/core/src/main/java/google/registry/beam/initsql/DomainBaseUtil.java
+++ /dev/null
@@ -1,78 +0,0 @@
-// Copyright 2020 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam.initsql;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.appengine.api.datastore.Entity;
-import google.registry.model.annotations.DeleteAfterMigration;
-import java.util.Objects;
-
-/** Helper for manipulating {@code DomainBase} when migrating from Datastore to SQL database */
-@DeleteAfterMigration
-final class DomainBaseUtil {
-
- private DomainBaseUtil() {}
-
- /**
- * Removes properties that contain foreign keys from a Datastore {@link Entity} that represents an
- * Ofy {@link google.registry.model.domain.DomainBase}. This breaks the cycle of foreign key
- * constraints between entity kinds, allowing {@code DomainBases} to be inserted into the SQL
- * database. See {@link InitSqlPipeline} for a use case, where the full {@code DomainBases} are
- * written again during the last stage of the pipeline.
- *
- * <p>The returned object may be in a bad state. Specifically, {@link
- * google.registry.model.eppcommon.StatusValue#INACTIVE} is not added after name servers are
- * removed. This only impacts tests that manipulate Datastore entities directly.
- *
- * <p>This operation is performed on a Datastore {@link Entity} instead of an Ofy Java object
- * because Objectify requires access to a Datastore service when converting an Ofy object to a
- * Datastore {@code Entity}. If we insist on working with Objectify objects, we face a few
- * unsatisfactory options:
- *
- * <ul>
- *   <li>Connect to our production Datastore, which incurs unnecessary security and code health
- *       risk.
- *   <li>Connect to a separate real Datastore instance, which is a waste and overkill.
- *   <li>Use an in-memory test Datastore, which is a project health risk in that the test
- *       Datastore would be added to Nomulus' production binary unless we create a separate
- *       project for this pipeline.
- * </ul>
- *
- * <p>Given our use case, operating on Datastore entities is the best option.
- *
- * @throws IllegalArgumentException if input does not represent a DomainBase
- */
- static Entity removeBillingAndPollAndHosts(Entity domainBase) {
- checkNotNull(domainBase, "domainBase");
- checkArgument(
- Objects.equals(domainBase.getKind(), "DomainBase"),
- "Expecting DomainBase, got %s",
- domainBase.getKind());
- Entity clone = domainBase.clone();
- clone.removeProperty("autorenewBillingEvent");
- clone.removeProperty("autorenewPollMessage");
- clone.removeProperty("deletePollMessage");
- clone.removeProperty("nsHosts");
- domainBase.getProperties().keySet().stream()
- .filter(s -> s.startsWith("transferData."))
- .forEach(s -> clone.removeProperty(s));
- domainBase.getProperties().keySet().stream()
- .filter(s -> s.startsWith("gracePeriods."))
- .forEach(s -> clone.removeProperty(s));
- return clone;
- }
-}
diff --git a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java b/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java
deleted file mode 100644
index 3f17dcc3e..000000000
--- a/core/src/main/java/google/registry/beam/initsql/InitSqlPipeline.java
+++ /dev/null
@@ -1,242 +0,0 @@
-// Copyright 2020 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam.initsql;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.googlecode.objectify.Key;
-import google.registry.backup.VersionedEntity;
-import google.registry.beam.common.RegistryJpaIO;
-import google.registry.beam.initsql.Transforms.RemoveDomainBaseForeignKeys;
-import google.registry.model.annotations.DeleteAfterMigration;
-import google.registry.model.billing.BillingEvent;
-import google.registry.model.common.Cursor;
-import google.registry.model.contact.ContactResource;
-import google.registry.model.domain.DomainBase;
-import google.registry.model.domain.token.AllocationToken;
-import google.registry.model.host.HostResource;
-import google.registry.model.poll.PollMessage;
-import google.registry.model.registrar.Registrar;
-import google.registry.model.registrar.RegistrarContact;
-import google.registry.model.reporting.HistoryEntry;
-import google.registry.model.tld.Registry;
-import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Optional;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Wait;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.DateTime;
-
-/**
- * A BEAM pipeline that populates a SQL database with data from a Datastore backup.
- *
- * <p>This pipeline migrates EPP resources and related entities that cross-reference each other. To
- * avoid violating foreign key constraints, writes to SQL are ordered by entity kinds. In addition,
- * the {@link DomainBase} kind is written twice (see details below). The write order is presented
- * below. Although some kinds can be written concurrently, e.g. {@code ContactResource} and {@code
- * RegistrarContact}, we do not expect any performance benefit since the limiting resource is the
- * number of JDBC connections. Google internal users may refer to the design doc for more information.
- *
- * <ol>
- *   <li>{@link Registry}: Assumes that {@code PremiumList} and {@code ReservedList} have been set
- *       up in the SQL database.
- *   <li>{@link Cursor}: Logically can depend on {@code Registry}, but without foreign key.
- *   <li>{@link Registrar}: Logically depends on {@code Registry}; foreign key not modeled yet.
- *   <li>{@link HistoryEntry}: maps to one of three SQL entity types and may reference {@code
- *       Registrar}, {@code ContactResource}, {@code HostResource}, and {@code DomainBase}.
- *   <li>{@link DomainBase}, original copy from Datastore.
- * </ol>
- *
- * <p>This pipeline expects that the source Datastore has at least one entity in each of the types
- * above. This assumption allows us to construct a simpler pipeline graph that can be visually
- * examined, and is true in all intended use cases. However, tests must not violate this assumption
- * when setting up data, otherwise they may run into foreign key constraint violations. The reason
- * is that this pipeline uses the {@link Wait} transform to order the persistence by entity type.
- * However, the wait is skipped if the target type has no data, resulting in subsequent entity types
- * starting prematurely. E.g., if a Datastore has no {@code RegistrarContact} entities, the pipeline
- * may start writing {@code DomainBase} entities before all {@code Registry}, {@code Registrar} and
- * {@code ContactResource} entities have been persisted.
- */
-@DeleteAfterMigration
-public class InitSqlPipeline implements Serializable {
-
- /**
- * Datastore kinds to be written to the SQL database before the cleansed version of {@link
- * DomainBase}.
- */
- private static final ImmutableList<Class<?>> PHASE_ONE_ORDERED =
- ImmutableList.of(
- Registry.class,
- Cursor.class,
- Registrar.class,
- ContactResource.class,
- RegistrarContact.class);
-
- /**
- * Datastore kinds to be written to the SQL database after the cleansed version of {@link
- * DomainBase}.
- */
- private static final ImmutableList<Class<?>> PHASE_TWO_ORDERED =
- ImmutableList.of(
- HostResource.class,
- HistoryEntry.class,
- AllocationToken.class,
- BillingEvent.Recurring.class,
- BillingEvent.OneTime.class,
- BillingEvent.Cancellation.class,
- PollMessage.class,
- DomainBase.class);
-
- private final InitSqlPipelineOptions options;
-
- InitSqlPipeline(InitSqlPipelineOptions options) {
- this.options = options;
- }
-
- PipelineResult run() {
- return run(Pipeline.create(options));
- }
-
- @VisibleForTesting
- PipelineResult run(Pipeline pipeline) {
- setupPipeline(pipeline);
- return pipeline.run();
- }
-
- @VisibleForTesting
- void setupPipeline(Pipeline pipeline) {
- options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_UNCOMMITTED);
- PCollectionTuple datastoreSnapshot =
- pipeline.apply(
- "Load Datastore snapshot",
- Transforms.loadDatastoreSnapshot(
- options.getDatastoreExportDir(),
- options.getCommitLogDir(),
- DateTime.parse(options.getCommitLogStartTimestamp()),
- DateTime.parse(options.getCommitLogEndTimestamp()),
- ImmutableSet.<String>builder()
- .add("DomainBase")
- .addAll(toKindStrings(PHASE_ONE_ORDERED))
- .addAll(toKindStrings(PHASE_TWO_ORDERED))
- .build()));
-
- // Set up the pipeline to write entity kinds from PHASE_ONE_ORDERED to SQL. Return an object
- // that signals the completion of the phase.
- PCollection<Void> blocker =
- scheduleOnePhaseWrites(datastoreSnapshot, PHASE_ONE_ORDERED, Optional.empty(), null);
- blocker =
- writeToSql(
- "DomainBase without circular foreign keys",
- removeDomainBaseForeignKeys(datastoreSnapshot)
- .apply("Wait on phase one", Wait.on(blocker)));
- // Set up the pipeline to write entity kinds from PHASE_TWO_ORDERED to SQL. This phase won't
- // start until all cleansed DomainBases have been written (started by line above).
- scheduleOnePhaseWrites(
- datastoreSnapshot, PHASE_TWO_ORDERED, Optional.of(blocker), "DomainBaseNoFkeys");
- }
-
- private PCollection<VersionedEntity> removeDomainBaseForeignKeys(
- PCollectionTuple datastoreSnapshot) {
- PCollection<VersionedEntity> domainBases =
- datastoreSnapshot.get(Transforms.createTagForKind("DomainBase"));
- return domainBases.apply(
- "Remove circular foreign keys from DomainBase",
- ParDo.of(new RemoveDomainBaseForeignKeys()));
- }
-
- /**
- * Sets up the pipeline to write entities in {@code entityClasses} to SQL. Entities are written
- * one kind at a time based on each kind's position in {@code entityClasses}. Concurrency exists
- * within each kind.
- *
- * @param datastoreSnapshot the Datastore snapshot of all data to be migrated to SQL
- * @param entityClasses the entity types in write order
- * @param blockingPCollection the pipeline stage that blocks this phase
- * @param blockingTag description of the stage (if exists) that blocks this phase. Needed for
- * generating unique transform ids
- * @return the output {@code PCollection} from the writing of the last entity kind. Other parts of
- * the pipeline can {@link Wait} on this object
- */
- private PCollection<Void> scheduleOnePhaseWrites(
- PCollectionTuple datastoreSnapshot,
- Collection<Class<?>> entityClasses,
- Optional<PCollection<Void>> blockingPCollection,
- String blockingTag) {
- checkArgument(!entityClasses.isEmpty(), "Each phase must have at least one kind.");
- ImmutableList<TupleTag<VersionedEntity>> tags =
- toKindStrings(entityClasses).stream()
- .map(Transforms::createTagForKind)
- .collect(ImmutableList.toImmutableList());
-
- PCollection<Void> prev = blockingPCollection.orElse(null);
- String prevTag = blockingTag;
- for (TupleTag<VersionedEntity> tag : tags) {
- PCollection<VersionedEntity> curr = datastoreSnapshot.get(tag);
- if (prev != null) {
- curr = curr.apply("Wait on " + prevTag, Wait.on(prev));
- }
- prev = writeToSql(tag.getId(), curr);
- prevTag = tag.getId();
- }
- return prev;
- }
-
- private PCollection<Void> writeToSql(String transformId, PCollection<VersionedEntity> data) {
- return data.apply(
- "Write to Sql: " + transformId,
- RegistryJpaIO.<VersionedEntity>write()
- .withName(transformId)
- .withBatchSize(options.getSqlWriteBatchSize())
- .withShards(options.getSqlWriteShards())
- .withJpaConverter(Transforms::convertVersionedEntityToSqlEntity)
- .disableUpdateAutoTimestamp());
- }
-
- private static ImmutableList<String> toKindStrings(Collection<Class<?>> entityClasses) {
- return entityClasses.stream().map(Key::getKind).collect(ImmutableList.toImmutableList());
- }
-
- public static void main(String[] args) {
- InitSqlPipelineOptions options =
- PipelineOptionsFactory.fromArgs(args).withValidation().as(InitSqlPipelineOptions.class);
- new InitSqlPipeline(options).run();
- }
-}
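The per-kind write ordering above hinges on Beam's `Wait.on` transform. A self-contained sketch of the idiom (the kind names here are just placeholder strings), including the empty-input caveat from the class Javadoc:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

public class WaitOnOrderingDemo {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<String> phaseOne = p.apply("Phase one", Create.of("Registry", "Registrar"));
    PCollection<String> phaseTwo = p.apply("Phase two", Create.of("DomainBase"));
    // Wait.on() holds back phaseTwo's elements until phaseOne is fully computed; chaining
    // one wait per entity kind yields the sequential writes described above. If the signal
    // collection is empty the wait is vacuous, hence the "at least one entity per kind"
    // assumption in the deleted pipeline's Javadoc.
    phaseTwo.apply("Wait on phase one", Wait.on(phaseOne));
    p.run().waitUntilFinish();
  }
}
```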
diff --git a/core/src/main/java/google/registry/beam/initsql/InitSqlPipelineOptions.java b/core/src/main/java/google/registry/beam/initsql/InitSqlPipelineOptions.java
deleted file mode 100644
index b4f3a1940..000000000
--- a/core/src/main/java/google/registry/beam/initsql/InitSqlPipelineOptions.java
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright 2020 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam.initsql;
-
-import google.registry.beam.common.RegistryPipelineOptions;
-import google.registry.model.annotations.DeleteAfterMigration;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.Validation;
-
-/** Pipeline options for {@link InitSqlPipeline} */
-@DeleteAfterMigration
-public interface InitSqlPipelineOptions extends RegistryPipelineOptions {
-
- @Description("The root directory of the export to load.")
- String getDatastoreExportDir();
-
- void setDatastoreExportDir(String datastoreExportDir);
-
- @Description("The directory that contains all CommitLog files.")
- String getCommitLogDir();
-
- void setCommitLogDir(String commitLogDir);
-
- @Description("The earliest CommitLogs to load, in ISO8601 format.")
- @Validation.Required
- String getCommitLogStartTimestamp();
-
- void setCommitLogStartTimestamp(String commitLogStartTimestamp);
-
- @Description("The latest CommitLogs to load, in ISO8601 format.")
- @Validation.Required
- String getCommitLogEndTimestamp();
-
- void setCommitLogEndTimestamp(String commitLogEndTimestamp);
-}
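Beam derives command-line flags from option getters, so the deleted interface would have been instantiated as below; all values are hypothetical, and the same idiom appears in the deleted pipeline's `main`:

```java
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class OptionsDemo {
  public static void main(String[] args) {
    InitSqlPipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--datastoreExportDir=gs://backup-bucket/2020-06-01/",
                "--commitLogDir=gs://backup-bucket/commit_logs/",
                "--commitLogStartTimestamp=2020-05-31T00:00:00.000Z",
                "--commitLogEndTimestamp=2020-06-02T00:00:00.000Z")
            .withValidation() // enforces the two @Validation.Required timestamps
            .as(InitSqlPipelineOptions.class);
    System.out.println(options.getDatastoreExportDir());
  }
}
```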
diff --git a/core/src/main/java/google/registry/beam/initsql/README.md b/core/src/main/java/google/registry/beam/initsql/README.md
deleted file mode 100644
index 19c54200e..000000000
--- a/core/src/main/java/google/registry/beam/initsql/README.md
+++ /dev/null
@@ -1,17 +0,0 @@
-## Summary
-
-This package contains a BEAM pipeline that populates a Cloud SQL database from a
-Datastore backup. The pipeline uses an unsynchronized Datastore export and
-overlapping CommitLogs generated by the Nomulus server to recreate a consistent
-Datastore snapshot, and writes the data to a Cloud SQL instance.
-
-## Pipeline Visualization
-
-The golden flow graph of the InitSqlPipeline is saved both as a text-based
-[DOT file](../../../../../../test/resources/google/registry/beam/initsql/pipeline_golden.dot)
-and a
-[.png file](../../../../../../test/resources/google/registry/beam/initsql/pipeline_golden.png).
-A test compares the flow graph of the current pipeline with the golden graph,
-and will fail if changes are detected. When this happens, run the Gradle task
-':core:updateInitSqlPipelineGraph' to update the golden files and review the
-changes.
diff --git a/core/src/main/java/google/registry/beam/initsql/Transforms.java b/core/src/main/java/google/registry/beam/initsql/Transforms.java
deleted file mode 100644
index fa969592b..000000000
--- a/core/src/main/java/google/registry/beam/initsql/Transforms.java
+++ /dev/null
@@ -1,485 +0,0 @@
-// Copyright 2020 The Nomulus Authors. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package google.registry.beam.initsql;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static google.registry.beam.initsql.BackupPaths.getCommitLogTimestamp;
-import static google.registry.beam.initsql.BackupPaths.getExportFilePatterns;
-import static google.registry.model.ofy.ObjectifyService.auditedOfy;
-import static google.registry.util.DateTimeUtils.START_OF_TIME;
-import static google.registry.util.DateTimeUtils.isBeforeOrAt;
-import static google.registry.util.DomainNameUtils.canonicalizeHostname;
-import static java.util.Comparator.comparing;
-import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
-import static org.apache.beam.sdk.values.TypeDescriptors.strings;
-
-import avro.shaded.com.google.common.collect.Iterators;
-import com.google.appengine.api.datastore.Entity;
-import com.google.appengine.api.datastore.EntityTranslator;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Streams;
-import google.registry.backup.CommitLogImports;
-import google.registry.backup.VersionedEntity;
-import google.registry.model.annotations.DeleteAfterMigration;
-import google.registry.model.billing.BillingEvent.Flag;
-import google.registry.model.billing.BillingEvent.Reason;
-import google.registry.model.domain.DomainBase;
-import google.registry.model.replay.DatastoreAndSqlEntity;
-import google.registry.model.replay.SqlEntity;
-import google.registry.model.reporting.HistoryEntry;
-import google.registry.tools.LevelDbLogReader;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Supplier;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.Compression;
-import org.apache.beam.sdk.io.FileIO;
-import org.apache.beam.sdk.io.FileIO.ReadableFile;
-import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ProcessFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.joda.time.DateTime;
-
-/**
- * {@link PTransform Pipeline transforms} used in pipelines that load from both Datastore export
- * files and Nomulus CommitLog files.
- */
-@DeleteAfterMigration
-public final class Transforms {
-
- private Transforms() {}
-
- /**
- * The commitTimestamp assigned to all entities loaded from a Datastore export file. The exact
- * value does not matter, but it must be lower than the timestamps of real CommitLog records.
- */
- @VisibleForTesting static final long EXPORT_ENTITY_TIME_STAMP = START_OF_TIME.getMillis();
-
- /**
- * Returns a {@link TupleTag} that can be used to retrieve entities of the given {@code kind} from
- * the Datastore snapshot returned by {@link #loadDatastoreSnapshot}.
- */
- public static TupleTag<VersionedEntity> createTagForKind(String kind) {
- // When used with PCollectionTuple the result must retain generic type information.
- // Both the Generic param and the empty bracket below are important.
- return new TupleTag<VersionedEntity>(Transforms.class.getSimpleName() + ":" + kind) {};
- }
-
- /**
- * Composite {@link PTransform transform} that loads the Datastore snapshot right before {@code
- * commitLogToTime} for caller specified {@code kinds}. The resulting snapshot has all changes
- * that happened before {@code commitLogToTime}, and none at or after {@code commitLogToTime}.
- *
- * <p>Caller must provide the location of a Datastore export that started AFTER {@code
- * commitLogFromTime} and completed BEFORE {@code commitLogToTime}, as well as the root directory
- * of all CommitLog files.
- *
- * <p>Selection of {@code commitLogFromTime} and {@code commitLogToTime} should follow the
- * guidelines below to ensure that all incremental changes concurrent with the export are covered:
- *
- * <ul>
- *   <li>Two or more CommitLogs should exist between {@code commitLogFromTime} and the starting
- *       time of the Datastore export. This ensures that the earlier CommitLog file was complete
- *       before the export started.
- *   <li>Two or more CommitLogs should exist between the export completion time and {@code
- *       commitLogToTime}.
- * </ul>
- *
- * <p>The output from the returned transform is a {@link PCollectionTuple} consisting of {@link
- * VersionedEntity VersionedEntities} grouped into {@link PCollection PCollections} by {@code
- * kind}.
- */
- public static PTransform<PBegin, PCollectionTuple> loadDatastoreSnapshot(
- String exportDir,
- String commitLogDir,
- DateTime commitLogFromTime,
- DateTime commitLogToTime,
- Set<String> kinds) {
- checkArgument(kinds != null && !kinds.isEmpty(), "At least one kind is expected.");
-
- // Create tags to collect entities by kind in final step.
- final ImmutableMap<String, TupleTag<VersionedEntity>> outputTags =
- kinds.stream()
- .collect(ImmutableMap.toImmutableMap(kind -> kind, Transforms::createTagForKind));
- // Arbitrarily select one tag as mainOutTag and put the remaining ones in a TupleTagList.
- // This separation is required by ParDo's config API.
- Iterator<TupleTag<VersionedEntity>> tagsIt = outputTags.values().iterator();
- final TupleTag<VersionedEntity> mainOutputTag = tagsIt.next();
- final TupleTagList additionalTags = TupleTagList.of(ImmutableList.copyOf(tagsIt));
-
- return new PTransform<PBegin, PCollectionTuple>() {
- @Override
- public PCollectionTuple expand(PBegin input) {
- PCollection<VersionedEntity> exportedEntities =
- input
- .apply("Get export file patterns", getDatastoreExportFilePatterns(exportDir, kinds))
- .apply("Find export files", getFilesByPatterns())
- .apply("Load export data", loadExportDataFromFiles());
- PCollection<VersionedEntity> commitLogEntities =
- input
- .apply("Get commitlog file patterns", getCommitLogFilePatterns(commitLogDir))
- .apply("Find commitlog files", getFilesByPatterns())
- .apply(
- "Filter commitLog by time",
- filterCommitLogsByTime(commitLogFromTime, commitLogToTime))
- .apply("Load commitlog data", loadCommitLogsFromFiles(kinds));
- return PCollectionList.of(exportedEntities)
- .and(commitLogEntities)
- .apply("Merge exports and CommitLogs", Flatten.pCollections())
- .apply(
- "Key entities by Datastore Keys",
- // Converting to KV<String, VersionedEntity> instead of KV<Key, VersionedEntity> b/c
- // (SerializableCoder) is not deterministic and cannot be used with GroupBy.
- MapElements.into(kvs(strings(), TypeDescriptor.of(VersionedEntity.class)))
- .via((VersionedEntity e) -> KV.of(e.key().toString(), e)))
- .apply("Gather entities by key", GroupByKey.create())
- .apply(
- "Output latest version per entity",
- ParDo.of(
- new DoFn<KV<String, Iterable<VersionedEntity>>, VersionedEntity>() {
- @ProcessElement
- public void processElement(
- @Element KV<String, Iterable<VersionedEntity>> kv,
- MultiOutputReceiver out) {
- Optional<VersionedEntity> latest =
- Streams.stream(kv.getValue())
- .sorted(comparing(VersionedEntity::commitTimeMills).reversed())
- .findFirst();
- // Throw to abort (after default retries). Investigate, fix, and rerun.
- checkState(
- latest.isPresent(), "Unexpected key with no data", kv.getKey());
- if (latest.get().isDelete()) {
- return;
- }
- String kind = latest.get().getEntity().get().getKind();
- out.get(outputTags.get(kind)).output(latest.get());
- }
- })
- .withOutputTags(mainOutputTag, additionalTags));
- }
- };
- }
-
- /**
- * Returns a {@link PTransform transform} that can generate a collection of patterns that match
- * all Datastore CommitLog files.
- */
- public static PTransform<PBegin, PCollection<String>> getCommitLogFilePatterns(
- String commitLogDir) {
- return toStringPCollection(BackupPaths.getCommitLogFilePatterns(commitLogDir));
- }
-
- /**
- * Returns a {@link PTransform transform} that can generate a collection of patterns that match
- * all Datastore export files of the given {@code kinds}.
- */
- public static PTransform<PBegin, PCollection<String>> getDatastoreExportFilePatterns(
- String exportDir, Collection<String> kinds) {
- return toStringPCollection(getExportFilePatterns(exportDir, kinds));
- }
-
- public static PTransform<PBegin, PCollection<String>> getCloudSqlConnectionInfoFilePatterns(
- String gcpProjectName) {
- return toStringPCollection(BackupPaths.getCloudSQLCredentialFilePatterns(gcpProjectName));
- }
-
- /**
- * Returns a {@link PTransform} from file name patterns to file {@link Metadata Metadata records}.
- */
- public static PTransform<PCollection<String>, PCollection<Metadata>> getFilesByPatterns() {
- return new PTransform<PCollection<String>, PCollection<Metadata>>() {
- @Override
- public PCollection<Metadata> expand(PCollection<String> input) {
- return input.apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW));
- }
- };
- }
-
- /**
- * Returns CommitLog files with timestamps between {@code fromTime} (inclusive) and {@code
- * toTime} (exclusive).
- */
- public static PTransform<PCollection<? extends Metadata>, PCollection<Metadata>>
- filterCommitLogsByTime(DateTime fromTime, DateTime toTime) {
- return ParDo.of(new FilterCommitLogFileByTime(fromTime, toTime));
- }
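-
- // Illustrative sketch (not part of the original file): the returned transform keeps a file
- // whose embedded timestamp t satisfies fromTime <= t < toTime. With hypothetical bounds:
- //
- //   DateTime from = DateTime.parse("2021-01-01T00:00:00Z");
- //   DateTime to = DateTime.parse("2021-01-02T00:00:00Z");
- //   commitLogFiles.apply(filterCommitLogsByTime(from, to));
- //
- // A file stamped exactly at `from` is kept; a file stamped exactly at `to` is dropped.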
-
- /** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */
- public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>
- loadExportDataFromFiles() {
- return processFiles(
- new BackupFileReader(
- file ->
- Iterators.transform(
- LevelDbLogReader.from(file.open()),
- (byte[] bytes) -> VersionedEntity.from(EXPORT_ENTITY_TIME_STAMP, bytes))));
- }
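-
- // Note (added commentary): export records carry no commit timestamps of their own, so they
- // are stamped with the fixed EXPORT_ENTITY_TIME_STAMP. Presumably this sorts below any real
- // CommitLog timestamp, so when export and CommitLog versions of the same key are grouped,
- // the latest-version selection above prefers the CommitLog mutation.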
-
- /** Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity}. */
- public static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>
- loadCommitLogsFromFiles(Set<String> kinds) {
- return processFiles(
- new BackupFileReader(
- file ->
- CommitLogImports.loadEntities(file.open()).stream()
- .filter(e -> kinds.contains(e.key().getKind()))
- .iterator()));
- }
-
- // Production data repair configs go below. See b/185954992. Note that the CommitLog replay
- // process does not filter out the ignored entities listed below, a mistake that we do not fix
- // for operational convenience. Instead, the Database comparison tool will filter them out. See
- // ValidateSqlUtils.java for more information.
-
- // Prober domains in a bad state, without associated contacts, hosts, billing events, or
- // non-synthesized history. They can be safely ignored.
- public static final ImmutableSet<String> IGNORED_DOMAINS =
- ImmutableSet.of("6AF6D2-IQCANT", "2-IQANYT");
-
- // Prober hosts referencing phantom registrars. They and their associated history entries can be
- // safely ignored.
- public static final ImmutableSet<String> IGNORED_HOSTS =
- ImmutableSet.of(
- "4E21_WJ0TEST-GOOGLE",
- "4E21_WJ1TEST-GOOGLE",
- "4E21_WJ2TEST-GOOGLE",
- "4E21_WJ3TEST-GOOGLE");
-
- // Prober contacts referencing phantom registrars. They and their associated history entries can
- // be safely ignored.
- public static final ImmutableSet<String> IGNORED_CONTACTS =
- ImmutableSet.of(
- "1_WJ0TEST-GOOGLE", "1_WJ1TEST-GOOGLE", "1_WJ2TEST-GOOGLE", "1_WJ3TEST-GOOGLE");
-
- private static boolean isMigratable(Entity entity) {
- // Checks specific to production data. See b/185954992 for details.
- // The names of these bad entities in production do not conflict with names in other
- // environments. For simplicity's sake, we apply these checks regardless of the data's source.
- if (entity.getKind().equals("DomainBase")
- && IGNORED_DOMAINS.contains(entity.getKey().getName())) {
- return false;
- }
- if (entity.getKind().equals("ContactResource")) {
- String roid = entity.getKey().getName();
- return !IGNORED_CONTACTS.contains(roid);
- }
- if (entity.getKind().equals("HostResource")) {
- String roid = entity.getKey().getName();
- return !IGNORED_HOSTS.contains(roid);
- }
- if (entity.getKind().equals("HistoryEntry")) {
- // DOMAIN_APPLICATION_CREATE is a deprecated type and should not be migrated.
- // The enum value DOMAIN_APPLICATION_CREATE no longer exists in Java and cannot
- // be deserialized.
- if (Objects.equals(entity.getProperty("type"), "DOMAIN_APPLICATION_CREATE")) {
- return false;
- }
-
- // Remove production bad data: Histories of ignored EPP resources:
- com.google.appengine.api.datastore.Key parentKey = entity.getKey().getParent();
- if (parentKey.getKind().equals("ContactResource")) {
- String contactRoid = parentKey.getName();
- return !IGNORED_CONTACTS.contains(contactRoid);
- }
- if (parentKey.getKind().equals("HostResource")) {
- String hostRoid = parentKey.getName();
- return !IGNORED_HOSTS.contains(hostRoid);
- }
- if (parentKey.getKind().equals("DomainBase")) {
- String domainRoid = parentKey.getName();
- return !IGNORED_DOMAINS.contains(domainRoid);
- }
- }
- return true;
- }
-
- @VisibleForTesting
- static Entity repairBadData(Entity entity) {
- if (entity.getKind().equals("Cancellation")
- && Objects.equals(entity.getProperty("reason"), "AUTO_RENEW")) {
- // AUTO_RENEW has been moved from 'reason' to flags. Change reason to RENEW and add the
- // AUTO_RENEW flag. Note: all affected entities have empty flags so we can simply assign
- // instead of append. See b/185954992.
- entity.setUnindexedProperty("reason", Reason.RENEW.name());
- entity.setUnindexedProperty("flags", ImmutableList.of(Flag.AUTO_RENEW.name()));
- } else if (entity.getKind().equals("DomainBase")) {
- // Canonicalize old domain/host names from 2016 and earlier before we were enforcing this.
- entity.setIndexedProperty(
- "fullyQualifiedDomainName",
- canonicalizeHostname((String) entity.getProperty("fullyQualifiedDomainName")));
- } else if (entity.getKind().equals("HostResource")) {
- entity.setIndexedProperty(
- "fullyQualifiedHostName",
- canonicalizeHostname((String) entity.getProperty("fullyQualifiedHostName")));
- }
- return entity;
- }
-
- private static SqlEntity toSqlEntity(Object ofyEntity) {
- if (ofyEntity instanceof HistoryEntry) {
- HistoryEntry ofyHistory = (HistoryEntry) ofyEntity;
- return (SqlEntity) ofyHistory.toChildHistoryEntity();
- }
- return ((DatastoreAndSqlEntity) ofyEntity).toSqlEntity().get();
- }
-
- /**
- * Converts a {@link VersionedEntity} to a JPA entity for persistence.
- *
- * @return An object to be persisted to SQL, or null if the input is not to be migrated. (The
- * return type is not {@code Optional} because this is a one-use method, and we do not want to
- * invest the effort to make {@code Optional} work with Beam.)
- */
- @Nullable
- public static SqlEntity convertVersionedEntityToSqlEntity(VersionedEntity dsEntity) {
- return dsEntity
- .getEntity()
- .filter(Transforms::isMigratable)
- .map(Transforms::repairBadData)
- .map(e -> auditedOfy().toPojo(e))
- .map(Transforms::toSqlEntity)
- .orElse(null);
- }
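-
- // Illustrative sketch (not part of the original file): because this method returns null for
- // entities that should not be migrated, a caller must drop nulls before persisting. The
- // jpaTm() call below is a hypothetical persistence step.
- //
- //   SqlEntity sqlEntity = convertVersionedEntityToSqlEntity(dsEntity);
- //   if (sqlEntity != null) {
- //     jpaTm().transact(() -> jpaTm().put(sqlEntity));
- //   }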
-
- /** Interface for serializable {@link Supplier suppliers}. */
- public interface SerializableSupplier<T> extends Supplier<T>, Serializable {}
-
- /**
- * Returns a {@link PTransform} that produces a {@link PCollection} containing all elements in the
- * given {@link Iterable}.
- */
- private static PTransform<PBegin, PCollection<String>> toStringPCollection(
- Iterable<String> strings) {
- return Create.of(strings).withCoder(StringUtf8Coder.of());
- }
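-
- // Note (added commentary): the explicit StringUtf8Coder pins the output coder instead of
- // relying on Beam's coder inference; presumably a defensive choice, since the element type
- // here is always String.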
-
- /**
- * Returns a {@link PTransform} from file {@link Metadata} to {@link VersionedEntity} using
- * caller-provided {@code transformer}.
- */
- private static PTransform<PCollection<Metadata>, PCollection<VersionedEntity>> processFiles(
- DoFn<ReadableFile, VersionedEntity> transformer) {
- return new PTransform<PCollection<Metadata>, PCollection<VersionedEntity>>() {
- @Override
- public PCollection<VersionedEntity> expand(PCollection<Metadata> input) {
- return input
- .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED))
- .apply(transformer.getClass().getSimpleName(), ParDo.of(transformer));
- }
- };
- }
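-
- // Note (added commentary): using the DoFn's simple class name as the step name gives the
- // ParDo a readable label in the pipeline graph instead of Beam's auto-generated name.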
-
- private static class FilterCommitLogFileByTime extends DoFn<Metadata, Metadata> {
- private final DateTime fromTime;
- private final DateTime toTime;
-
- FilterCommitLogFileByTime(DateTime fromTime, DateTime toTime) {
- checkNotNull(fromTime, "fromTime");
- checkNotNull(toTime, "toTime");
- checkArgument(
- fromTime.isBefore(toTime),
- "Invalid time range: fromTime (%s) is before endTime (%s)",
- fromTime,
- toTime);
- this.fromTime = fromTime;
- this.toTime = toTime;
- }
-
- @ProcessElement
- public void processElement(@Element Metadata fileMeta, OutputReceiver<Metadata> out) {
- DateTime timestamp = getCommitLogTimestamp(fileMeta.resourceId().toString());
- if (isBeforeOrAt(fromTime, timestamp) && timestamp.isBefore(toTime)) {
- out.output(fileMeta);
- }
- }
- }
-
- /**
- * Reads from a Datastore backup file and converts its content into {@link VersionedEntity
- * VersionedEntities}.
- *
- * <p>The input file may be either a LevelDb file from a Datastore export or a CommitLog file
- * generated by the Nomulus server. In either case, the file contains variable-length records and
- * must be read sequentially from the beginning. If the read fails, the file needs to be retried
- * from the beginning.
- */
- private static class BackupFileReader extends DoFn<ReadableFile, VersionedEntity> {
- private final ProcessFunction<ReadableFile, Iterator<VersionedEntity>> reader;
-
- private BackupFileReader(ProcessFunction<ReadableFile, Iterator<VersionedEntity>> reader) {
- this.reader = reader;
- }
-
- @ProcessElement
- public void processElement(@Element ReadableFile file, OutputReceiver<VersionedEntity> out) {
- try {
- reader.apply(file).forEachRemaining(out::output);
- } catch (Exception e) {
- // Let the pipeline use its default retry strategy on the whole file. For GCP Dataflow this
- // means retrying up to 4 times (which may include other files bundled with this one), and
- // failing the pipeline if no attempt succeeds.
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * Removes BillingEvents, {@link google.registry.model.poll.PollMessage PollMessages} and {@link
- * google.registry.model.host.HostResource} references from a {@link DomainBase}. These
- * references create circular foreign key constraints that prevent migration of {@code
- * DomainBase} to SQL databases.
- *
- *