mirror of
https://github.com/google/nomulus.git
synced 2025-07-21 18:26:12 +02:00
Fix appId during cross-project commitlog imports (#1213)
* Fix appId during cross-project commitlog imports When importing commit logs from another project, we must override the appId in every entity key instances. The fixEntity method in the EntityImports class is a straightforward translation of the python function of the same name used by the storage team.
This commit is contained in:
parent
e38be0576d
commit
07e2a71433
7 changed files with 244 additions and 10 deletions
|
@ -56,12 +56,16 @@ public class BackupUtils {
|
||||||
*
|
*
|
||||||
* <p>The iterator reads from the stream on demand, and as such will fail if the stream is closed.
|
* <p>The iterator reads from the stream on demand, and as such will fail if the stream is closed.
|
||||||
*/
|
*/
|
||||||
public static Iterator<ImmutableObject> createDeserializingIterator(final InputStream input) {
|
public static Iterator<ImmutableObject> createDeserializingIterator(
|
||||||
|
final InputStream input, boolean withAppIdOverride) {
|
||||||
return new AbstractIterator<ImmutableObject>() {
|
return new AbstractIterator<ImmutableObject>() {
|
||||||
@Override
|
@Override
|
||||||
protected ImmutableObject computeNext() {
|
protected ImmutableObject computeNext() {
|
||||||
EntityProto proto = new EntityProto();
|
EntityProto proto = new EntityProto();
|
||||||
if (proto.parseDelimitedFrom(input)) { // False means end of stream; other errors throw.
|
if (proto.parseDelimitedFrom(input)) { // False means end of stream; other errors throw.
|
||||||
|
if (withAppIdOverride) {
|
||||||
|
proto = EntityImports.fixEntity(proto);
|
||||||
|
}
|
||||||
return auditedOfy().load().fromEntity(EntityTranslator.createFromPb(proto));
|
return auditedOfy().load().fromEntity(EntityTranslator.createFromPb(proto));
|
||||||
}
|
}
|
||||||
return endOfData();
|
return endOfData();
|
||||||
|
@ -70,6 +74,7 @@ public class BackupUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ImmutableList<ImmutableObject> deserializeEntities(byte[] bytes) {
|
public static ImmutableList<ImmutableObject> deserializeEntities(byte[] bytes) {
|
||||||
return ImmutableList.copyOf(createDeserializingIterator(new ByteArrayInputStream(bytes)));
|
return ImmutableList.copyOf(
|
||||||
|
createDeserializingIterator(new ByteArrayInputStream(bytes), false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,7 @@ public final class CommitLogImports {
|
||||||
InputStream inputStream) {
|
InputStream inputStream) {
|
||||||
try (AppEngineEnvironment appEngineEnvironment = new AppEngineEnvironment();
|
try (AppEngineEnvironment appEngineEnvironment = new AppEngineEnvironment();
|
||||||
InputStream input = new BufferedInputStream(inputStream)) {
|
InputStream input = new BufferedInputStream(inputStream)) {
|
||||||
Iterator<ImmutableObject> commitLogs = createDeserializingIterator(input);
|
Iterator<ImmutableObject> commitLogs = createDeserializingIterator(input, false);
|
||||||
checkState(commitLogs.hasNext());
|
checkState(commitLogs.hasNext());
|
||||||
checkState(commitLogs.next() instanceof CommitLogCheckpoint);
|
checkState(commitLogs.next() instanceof CommitLogCheckpoint);
|
||||||
|
|
||||||
|
|
115
core/src/main/java/google/registry/backup/EntityImports.java
Normal file
115
core/src/main/java/google/registry/backup/EntityImports.java
Normal file
|
@ -0,0 +1,115 @@
|
||||||
|
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package google.registry.backup;
|
||||||
|
|
||||||
|
import com.google.apphosting.api.ApiProxy;
|
||||||
|
import com.google.storage.onestore.v3.OnestoreEntity;
|
||||||
|
import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
|
||||||
|
import com.google.storage.onestore.v3.OnestoreEntity.Path;
|
||||||
|
import com.google.storage.onestore.v3.OnestoreEntity.Property.Meaning;
|
||||||
|
import com.google.storage.onestore.v3.OnestoreEntity.PropertyValue.ReferenceValue;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/** Utilities for handling imported Datastore entities. */
|
||||||
|
public class EntityImports {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transitively sets the {@code appId} of all keys in a foreign entity to that of the current
|
||||||
|
* system.
|
||||||
|
*/
|
||||||
|
public static EntityProto fixEntity(EntityProto entityProto) {
|
||||||
|
String currentAappId = ApiProxy.getCurrentEnvironment().getAppId();
|
||||||
|
if (Objects.equals(currentAappId, entityProto.getKey().getApp())) {
|
||||||
|
return entityProto;
|
||||||
|
}
|
||||||
|
return fixEntity(entityProto, currentAappId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static EntityProto fixEntity(EntityProto entityProto, String appId) {
|
||||||
|
if (entityProto.hasKey()) {
|
||||||
|
fixKey(entityProto, appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (OnestoreEntity.Property property : entityProto.mutablePropertys()) {
|
||||||
|
fixProperty(property, appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (OnestoreEntity.Property property : entityProto.mutableRawPropertys()) {
|
||||||
|
fixProperty(property, appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// CommitLogMutation embeds an entity as bytes, which needs additional fixes.
|
||||||
|
if (isCommitLogMutation(entityProto)) {
|
||||||
|
fixMutationEntityProtoBytes(entityProto, appId);
|
||||||
|
}
|
||||||
|
return entityProto;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isCommitLogMutation(EntityProto entityProto) {
|
||||||
|
if (!entityProto.hasKey()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Path path = entityProto.getKey().getPath();
|
||||||
|
if (path.elementSize() == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return Objects.equals(
|
||||||
|
path.getElement(path.elementSize() - 1).getType(StandardCharsets.UTF_8),
|
||||||
|
"CommitLogMutation");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void fixMutationEntityProtoBytes(EntityProto entityProto, String appId) {
|
||||||
|
for (OnestoreEntity.Property property : entityProto.mutableRawPropertys()) {
|
||||||
|
if (Objects.equals(property.getName(), "entityProtoBytes")) {
|
||||||
|
OnestoreEntity.PropertyValue value = property.getValue();
|
||||||
|
EntityProto fixedProto =
|
||||||
|
fixEntity(bytesToEntityProto(value.getStringValueAsBytes()), appId);
|
||||||
|
value.setStringValueAsBytes(fixedProto.toByteArray());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void fixKey(EntityProto entityProto, String appId) {
|
||||||
|
entityProto.getMutableKey().setApp(appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void fixKey(ReferenceValue referenceValue, String appId) {
|
||||||
|
referenceValue.setApp(appId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void fixProperty(OnestoreEntity.Property property, String appId) {
|
||||||
|
OnestoreEntity.PropertyValue value = property.getMutableValue();
|
||||||
|
if (value.hasReferenceValue()) {
|
||||||
|
fixKey(value.getMutableReferenceValue(), appId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (property.getMeaningEnum().equals(Meaning.ENTITY_PROTO)) {
|
||||||
|
EntityProto embeddedProto = bytesToEntityProto(value.getStringValueAsBytes());
|
||||||
|
fixEntity(embeddedProto, appId);
|
||||||
|
value.setStringValueAsBytes(embeddedProto.toByteArray());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static EntityProto bytesToEntityProto(byte[] bytes) {
|
||||||
|
EntityProto entityProto = new EntityProto();
|
||||||
|
boolean isParsed = entityProto.parseFrom(bytes);
|
||||||
|
if (!isParsed) {
|
||||||
|
throw new IllegalStateException("Failed to parse raw bytes as EntityProto.");
|
||||||
|
}
|
||||||
|
return entityProto;
|
||||||
|
}
|
||||||
|
}
|
|
@ -104,7 +104,7 @@ public class RestoreCommitLogsAction implements Runnable {
|
||||||
try (InputStream input = Channels.newInputStream(
|
try (InputStream input = Channels.newInputStream(
|
||||||
gcsService.openPrefetchingReadChannel(metadata.getFilename(), 0, BLOCK_SIZE))) {
|
gcsService.openPrefetchingReadChannel(metadata.getFilename(), 0, BLOCK_SIZE))) {
|
||||||
PeekingIterator<ImmutableObject> commitLogs =
|
PeekingIterator<ImmutableObject> commitLogs =
|
||||||
peekingIterator(createDeserializingIterator(input));
|
peekingIterator(createDeserializingIterator(input, true));
|
||||||
lastCheckpoint = (CommitLogCheckpoint) commitLogs.next();
|
lastCheckpoint = (CommitLogCheckpoint) commitLogs.next();
|
||||||
saveOfy(ImmutableList.of(lastCheckpoint)); // Save the checkpoint itself.
|
saveOfy(ImmutableList.of(lastCheckpoint)); // Save the checkpoint itself.
|
||||||
while (commitLogs.hasNext()) {
|
while (commitLogs.hasNext()) {
|
||||||
|
|
|
@ -0,0 +1,87 @@
|
||||||
|
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package google.registry.backup;
|
||||||
|
|
||||||
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
|
import static google.registry.model.ofy.ObjectifyService.auditedOfy;
|
||||||
|
|
||||||
|
import com.google.appengine.api.datastore.DatastoreService;
|
||||||
|
import com.google.appengine.api.datastore.DatastoreServiceFactory;
|
||||||
|
import com.google.appengine.api.datastore.Entity;
|
||||||
|
import com.google.appengine.api.datastore.EntityTranslator;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.io.Resources;
|
||||||
|
import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
|
||||||
|
import google.registry.model.ofy.CommitLogCheckpoint;
|
||||||
|
import google.registry.testing.AppEngineExtension;
|
||||||
|
import google.registry.testing.DatastoreEntityExtension;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Order;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
|
||||||
|
public class EntityImportsTest {
|
||||||
|
|
||||||
|
@RegisterExtension
|
||||||
|
@Order(value = 1)
|
||||||
|
final DatastoreEntityExtension datastoreEntityExtension = new DatastoreEntityExtension();
|
||||||
|
|
||||||
|
@RegisterExtension
|
||||||
|
final AppEngineExtension appEngine =
|
||||||
|
new AppEngineExtension.Builder().withDatastoreAndCloudSql().withoutCannedData().build();
|
||||||
|
|
||||||
|
private DatastoreService datastoreService;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void beforeEach() {
|
||||||
|
datastoreService = DatastoreServiceFactory.getDatastoreService();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void importCommitLogs_keysFixed() throws Exception {
|
||||||
|
// Input resource is a standard commit log file whose entities has "AppId_1" as appId. The key
|
||||||
|
// fixes can be verified by checking that the appId of an imported entity's key has been updated
|
||||||
|
// to 'test' (which is set by AppEngineExtension) and/or that after persistence the imported
|
||||||
|
// entity can be loaded by Objectify.
|
||||||
|
try (InputStream commitLogInputStream =
|
||||||
|
Resources.getResource("google/registry/backup/commitlog.data").openStream()) {
|
||||||
|
ImmutableList<Entity> entities =
|
||||||
|
loadEntityProtos(commitLogInputStream).stream()
|
||||||
|
.map(EntityImports::fixEntity)
|
||||||
|
.map(EntityTranslator::createFromPb)
|
||||||
|
.collect(ImmutableList.toImmutableList());
|
||||||
|
// Verifies that the original appId has been overwritten.
|
||||||
|
assertThat(entities.get(0).getKey().getAppId()).isEqualTo("test");
|
||||||
|
datastoreService.put(entities);
|
||||||
|
// Imported entity can be found by Ofy after appId conversion.
|
||||||
|
assertThat(auditedOfy().load().type(CommitLogCheckpoint.class).count()).isGreaterThan(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ImmutableList<EntityProto> loadEntityProtos(InputStream inputStream) {
|
||||||
|
ImmutableList.Builder<EntityProto> protosBuilder = new ImmutableList.Builder<>();
|
||||||
|
while (true) {
|
||||||
|
EntityProto proto = new EntityProto();
|
||||||
|
boolean parsed = proto.parseDelimitedFrom(inputStream);
|
||||||
|
if (parsed && proto.isInitialized()) {
|
||||||
|
protosBuilder.add(proto);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return protosBuilder.build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -34,9 +34,11 @@ import com.google.appengine.tools.cloudstorage.GcsService;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.io.Resources;
|
||||||
import com.google.common.primitives.Longs;
|
import com.google.common.primitives.Longs;
|
||||||
import com.googlecode.objectify.Key;
|
import com.googlecode.objectify.Key;
|
||||||
import google.registry.model.ImmutableObject;
|
import google.registry.model.ImmutableObject;
|
||||||
|
import google.registry.model.domain.DomainBase;
|
||||||
import google.registry.model.ofy.CommitLogBucket;
|
import google.registry.model.ofy.CommitLogBucket;
|
||||||
import google.registry.model.ofy.CommitLogCheckpoint;
|
import google.registry.model.ofy.CommitLogCheckpoint;
|
||||||
import google.registry.model.ofy.CommitLogCheckpointRoot;
|
import google.registry.model.ofy.CommitLogCheckpointRoot;
|
||||||
|
@ -258,10 +260,40 @@ public class RestoreCommitLogsActionTest {
|
||||||
assertInDatastore(CommitLogCheckpointRoot.create(now));
|
assertInDatastore(CommitLogCheckpointRoot.create(now));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testRestore_fromOtherProject() throws IOException {
|
||||||
|
// Input resource is a standard commit log file whose entities has "AppId_1" as appId. Among the
|
||||||
|
// entities are CommitLogMutations that have an embedded DomainBase and a ContactResource, both
|
||||||
|
// having "AppId_1" as appId. This test verifies that the embedded entities are properly
|
||||||
|
// imported, in particular, the domain's 'registrant' key can be used by Objectify to load the
|
||||||
|
// contact.
|
||||||
|
saveDiffFile(
|
||||||
|
gcsService,
|
||||||
|
Resources.toByteArray(Resources.getResource("google/registry/backup/commitlog.data")),
|
||||||
|
now);
|
||||||
|
action.run();
|
||||||
|
auditedOfy().clearSessionCache();
|
||||||
|
List<DomainBase> domainBases = auditedOfy().load().type(DomainBase.class).list();
|
||||||
|
assertThat(domainBases).hasSize(1);
|
||||||
|
DomainBase domainBase = domainBases.get(0);
|
||||||
|
// If the registrant is found, then the key instance in domainBase is fixed.
|
||||||
|
assertThat(auditedOfy().load().key(domainBase.getRegistrant().getOfyKey()).now()).isNotNull();
|
||||||
|
}
|
||||||
|
|
||||||
static CommitLogCheckpoint createCheckpoint(DateTime now) {
|
static CommitLogCheckpoint createCheckpoint(DateTime now) {
|
||||||
return CommitLogCheckpoint.create(now, toMap(getBucketIds(), x -> now));
|
return CommitLogCheckpoint.create(now, toMap(getBucketIds(), x -> now));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void saveDiffFile(GcsService gcsService, byte[] rawBytes, DateTime timestamp)
|
||||||
|
throws IOException {
|
||||||
|
gcsService.createOrReplace(
|
||||||
|
new GcsFilename(GCS_BUCKET, DIFF_FILE_PREFIX + timestamp),
|
||||||
|
new GcsFileOptions.Builder()
|
||||||
|
.addUserMetadata(LOWER_BOUND_CHECKPOINT, timestamp.minusMinutes(1).toString())
|
||||||
|
.build(),
|
||||||
|
ByteBuffer.wrap(rawBytes));
|
||||||
|
}
|
||||||
|
|
||||||
static Iterable<ImmutableObject> saveDiffFile(
|
static Iterable<ImmutableObject> saveDiffFile(
|
||||||
GcsService gcsService, CommitLogCheckpoint checkpoint, ImmutableObject... entities)
|
GcsService gcsService, CommitLogCheckpoint checkpoint, ImmutableObject... entities)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -271,12 +303,7 @@ public class RestoreCommitLogsActionTest {
|
||||||
for (ImmutableObject entity : allEntities) {
|
for (ImmutableObject entity : allEntities) {
|
||||||
serializeEntity(entity, output);
|
serializeEntity(entity, output);
|
||||||
}
|
}
|
||||||
gcsService.createOrReplace(
|
saveDiffFile(gcsService, output.toByteArray(), now);
|
||||||
new GcsFilename(GCS_BUCKET, DIFF_FILE_PREFIX + now),
|
|
||||||
new GcsFileOptions.Builder()
|
|
||||||
.addUserMetadata(LOWER_BOUND_CHECKPOINT, now.minusMinutes(1).toString())
|
|
||||||
.build(),
|
|
||||||
ByteBuffer.wrap(output.toByteArray()));
|
|
||||||
return allEntities;
|
return allEntities;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
BIN
core/src/test/resources/google/registry/backup/commitlog.data
Normal file
BIN
core/src/test/resources/google/registry/backup/commitlog.data
Normal file
Binary file not shown.
Loading…
Add table
Add a link
Reference in a new issue