Save db discrepancies to GCS (#1527)

* Save db discrepancies to GCS
This commit is contained in:
Weimin Yu 2022-02-28 16:52:09 -05:00 committed by GitHub
parent e47be4fa2c
commit d882847fd7
5 changed files with 104 additions and 53 deletions

View file

@ -18,6 +18,7 @@ import static com.google.common.base.Verify.verify;
import static org.apache.beam.sdk.values.TypeDescriptors.strings; import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.flogger.FluentLogger; import com.google.common.flogger.FluentLogger;
import google.registry.beam.common.RegistryPipelineOptions; import google.registry.beam.common.RegistryPipelineOptions;
import google.registry.beam.common.RegistryPipelineWorkerInitializer; import google.registry.beam.common.RegistryPipelineWorkerInitializer;
@ -29,10 +30,12 @@ import google.registry.model.domain.DomainHistory;
import google.registry.model.replay.SqlEntity; import google.registry.model.replay.SqlEntity;
import google.registry.persistence.PersistenceModule.JpaTransactionManagerType; import google.registry.persistence.PersistenceModule.JpaTransactionManagerType;
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel; import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
import google.registry.util.SystemClock;
import java.io.Serializable; import java.io.Serializable;
import java.util.Optional; import java.util.Optional;
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.GroupByKey;
@ -77,12 +80,22 @@ public class ValidateDatabasePipeline {
"Comparing datastore export at %s and commitlog timestamp %s.", "Comparing datastore export at %s and commitlog timestamp %s.",
mostRecentExport.exportDir(), latestCommitLogTime); mostRecentExport.exportDir(), latestCommitLogTime);
Optional<String> outputPath =
Optional.ofNullable(options.getDiffOutputGcsBucket())
.map(
bucket ->
String.format(
"gs://%s/validate_database/%s/diffs.txt",
bucket, new SystemClock().nowUtc()));
outputPath.ifPresent(path -> logger.atInfo().log("Discrepancies will be logged to %s", path));
setupPipeline( setupPipeline(
pipeline, pipeline,
Optional.ofNullable(options.getSqlSnapshotId()), Optional.ofNullable(options.getSqlSnapshotId()),
mostRecentExport, mostRecentExport,
latestCommitLogTime, latestCommitLogTime,
Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse)); Optional.ofNullable(options.getComparisonStartTimestamp()).map(DateTime::parse),
outputPath);
pipeline.run(); pipeline.run();
} }
@ -92,7 +105,8 @@ public class ValidateDatabasePipeline {
Optional<String> sqlSnapshotId, Optional<String> sqlSnapshotId,
DatastoreSnapshotInfo mostRecentExport, DatastoreSnapshotInfo mostRecentExport,
DateTime latestCommitLogTime, DateTime latestCommitLogTime,
Optional<DateTime> compareStartTime) { Optional<DateTime> compareStartTime,
Optional<String> diffOutputPath) {
pipeline pipeline
.getCoderRegistry() .getCoderRegistry()
.registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class)); .registerCoderForClass(SqlEntity.class, SerializableCoder.of(Serializable.class));
@ -117,19 +131,41 @@ public class ValidateDatabasePipeline {
datastoreSnapshot.getAll().keySet().equals(cloudSqlSnapshot.getAll().keySet()), datastoreSnapshot.getAll().keySet().equals(cloudSqlSnapshot.getAll().keySet()),
"Expecting the same set of types in both snapshots."); "Expecting the same set of types in both snapshots.");
PCollectionList<String> diffLogs = PCollectionList.empty(pipeline);
for (Class<? extends SqlEntity> clazz : SqlSnapshots.ALL_SQL_ENTITIES) { for (Class<? extends SqlEntity> clazz : SqlSnapshots.ALL_SQL_ENTITIES) {
TupleTag<SqlEntity> tag = ValidateSqlUtils.createSqlEntityTupleTag(clazz); TupleTag<SqlEntity> tag = ValidateSqlUtils.createSqlEntityTupleTag(clazz);
verify( verify(
datastoreSnapshot.has(tag), "Missing %s in Datastore snapshot.", clazz.getSimpleName()); datastoreSnapshot.has(tag), "Missing %s in Datastore snapshot.", clazz.getSimpleName());
verify(cloudSqlSnapshot.has(tag), "Missing %s in Cloud SQL snapshot.", clazz.getSimpleName()); verify(cloudSqlSnapshot.has(tag), "Missing %s in Cloud SQL snapshot.", clazz.getSimpleName());
PCollectionList.of(datastoreSnapshot.get(tag)) diffLogs =
.and(cloudSqlSnapshot.get(tag)) diffLogs.and(
.apply("Combine from both snapshots: " + clazz.getSimpleName(), Flatten.pCollections()) PCollectionList.of(datastoreSnapshot.get(tag))
.and(cloudSqlSnapshot.get(tag))
.apply(
"Combine from both snapshots: " + clazz.getSimpleName(),
Flatten.pCollections())
.apply(
"Assign primary key to merged " + clazz.getSimpleName(),
WithKeys.of(ValidateDatabasePipeline::getPrimaryKeyString)
.withKeyType(strings()))
.apply("Group by primary key " + clazz.getSimpleName(), GroupByKey.create())
.apply("Compare " + clazz.getSimpleName(), ParDo.of(new CompareSqlEntity())));
}
if (diffOutputPath.isPresent()) {
diffLogs
.apply("Gather diff logs", Flatten.pCollections())
.apply( .apply(
"Assign primary key to merged " + clazz.getSimpleName(), "Output diffs",
WithKeys.of(ValidateDatabasePipeline::getPrimaryKeyString).withKeyType(strings())) TextIO.write()
.apply("Group by primary key " + clazz.getSimpleName(), GroupByKey.create()) .to(diffOutputPath.get())
.apply("Compare " + clazz.getSimpleName(), ParDo.of(new CompareSqlEntity())); /**
* Output to a single file for ease of use since diffs should be few. If this
* assumption turns out not to be false, user should abort the pipeline and
* investigate why.
*/
.withoutSharding()
.withDelimiter((Strings.repeat("-", 80) + "\n").toCharArray()));
} }
} }

