mirror of
https://github.com/google/nomulus.git
synced 2025-04-30 12:07:51 +02:00
Add a beam pipeline to create synthetic history entries in SQL (#1383)
* Add a beam pipeline to create synthetic history entries in SQL The logic is mostly lifted from CreateSyntheticHistoryEntriesAction. We do not need to test for the existence of an embedded EPP resource in the history entry before create a synthetic one because after InitSqlPipeline runs it is guaranteed that no embedded resource exists.
This commit is contained in:
parent
f38497849f
commit
d0c8f29a3b
3 changed files with 255 additions and 1 deletions
|
@ -705,7 +705,11 @@ createToolTask(
|
|||
|
||||
|
||||
createToolTask(
|
||||
'jpaDemoPipeline', 'google.registry.beam.common.JpaDemoPipeline')
|
||||
'jpaDemoPipeline', 'google.registry.beam.common.JpaDemoPipeline')
|
||||
|
||||
createToolTask(
|
||||
'createSyntheticHistoryEntries',
|
||||
'google.registry.tools.javascrap.CreateSyntheticHistoryEntriesPipeline')
|
||||
|
||||
project.tasks.create('initSqlPipeline', JavaExec) {
|
||||
main = 'google.registry.beam.initsql.InitSqlPipeline'
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.tools.javascrap;
|
||||
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import dagger.Component;
|
||||
import google.registry.beam.common.RegistryJpaIO;
|
||||
import google.registry.beam.common.RegistryPipelineOptions;
|
||||
import google.registry.config.RegistryConfig.Config;
|
||||
import google.registry.config.RegistryConfig.ConfigModule;
|
||||
import google.registry.model.EppResource;
|
||||
import google.registry.model.UpdateAutoTimestamp;
|
||||
import google.registry.model.UpdateAutoTimestamp.DisableAutoUpdateResource;
|
||||
import google.registry.model.contact.ContactResource;
|
||||
import google.registry.model.domain.DomainBase;
|
||||
import google.registry.model.host.HostResource;
|
||||
import google.registry.model.reporting.HistoryEntry;
|
||||
import google.registry.persistence.PersistenceModule.TransactionIsolationLevel;
|
||||
import google.registry.persistence.VKey;
|
||||
import java.io.Serializable;
|
||||
import javax.inject.Singleton;
|
||||
import javax.persistence.Entity;
|
||||
import org.apache.beam.sdk.Pipeline;
|
||||
import org.apache.beam.sdk.options.PipelineOptionsFactory;
|
||||
import org.apache.beam.sdk.transforms.MapElements;
|
||||
import org.apache.beam.sdk.values.TypeDescriptor;
|
||||
|
||||
/**
|
||||
* Pipeline that creates a synthetic history entry for every {@link EppResource} in SQL at the
|
||||
* current time.
|
||||
*
|
||||
* <p>The history entries in Datastore does not have the EPP resource embedded in them. Therefore
|
||||
* after {@link google.registry.beam.initsql.InitSqlPipeline} runs, these fields will all be empty.
|
||||
* This pipeline loads all EPP resources and for each of them creates a synthetic history entry that
|
||||
* contains the resource and saves them back to SQL, so that they can be used in the RDE pipeline.
|
||||
*
|
||||
* <p>Note that this pipeline should only be run in a test environment right after the init SQL
|
||||
* pipeline finishes, and no EPP update is being made to the system, otherwise there is no garuantee
|
||||
* that the latest history entry for a given EPP resource does not already have the resource
|
||||
* embedded within it.
|
||||
*
|
||||
* <p>To run the pipeline:
|
||||
*
|
||||
* <p><code>
|
||||
* $ ./nom_build :core:cSHE --args="--region=us-central1
|
||||
* --runner=DataflowRunner
|
||||
* --registryEnvironment=CRASH
|
||||
* --project={project-id}
|
||||
* --workerMachineType=n2-standard-4"
|
||||
* </code>
|
||||
*
|
||||
* @see google.registry.tools.javascrap.CreateSyntheticHistoryEntriesAction
|
||||
*/
|
||||
public class CreateSyntheticHistoryEntriesPipeline implements Serializable {
|
||||
|
||||
private static final ImmutableList<Class<? extends EppResource>> EPP_RESOURCE_CLASSES =
|
||||
ImmutableList.of(DomainBase.class, ContactResource.class, HostResource.class);
|
||||
|
||||
private static final String HISTORY_REASON =
|
||||
"Backfill EppResource history objects after initial backup to SQL";
|
||||
|
||||
static void setup(Pipeline pipeline, String registryAdminRegistrarId) {
|
||||
for (Class<? extends EppResource> clazz : EPP_RESOURCE_CLASSES) {
|
||||
pipeline
|
||||
.apply(
|
||||
String.format("Read all %s", clazz.getSimpleName()),
|
||||
RegistryJpaIO.read(
|
||||
"SELECT id FROM %entity%"
|
||||
.replace("%entity%", clazz.getAnnotation(Entity.class).name()),
|
||||
String.class,
|
||||
repoId -> VKey.createSql(clazz, repoId)))
|
||||
.apply(
|
||||
String.format("Save a synthetic HistoryEntry for each %s", clazz),
|
||||
MapElements.into(TypeDescriptor.of(Void.class))
|
||||
.via(
|
||||
(VKey<? extends EppResource> key) -> {
|
||||
jpaTm()
|
||||
.transact(
|
||||
() -> {
|
||||
EppResource eppResource = jpaTm().loadByKey(key);
|
||||
try (DisableAutoUpdateResource disable =
|
||||
UpdateAutoTimestamp.disableAutoUpdate()) {
|
||||
jpaTm()
|
||||
.put(
|
||||
HistoryEntry.createBuilderForResource(eppResource)
|
||||
.setRegistrarId(registryAdminRegistrarId)
|
||||
.setBySuperuser(true)
|
||||
.setRequestedByRegistrar(false)
|
||||
.setModificationTime(jpaTm().getTransactionTime())
|
||||
.setReason(HISTORY_REASON)
|
||||
.setType(HistoryEntry.Type.SYNTHETIC)
|
||||
.build());
|
||||
}
|
||||
});
|
||||
return null;
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
RegistryPipelineOptions options =
|
||||
PipelineOptionsFactory.fromArgs(args).withValidation().as(RegistryPipelineOptions.class);
|
||||
RegistryPipelineOptions.validateRegistryPipelineOptions(options);
|
||||
options.setIsolationOverride(TransactionIsolationLevel.TRANSACTION_READ_COMMITTED);
|
||||
String registryAdminRegistrarId =
|
||||
DaggerCreateSyntheticHistoryEntriesPipeline_ConfigComponent.create()
|
||||
.getRegistryAdminRegistrarId();
|
||||
|
||||
Pipeline pipeline = Pipeline.create(options);
|
||||
setup(pipeline, registryAdminRegistrarId);
|
||||
pipeline.run();
|
||||
}
|
||||
|
||||
@Singleton
|
||||
@Component(modules = ConfigModule.class)
|
||||
interface ConfigComponent {
|
||||
|
||||
@Config("registryAdminClientId")
|
||||
String getRegistryAdminRegistrarId();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
// Copyright 2021 The Nomulus Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package google.registry.tools.javascrap;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static google.registry.model.ImmutableObjectSubject.assertAboutImmutableObjects;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.removeTmOverrideForTest;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.setTmOverrideForTest;
|
||||
import static google.registry.testing.DatabaseHelper.createTld;
|
||||
import static google.registry.testing.DatabaseHelper.newDomainBase;
|
||||
import static google.registry.testing.DatabaseHelper.persistActiveHost;
|
||||
import static google.registry.testing.DatabaseHelper.persistNewRegistrar;
|
||||
import static google.registry.testing.DatabaseHelper.persistSimpleResource;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import google.registry.beam.TestPipelineExtension;
|
||||
import google.registry.model.EppResource;
|
||||
import google.registry.model.contact.ContactHistory;
|
||||
import google.registry.model.contact.ContactResource;
|
||||
import google.registry.model.domain.DomainBase;
|
||||
import google.registry.model.domain.DomainHistory;
|
||||
import google.registry.model.host.HostHistory;
|
||||
import google.registry.model.host.HostResource;
|
||||
import google.registry.model.reporting.HistoryEntry;
|
||||
import google.registry.persistence.transaction.JpaTestExtensions;
|
||||
import google.registry.persistence.transaction.JpaTestExtensions.JpaIntegrationTestExtension;
|
||||
import google.registry.testing.DatastoreEntityExtension;
|
||||
import google.registry.testing.FakeClock;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
/** Unit tests for {@link CreateSyntheticHistoryEntriesPipeline}. */
|
||||
public class CreateSyntheticHistoryEntriesPipelineTest {
|
||||
|
||||
FakeClock clock = new FakeClock();
|
||||
|
||||
@RegisterExtension
|
||||
JpaIntegrationTestExtension jpaEextension =
|
||||
new JpaTestExtensions.Builder().withClock(clock).buildIntegrationTestExtension();
|
||||
|
||||
@RegisterExtension
|
||||
DatastoreEntityExtension datastoreEntityExtension =
|
||||
new DatastoreEntityExtension().allThreads(true);
|
||||
|
||||
@RegisterExtension TestPipelineExtension pipeline = TestPipelineExtension.create();
|
||||
|
||||
DomainBase domain;
|
||||
ContactResource contact;
|
||||
HostResource host;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() {
|
||||
setTmOverrideForTest(jpaTm());
|
||||
persistNewRegistrar("TheRegistrar");
|
||||
persistNewRegistrar("NewRegistrar");
|
||||
createTld("tld");
|
||||
host = persistActiveHost("external.com");
|
||||
domain =
|
||||
persistSimpleResource(
|
||||
newDomainBase("example.tld").asBuilder().setNameservers(host.createVKey()).build());
|
||||
contact = jpaTm().transact(() -> jpaTm().loadByKey(domain.getRegistrant()));
|
||||
clock.advanceOneMilli();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void afterEach() {
|
||||
removeTmOverrideForTest();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSuccess() {
|
||||
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(DomainHistory.class))).isEmpty();
|
||||
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(ContactHistory.class))).isEmpty();
|
||||
assertThat(jpaTm().transact(() -> jpaTm().loadAllOf(HostHistory.class))).isEmpty();
|
||||
CreateSyntheticHistoryEntriesPipeline.setup(pipeline, "NewRegistrar");
|
||||
pipeline.run().waitUntilFinish();
|
||||
validateHistoryEntry(DomainHistory.class, domain);
|
||||
validateHistoryEntry(ContactHistory.class, contact);
|
||||
validateHistoryEntry(HostHistory.class, host);
|
||||
}
|
||||
|
||||
private static <T extends EppResource> void validateHistoryEntry(
|
||||
Class<? extends HistoryEntry> historyClazz, T resource) {
|
||||
ImmutableList<? extends HistoryEntry> historyEntries =
|
||||
jpaTm().transact(() -> jpaTm().loadAllOf(historyClazz));
|
||||
assertThat(historyEntries.size()).isEqualTo(1);
|
||||
HistoryEntry historyEntry = historyEntries.get(0);
|
||||
assertThat(historyEntry.getType()).isEqualTo(HistoryEntry.Type.SYNTHETIC);
|
||||
assertThat(historyEntry.getRegistrarId()).isEqualTo("NewRegistrar");
|
||||
EppResource embeddedResource;
|
||||
if (historyEntry instanceof DomainHistory) {
|
||||
embeddedResource = ((DomainHistory) historyEntry).getDomainContent().get();
|
||||
} else if (historyEntry instanceof ContactHistory) {
|
||||
embeddedResource = ((ContactHistory) historyEntry).getContactBase().get();
|
||||
} else {
|
||||
embeddedResource = ((HostHistory) historyEntry).getHostBase().get();
|
||||
}
|
||||
assertAboutImmutableObjects().that(embeddedResource).hasFieldsEqualTo(resource);
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue