diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipeline.java b/core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipeline.java index 8dc2d9a8a..941cdab89 100644 --- a/core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipeline.java +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipeline.java @@ -18,6 +18,7 @@ import static com.google.common.base.Verify.verify; import static org.apache.beam.sdk.values.TypeDescriptors.strings; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.common.flogger.FluentLogger; import google.registry.beam.common.RegistryPipelineOptions; import google.registry.beam.common.RegistryPipelineWorkerInitializer; @@ -29,10 +30,12 @@ import google.registry.model.domain.DomainHistory; import google.registry.model.replay.SqlEntity; import google.registry.persistence.PersistenceModule.JpaTransactionManagerType; import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; +import google.registry.util.SystemClock; import java.io.Serializable; import java.util.Optional; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; @@ -77,12 +80,22 @@ public class ValidateDatabasePipeline { "Comparing datastore export at %s and commitlog timestamp %s.", mostRecentExport.exportDir(), latestCommitLogTime); + Optional outputPath = + Optional.ofNullable(options.getDiffOutputGcsBucket()) + .map( + bucket -> + String.format( + "gs://%s/validate_database/%s/diffs.txt", + bucket, new SystemClock().nowUtc())); + outputPath.ifPresent(path -> logger.atInfo().log("Discrepancies will be logged to %s", path)); + setupPipeline( pipeline, 
Optional.ofNullable(options.getSqlSnapshotId()), mostRecentExport, latestCommitLogTime, - Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse)); + Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse), + outputPath); pipeline.run(); } @@ -92,7 +105,8 @@ public class ValidateDatabasePipeline { Optional sqlSnapshotId, DatastoreSnapshotInfo mostRecentExport, DateTime latestCommitLogTime, - Optional compareStartTime) { + Optional compareStartTime, + Optional diffOutputPath) { pipeline .getCoderRegistry() .registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class)); @@ -117,19 +131,41 @@ public class ValidateDatabasePipeline { datastoreSnapshot.getAll().keySet().equals(cloudSqlSnapshot.getAll().keySet()), "Expecting the same set of types in both snapshots."); + PCollectionList diffLogs = PCollectionList.empty(pipeline); + for (Class clazz : SqlSnapshots.ALL_SQL_ENTITIES) { TupleTag tag = ValidateSqlUtils.createSqlEntityTupleTag(clazz); verify( datastoreSnapshot.has(tag), "Missing %s in Datastore snapshot.", clazz.getSimpleName()); verify(cloudSqlSnapshot.has(tag), "Missing %s in Cloud SQL snapshot.", clazz.getSimpleName()); - PCollectionList.of(datastoreSnapshot.get(tag)) - .and(cloudSqlSnapshot.get(tag)) - .apply("Combine from both snapshots: " + clazz.getSimpleName(), Flatten.pCollections()) + diffLogs = + diffLogs.and( + PCollectionList.of(datastoreSnapshot.get(tag)) + .and(cloudSqlSnapshot.get(tag)) + .apply( + "Combine from both snapshots: " + clazz.getSimpleName(), + Flatten.pCollections()) + .apply( + "Assign primary key to merged " + clazz.getSimpleName(), + WithKeys.of(ValidateDatabasePipeline::getPrimaryKeyString) + .withKeyType(strings())) + .apply("Group by primary key " + clazz.getSimpleName(), GroupByKey.create()) + .apply("Compare " + clazz.getSimpleName(), ParDo.of(new CompareSqlEntity()))); + } + if (diffOutputPath.isPresent()) { + diffLogs + .apply("Gather diff logs", 
Flatten.pCollections()) .apply( - "Assign primary key to merged " + clazz.getSimpleName(), - WithKeys.of(ValidateDatabasePipeline::getPrimaryKeyString).withKeyType(strings())) - .apply("Group by primary key " + clazz.getSimpleName(), GroupByKey.create()) - .apply("Compare " + clazz.getSimpleName(), ParDo.of(new CompareSqlEntity())); + "Output diffs", + TextIO.write() + .to(diffOutputPath.get()) + /** + * Output to a single file for ease of use since diffs should be few. If this + * assumption turns out to be false, the user should abort the pipeline and + * investigate why. + */ + .withoutSharding() + .withDelimiter((Strings.repeat("-", 80) + "\n").toCharArray())); } } diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipelineOptions.java b/core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipelineOptions.java index f2222efe0..d6ede2b51 100644 --- a/core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipelineOptions.java +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateDatabasePipelineOptions.java @@ -46,4 +46,10 @@ public interface ValidateDatabasePipelineOptions extends RegistryPipelineOptions String getComparisonStartTimestamp(); void setComparisonStartTimestamp(String comparisonStartTimestamp); + + @Description("The GCS bucket where discrepancies found during comparison should be logged.") + @Nullable + String getDiffOutputGcsBucket(); + + void setDiffOutputGcsBucket(String gcsBucket); } diff --git a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java index eeca07e1a..db5cd45d7 100644 --- a/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java +++ b/core/src/main/java/google/registry/beam/comparedb/ValidateSqlUtils.java @@ -20,7 +20,6 @@ import static google.registry.persistence.transaction.TransactionManagerFactory. 
import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.flogger.FluentLogger; import google.registry.beam.initsql.Transforms; import google.registry.config.RegistryEnvironment; import google.registry.model.EppResource; @@ -37,6 +36,7 @@ import google.registry.model.poll.PollMessage; import google.registry.model.registrar.Registrar; import google.registry.model.replay.SqlEntity; import google.registry.model.reporting.HistoryEntry; +import google.registry.util.DiffUtils; import java.lang.reflect.Field; import java.math.BigInteger; import java.util.HashMap; @@ -54,7 +54,6 @@ import org.apache.beam.sdk.values.TupleTag; /** Helpers for use by {@link ValidateDatabasePipeline}. */ @DeleteAfterMigration final class ValidateSqlUtils { - private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private ValidateSqlUtils() {} @@ -98,15 +97,13 @@ final class ValidateSqlUtils { return new TupleTag(actualType.getSimpleName()) {}; } - static class CompareSqlEntity extends DoFn>, Void> { + static class CompareSqlEntity extends DoFn>, String> { private final HashMap totalCounters = new HashMap<>(); private final HashMap missingCounters = new HashMap<>(); private final HashMap unequalCounters = new HashMap<>(); private final HashMap badEntityCounters = new HashMap<>(); private final HashMap duplicateEntityCounters = new HashMap<>(); - private volatile boolean logPrinted = false; - private String getCounterKey(Class clazz) { return PollMessage.class.isAssignableFrom(clazz) ? 
"PollMessage" : clazz.getSimpleName(); } @@ -124,39 +121,36 @@ final class ValidateSqlUtils { counterKey, Metrics.counter("CompareDB", "Duplicate Entities:" + counterKey)); } + String duplicateEntityLog(String key, ImmutableList entities) { + return String.format("%s: %d entities.", key, entities.size()); + } + + String unmatchedEntityLog(String key, SqlEntity entry) { + // For a PollMessage only found in Datastore, key is not enough to query for it. + return String.format("Missing in one DB:\n%s", entry instanceof PollMessage ? entry : key); + } + /** * A rudimentary debugging helper that prints the first pair of unequal entities in each worker. * This will be removed when we start exporting such entities to GCS. */ - void logDiff(String key, Object entry0, Object entry1) { - if (logPrinted) { - return; - } - logPrinted = true; + String unEqualEntityLog(String key, SqlEntity entry0, SqlEntity entry1) { Map fields0 = ((ImmutableObject) entry0).toDiffableFieldMap(); Map fields1 = ((ImmutableObject) entry1).toDiffableFieldMap(); - StringBuilder sb = new StringBuilder(); - fields0.forEach( - (field, value) -> { - if (fields1.containsKey(field)) { - if (!Objects.equals(value, fields1.get(field))) { - sb.append(field + " not match: " + value + " -> " + fields1.get(field) + "\n"); - } - } else { - sb.append(field + "Not found in entity 2\n"); - } - }); - fields1.forEach( - (field, value) -> { - if (!fields0.containsKey(field)) { - sb.append(field + "Not found in entity 1\n"); - } - }); - logger.atWarning().log(key + " " + sb.toString()); + return key + " " + DiffUtils.prettyPrintEntityDeepDiff(fields0, fields1); + } + + String badEntitiesLog(String key, SqlEntity entry0, SqlEntity entry1) { + Map fields0 = ((ImmutableObject) entry0).toDiffableFieldMap(); + Map fields1 = ((ImmutableObject) entry1).toDiffableFieldMap(); + return String.format( + "Failed to parse one or both entities for key %s:\n%s\n", + key, DiffUtils.prettyPrintEntityDeepDiff(fields0, fields1)); } 
@ProcessElement - public void processElement(@Element KV> kv) { + public void processElement( + @Element KV> kv, OutputReceiver out) { ImmutableList entities = ImmutableList.copyOf(kv.getValue()); verify(!entities.isEmpty(), "Can't happen: no value for key %s.", kv.getKey()); @@ -169,6 +163,7 @@ final class ValidateSqlUtils { // Duplicates may happen with Cursors if imported across projects. Its key in Datastore, the // id field, encodes the project name and is not fixed by the importing job. duplicateEntityCounters.get(counterKey).inc(); + out.output(duplicateEntityLog(kv.getKey(), entities) + "\n"); return; } @@ -177,11 +172,7 @@ final class ValidateSqlUtils { return; } missingCounters.get(counterKey).inc(); - // Temporary debugging help. See logDiff() above. - if (!logPrinted) { - logPrinted = true; - logger.atWarning().log("Unexpected single entity: %s", kv.getKey()); - } + out.output(unmatchedEntityLog(kv.getKey(), entities.get(0)) + "\n"); return; } SqlEntity entity0 = entities.get(0); @@ -198,17 +189,14 @@ final class ValidateSqlUtils { entity0 = normalizeEntity(entity0); entity1 = normalizeEntity(entity1); } catch (Exception e) { - // Temporary debugging help. See logDiff() above. 
- if (!logPrinted) { - logPrinted = true; - badEntityCounters.get(counterKey).inc(); - } + badEntityCounters.get(counterKey).inc(); + out.output(badEntitiesLog(kv.getKey(), entity0, entity1)); return; } if (!Objects.equals(entity0, entity1)) { unequalCounters.get(counterKey).inc(); - logDiff(kv.getKey(), entities.get(0), entities.get(1)); + out.output(unEqualEntityLog(kv.getKey(), entities.get(0), entities.get(1))); } } } diff --git a/core/src/main/java/google/registry/tools/ValidateDatabaseMigrationCommand.java b/core/src/main/java/google/registry/tools/ValidateDatabaseMigrationCommand.java index b9b9126b2..753f309d6 100644 --- a/core/src/main/java/google/registry/tools/ValidateDatabaseMigrationCommand.java +++ b/core/src/main/java/google/registry/tools/ValidateDatabaseMigrationCommand.java @@ -32,6 +32,7 @@ import google.registry.util.Clock; import google.registry.util.RequestStatusChecker; import google.registry.util.Sleeper; import java.io.IOException; +import java.util.Optional; import java.util.UUID; import javax.inject.Inject; import org.joda.time.DateTime; @@ -52,7 +53,8 @@ abstract class ValidateDatabaseMigrationCommand + "--worker-machine-type=n2-standard-8 --num-workers=8 " + "--parameters registryEnvironment=%s " + "--parameters sqlSnapshotId=%s " - + "--parameters latestCommitLogTimestamp=%s "; + + "--parameters latestCommitLogTimestamp=%s " + + "--parameters diffOutputGcsBucket=%s "; // States indicating a job is not finished yet. static final ImmutableSet DATAFLOW_JOB_RUNNING_STATES = @@ -85,6 +87,13 @@ abstract class ValidateDatabaseMigrationCommand converter = DateTimeParameter.class) DateTime comparisonStartTimestamp; + @Parameter( + names = {"-o", "--outputBucket"}, + description = + "The GCS bucket where data discrepancies are logged. 
" + + "It defaults to ${projectId}-beam") + String outputBucket; + @Inject Clock clock; @Inject Dataflow dataflow; @@ -140,6 +149,10 @@ abstract class ValidateDatabaseMigrationCommand getDataflowJobStatus(jobId)); } + String getOutputBucket() { + return Optional.ofNullable(outputBucket).orElse(projectId + "-beam"); + } + String getContainerSpecGcsPath() { return String.format( "%s/%s_metadata.json", stagingBucketUrl.replace("live", release), PIPELINE_NAME); @@ -156,7 +169,8 @@ abstract class ValidateDatabaseMigrationCommand jobRegion, RegistryToolEnvironment.get().name(), snapshotId, - latestCommitLogTimestamp); + latestCommitLogTimestamp, + getOutputBucket()); if (comparisonStartTimestamp == null) { return baseCommand; } @@ -173,7 +187,8 @@ abstract class ValidateDatabaseMigrationCommand .put("numWorkers", "8") .put("sqlSnapshotId", sqlSnapshotId) .put("latestCommitLogTimestamp", latestCommitLogTimestamp) - .put("registryEnvironment", RegistryToolEnvironment.get().name()); + .put("registryEnvironment", RegistryToolEnvironment.get().name()) + .put("diffOutputGcsBucket", getOutputBucket()); if (comparisonStartTimestamp != null) { paramsBuilder.put("comparisonStartTimestamp", comparisonStartTimestamp.toString()); } diff --git a/core/src/main/resources/google/registry/beam/validate_database_pipeline_metadata.json b/core/src/main/resources/google/registry/beam/validate_database_pipeline_metadata.json index 809dffb8a..8d6940958 100644 --- a/core/src/main/resources/google/registry/beam/validate_database_pipeline_metadata.json +++ b/core/src/main/resources/google/registry/beam/validate_database_pipeline_metadata.json @@ -24,7 +24,7 @@ "name": "sqlSnapshotId", "label": "The ID of an exported Cloud SQL (Postgresql) snapshot.", "helpText": "The ID of an exported Cloud SQL (Postgresql) snapshot.", - "is_optional": true + "is_optional": false }, { "name": "latestCommitLogTimestamp", @@ -37,6 +37,12 @@ "label": "Only entities updated at or after this time are included for 
validation.", "helpText": "The earliest entity update time allowed for inclusion in validation, in ISO8601 format.", "is_optional": true + }, + { + "name": "diffOutputGcsBucket", + "label": "The GCS bucket where data discrepancies should be output to.", + "helpText": "The GCS bucket where data discrepancies should be output to.", + "is_optional": true } ] }