View file

@ -46,4 +46,10 @@ public interface ValidateDatabasePipelineOptions extends RegistryPipelineOptions
String getComparisonStartTimestamp(); String getComparisonStartTimestamp();
void setComparisonStartTimestamp(String comparisonStartTimestamp); void setComparisonStartTimestamp(String comparisonStartTimestamp);
@Description("The GCS bucket where discrepancies found during comparison should be logged.")
@Nullable
String getDiffOutputGcsBucket();
void setDiffOutputGcsBucket(String gcsBucket);
} }

View file

@ -20,7 +20,6 @@ import static google.registry.persistence.transaction.TransactionManagerFactory.
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.FluentLogger;
import google.registry.beam.initsql.Transforms; import google.registry.beam.initsql.Transforms;
import google.registry.config.RegistryEnvironment; import google.registry.config.RegistryEnvironment;
import google.registry.model.EppResource; import google.registry.model.EppResource;
@ -37,6 +36,7 @@ import google.registry.model.poll.PollMessage;
import google.registry.model.registrar.Registrar; import google.registry.model.registrar.Registrar;
import google.registry.model.replay.SqlEntity; import google.registry.model.replay.SqlEntity;
import google.registry.model.reporting.HistoryEntry; import google.registry.model.reporting.HistoryEntry;
import google.registry.util.DiffUtils;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.math.BigInteger; import java.math.BigInteger;
import java.util.HashMap; import java.util.HashMap;
@ -54,7 +54,6 @@ import org.apache.beam.sdk.values.TupleTag;
/** Helpers for use by {@link ValidateDatabasePipeline}. */ /** Helpers for use by {@link ValidateDatabasePipeline}. */
@DeleteAfterMigration @DeleteAfterMigration
final class ValidateSqlUtils { final class ValidateSqlUtils {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private ValidateSqlUtils() {} private ValidateSqlUtils() {}
@ -98,15 +97,13 @@ final class ValidateSqlUtils {
return new TupleTag<SqlEntity>(actualType.getSimpleName()) {}; return new TupleTag<SqlEntity>(actualType.getSimpleName()) {};
} }
static class CompareSqlEntity extends DoFn<KV<String, Iterable<SqlEntity>>, Void> { static class CompareSqlEntity extends DoFn<KV<String, Iterable<SqlEntity>>, String> {
private final HashMap<String, Counter> totalCounters = new HashMap<>(); private final HashMap<String, Counter> totalCounters = new HashMap<>();
private final HashMap<String, Counter> missingCounters = new HashMap<>(); private final HashMap<String, Counter> missingCounters = new HashMap<>();
private final HashMap<String, Counter> unequalCounters = new HashMap<>(); private final HashMap<String, Counter> unequalCounters = new HashMap<>();
private final HashMap<String, Counter> badEntityCounters = new HashMap<>(); private final HashMap<String, Counter> badEntityCounters = new HashMap<>();
private final HashMap<String, Counter> duplicateEntityCounters = new HashMap<>(); private final HashMap<String, Counter> duplicateEntityCounters = new HashMap<>();
private volatile boolean logPrinted = false;
private String getCounterKey(Class<?> clazz) { private String getCounterKey(Class<?> clazz) {
return PollMessage.class.isAssignableFrom(clazz) ? "PollMessage" : clazz.getSimpleName(); return PollMessage.class.isAssignableFrom(clazz) ? "PollMessage" : clazz.getSimpleName();
} }
@ -124,39 +121,36 @@ final class ValidateSqlUtils {
counterKey, Metrics.counter("CompareDB", "Duplicate Entities:" + counterKey)); counterKey, Metrics.counter("CompareDB", "Duplicate Entities:" + counterKey));
} }
String duplicateEntityLog(String key, ImmutableList<SqlEntity> entities) {
return String.format("%s: %d entities.", key, entities.size());
}
String unmatchedEntityLog(String key, SqlEntity entry) {
// For a PollMessage only found in Datastore, key is not enough to query for it.
return String.format("Missing in one DB:\n%s", entry instanceof PollMessage ? entry : key);
}
/** /**
* A rudimentary debugging helper that prints the first pair of unequal entities in each worker. * A rudimentary debugging helper that prints the first pair of unequal entities in each worker.
* This will be removed when we start exporting such entities to GCS. * This will be removed when we start exporting such entities to GCS.
*/ */
void logDiff(String key, Object entry0, Object entry1) { String unEqualEntityLog(String key, SqlEntity entry0, SqlEntity entry1) {
if (logPrinted) {
return;
}
logPrinted = true;
Map<String, Object> fields0 = ((ImmutableObject) entry0).toDiffableFieldMap(); Map<String, Object> fields0 = ((ImmutableObject) entry0).toDiffableFieldMap();
Map<String, Object> fields1 = ((ImmutableObject) entry1).toDiffableFieldMap(); Map<String, Object> fields1 = ((ImmutableObject) entry1).toDiffableFieldMap();
StringBuilder sb = new StringBuilder(); return key + " " + DiffUtils.prettyPrintEntityDeepDiff(fields0, fields1);
fields0.forEach( }
(field, value) -> {
if (fields1.containsKey(field)) { String badEntitiesLog(String key, SqlEntity entry0, SqlEntity entry1) {
if (!Objects.equals(value, fields1.get(field))) { Map<String, Object> fields0 = ((ImmutableObject) entry0).toDiffableFieldMap();
sb.append(field + " not match: " + value + " -> " + fields1.get(field) + "\n"); Map<String, Object> fields1 = ((ImmutableObject) entry1).toDiffableFieldMap();
} return String.format(
} else { "Failed to parse one or both entities for key %s:\n%s\n",
sb.append(field + "Not found in entity 2\n"); key, DiffUtils.prettyPrintEntityDeepDiff(fields0, fields1));
}
});
fields1.forEach(
(field, value) -> {
if (!fields0.containsKey(field)) {
sb.append(field + "Not found in entity 1\n");
}
});
logger.atWarning().log(key + " " + sb.toString());
} }
@ProcessElement @ProcessElement
public void processElement(@Element KV<String, Iterable<SqlEntity>> kv) { public void processElement(
@Element KV<String, Iterable<SqlEntity>> kv, OutputReceiver<String> out) {
ImmutableList<SqlEntity> entities = ImmutableList.copyOf(kv.getValue()); ImmutableList<SqlEntity> entities = ImmutableList.copyOf(kv.getValue());
verify(!entities.isEmpty(), "Can't happen: no value for key %s.", kv.getKey()); verify(!entities.isEmpty(), "Can't happen: no value for key %s.", kv.getKey());
@ -169,6 +163,7 @@ final class ValidateSqlUtils {
// Duplicates may happen with Cursors if imported across projects. Its key in Datastore, the // Duplicates may happen with Cursors if imported across projects. Its key in Datastore, the
// id field, encodes the project name and is not fixed by the importing job. // id field, encodes the project name and is not fixed by the importing job.
duplicateEntityCounters.get(counterKey).inc(); duplicateEntityCounters.get(counterKey).inc();
out.output(duplicateEntityLog(kv.getKey(), entities) + "\n");
return; return;
} }
@ -177,11 +172,7 @@ final class ValidateSqlUtils {
return; return;
} }
missingCounters.get(counterKey).inc(); missingCounters.get(counterKey).inc();
// Temporary debugging help. See logDiff() above. out.output(unmatchedEntityLog(kv.getKey(), entities.get(0)) + "\n");
if (!logPrinted) {
logPrinted = true;
logger.atWarning().log("Unexpected single entity: %s", kv.getKey());
}
return; return;
} }
SqlEntity entity0 = entities.get(0); SqlEntity entity0 = entities.get(0);
@ -198,17 +189,14 @@ final class ValidateSqlUtils {
entity0 = normalizeEntity(entity0); entity0 = normalizeEntity(entity0);
entity1 = normalizeEntity(entity1); entity1 = normalizeEntity(entity1);
} catch (Exception e) { } catch (Exception e) {
// Temporary debugging help. See logDiff() above. badEntityCounters.get(counterKey).inc();
if (!logPrinted) { out.output(badEntitiesLog(kv.getKey(), entity0, entity1));
logPrinted = true;
badEntityCounters.get(counterKey).inc();
}
return; return;
} }
if (!Objects.equals(entity0, entity1)) { if (!Objects.equals(entity0, entity1)) {
unequalCounters.get(counterKey).inc(); unequalCounters.get(counterKey).inc();
logDiff(kv.getKey(), entities.get(0), entities.get(1)); out.output(unEqualEntityLog(kv.getKey(), entities.get(0), entities.get(1)));
} }
} }
} }

View file

@ -32,6 +32,7 @@ import google.registry.util.Clock;
import google.registry.util.RequestStatusChecker; import google.registry.util.RequestStatusChecker;
import google.registry.util.Sleeper; import google.registry.util.Sleeper;
import java.io.IOException; import java.io.IOException;
import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import javax.inject.Inject; import javax.inject.Inject;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -52,7 +53,8 @@ abstract class ValidateDatabaseMigrationCommand
+ "--worker-machine-type=n2-standard-8 --num-workers=8 " + "--worker-machine-type=n2-standard-8 --num-workers=8 "
+ "--parameters registryEnvironment=%s " + "--parameters registryEnvironment=%s "
+ "--parameters sqlSnapshotId=%s " + "--parameters sqlSnapshotId=%s "
+ "--parameters latestCommitLogTimestamp=%s "; + "--parameters latestCommitLogTimestamp=%s "
+ "--parameters diffOutputGcsBucket=%s ";
// States indicating a job is not finished yet. // States indicating a job is not finished yet.
static final ImmutableSet<String> DATAFLOW_JOB_RUNNING_STATES = static final ImmutableSet<String> DATAFLOW_JOB_RUNNING_STATES =
@ -85,6 +87,13 @@ abstract class ValidateDatabaseMigrationCommand
converter = DateTimeParameter.class) converter = DateTimeParameter.class)
DateTime comparisonStartTimestamp; DateTime comparisonStartTimestamp;
@Parameter(
names = {"-o", "--outputBucket"},
description =
"The GCS bucket where data discrepancies are logged. "
+ "It defaults to ${projectId}-beam")
String outputBucket;
@Inject Clock clock; @Inject Clock clock;
@Inject Dataflow dataflow; @Inject Dataflow dataflow;
@ -140,6 +149,10 @@ abstract class ValidateDatabaseMigrationCommand
getDataflowJobStatus(jobId)); getDataflowJobStatus(jobId));
} }
String getOutputBucket() {
return Optional.ofNullable(outputBucket).orElse(projectId + "-beam");
}
String getContainerSpecGcsPath() { String getContainerSpecGcsPath() {
return String.format( return String.format(
"%s/%s_metadata.json", stagingBucketUrl.replace("live", release), PIPELINE_NAME); "%s/%s_metadata.json", stagingBucketUrl.replace("live", release), PIPELINE_NAME);
@ -156,7 +169,8 @@ abstract class ValidateDatabaseMigrationCommand
jobRegion, jobRegion,
RegistryToolEnvironment.get().name(), RegistryToolEnvironment.get().name(),
snapshotId, snapshotId,
latestCommitLogTimestamp); latestCommitLogTimestamp,
getOutputBucket());
if (comparisonStartTimestamp == null) { if (comparisonStartTimestamp == null) {
return baseCommand; return baseCommand;
} }
@ -173,7 +187,8 @@ abstract class ValidateDatabaseMigrationCommand
.put("numWorkers", "8") .put("numWorkers", "8")
.put("sqlSnapshotId", sqlSnapshotId) .put("sqlSnapshotId", sqlSnapshotId)
.put("latestCommitLogTimestamp", latestCommitLogTimestamp) .put("latestCommitLogTimestamp", latestCommitLogTimestamp)
.put("registryEnvironment", RegistryToolEnvironment.get().name()); .put("registryEnvironment", RegistryToolEnvironment.get().name())
.put("diffOutputGcsBucket", getOutputBucket());
if (comparisonStartTimestamp != null) { if (comparisonStartTimestamp != null) {
paramsBuilder.put("comparisonStartTimestamp", comparisonStartTimestamp.toString()); paramsBuilder.put("comparisonStartTimestamp", comparisonStartTimestamp.toString());
} }

View file

@ -24,7 +24,7 @@
"name": "sqlSnapshotId", "name": "sqlSnapshotId",
"label": "The ID of an exported Cloud SQL (Postgresql) snapshot.", "label": "The ID of an exported Cloud SQL (Postgresql) snapshot.",
"helpText": "The ID of an exported Cloud SQL (Postgresql) snapshot.", "helpText": "The ID of an exported Cloud SQL (Postgresql) snapshot.",
"is_optional": true "is_optional": false
}, },
{ {
"name": "latestCommitLogTimestamp", "name": "latestCommitLogTimestamp",
@ -37,6 +37,12 @@
"label": "Only entities updated at or after this time are included for validation.", "label": "Only entities updated at or after this time are included for validation.",
"helpText": "The earliest entity update time allowed for inclusion in validation, in ISO8601 format.", "helpText": "The earliest entity update time allowed for inclusion in validation, in ISO8601 format.",
"is_optional": true "is_optional": true
},
{
"name": "diffOutputGcsBucket",
"label": "The GCS bucket where data discrepancies should be output to.",
"helpText": "The GCS bucket where data discrepancies should be output to.",
"is_optional": true
} }
] ]
} }