mirror of
https://github.com/google/nomulus.git
synced 2025-08-02 07:52:11 +02:00
Make RefreshDnsOnHostRenameAction SQL-aware (#1190)
<!-- Reviewable:start --> This change is [<img src="https://reviewable.io/review_button.svg" height="34" align="absmiddle" alt="Reviewable"/>](https://reviewable.io/reviews/google/nomulus/1190) <!-- Reviewable:end -->
This commit is contained in:
parent
16392c3808
commit
68304133c4
4 changed files with 104 additions and 32 deletions
|
@ -23,6 +23,7 @@ import static google.registry.batch.AsyncTaskEnqueuer.PARAM_REQUESTED_TIME;
|
|||
import static google.registry.batch.AsyncTaskEnqueuer.QUEUE_ASYNC_HOST_RENAME;
|
||||
import static google.registry.batch.AsyncTaskMetrics.OperationType.DNS_REFRESH;
|
||||
import static google.registry.mapreduce.inputs.EppResourceInputs.createEntityInput;
|
||||
import static google.registry.model.EppResourceUtils.getLinkedDomainKeys;
|
||||
import static google.registry.model.EppResourceUtils.isActive;
|
||||
import static google.registry.model.EppResourceUtils.isDeleted;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||
|
@ -69,6 +70,7 @@ import java.util.logging.Level;
|
|||
import javax.annotation.Nullable;
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Named;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
|
||||
|
@ -86,6 +88,8 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
|
|||
@Inject Clock clock;
|
||||
@Inject MapreduceRunner mrRunner;
|
||||
@Inject @Named(QUEUE_ASYNC_HOST_RENAME) Queue pullQueue;
|
||||
|
||||
@Inject DnsQueue dnsQueue;
|
||||
@Inject RequestStatusChecker requestStatusChecker;
|
||||
@Inject Response response;
|
||||
@Inject Retrier retrier;
|
||||
|
@ -153,7 +157,39 @@ public class RefreshDnsOnHostRenameAction implements Runnable {
|
|||
} else {
|
||||
logger.atInfo().log(
|
||||
"Processing asynchronous DNS refresh for renamed hosts: %s", hostKeys.build());
|
||||
runMapreduce(refreshRequests, lock);
|
||||
if (tm().isOfy()) {
|
||||
runMapreduce(refreshRequests, lock);
|
||||
} else {
|
||||
try {
|
||||
refreshRequests.stream()
|
||||
.flatMap(
|
||||
request ->
|
||||
getLinkedDomainKeys(request.hostKey(), request.lastUpdateTime(), null)
|
||||
.stream())
|
||||
.distinct()
|
||||
.map(domainKey -> tm().transact(() -> tm().loadByKey(domainKey).getDomainName()))
|
||||
.forEach(
|
||||
domainName -> {
|
||||
retrier.callWithRetry(
|
||||
() -> dnsQueue.addDomainRefreshTask(domainName),
|
||||
TransientFailureException.class);
|
||||
logger.atInfo().log("Enqueued DNS refresh for domain %s.", domainName);
|
||||
});
|
||||
deleteTasksWithRetry(
|
||||
refreshRequests,
|
||||
getQueue(QUEUE_ASYNC_HOST_RENAME),
|
||||
asyncTaskMetrics,
|
||||
retrier,
|
||||
OperationResult.SUCCESS);
|
||||
} catch (Throwable t) {
|
||||
String message = "Error refreshing DNS on host rename.";
|
||||
logger.atSevere().withCause(t).log(message);
|
||||
response.setPayload(message);
|
||||
response.setStatus(HttpStatus.SC_INTERNAL_SERVER_ERROR);
|
||||
} finally {
|
||||
lock.get().release();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ package google.registry.model;
|
|||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.collect.ImmutableSet.toImmutableSet;
|
||||
import static google.registry.model.ofy.ObjectifyService.auditedOfy;
|
||||
import static google.registry.model.ofy.ObjectifyService.ofy;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.jpaTm;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||
|
@ -366,27 +367,26 @@ public final class EppResourceUtils {
|
|||
*
|
||||
* @param key the referent key
|
||||
* @param now the logical time of the check
|
||||
* @param limit the maximum number of returned keys
|
||||
* @param limit the maximum number of returned keys, unlimited if null
|
||||
*/
|
||||
public static ImmutableSet<VKey<DomainBase>> getLinkedDomainKeys(
|
||||
VKey<? extends EppResource> key, DateTime now, int limit) {
|
||||
VKey<? extends EppResource> key, DateTime now, @Nullable Integer limit) {
|
||||
checkArgument(
|
||||
key.getKind().equals(ContactResource.class) || key.getKind().equals(HostResource.class),
|
||||
"key must be either VKey<ContactResource> or VKey<HostResource>, but it is %s",
|
||||
key);
|
||||
boolean isContactKey = key.getKind().equals(ContactResource.class);
|
||||
if (tm().isOfy()) {
|
||||
return ofy()
|
||||
.load()
|
||||
.type(DomainBase.class)
|
||||
.filter(isContactKey ? "allContacts.contact" : "nsHosts", key.getOfyKey())
|
||||
.filter("deletionTime >", now)
|
||||
.limit(limit)
|
||||
.keys()
|
||||
.list()
|
||||
.stream()
|
||||
.map(DomainBase::createVKey)
|
||||
.collect(toImmutableSet());
|
||||
com.googlecode.objectify.cmd.Query<DomainBase> query =
|
||||
auditedOfy()
|
||||
.load()
|
||||
.type(DomainBase.class)
|
||||
.filter(isContactKey ? "allContacts.contact" : "nsHosts", key.getOfyKey())
|
||||
.filter("deletionTime >", now);
|
||||
if (limit != null) {
|
||||
query.limit(limit);
|
||||
}
|
||||
return query.keys().list().stream().map(DomainBase::createVKey).collect(toImmutableSet());
|
||||
} else {
|
||||
return tm().transact(
|
||||
() -> {
|
||||
|
@ -405,11 +405,13 @@ public final class EppResourceUtils {
|
|||
.setParameter("fkRepoId", key.getSqlKey())
|
||||
.setParameter("now", now.toDate());
|
||||
}
|
||||
if (limit != null) {
|
||||
query.setMaxResults(limit);
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
ImmutableSet<VKey<DomainBase>> domainBaseKeySet =
|
||||
(ImmutableSet<VKey<DomainBase>>)
|
||||
query
|
||||
.setMaxResults(limit)
|
||||
.getResultStream()
|
||||
.map(
|
||||
repoId ->
|
||||
|
|
|
@ -17,6 +17,8 @@ package google.registry.batch;
|
|||
import static com.google.appengine.api.taskqueue.QueueFactory.getQueue;
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static com.google.common.truth.Truth8.assertThat;
|
||||
import static google.registry.batch.AsyncTaskEnqueuer.PARAM_HOST_KEY;
|
||||
import static google.registry.batch.AsyncTaskEnqueuer.PARAM_REQUESTED_TIME;
|
||||
import static google.registry.batch.AsyncTaskEnqueuer.QUEUE_ASYNC_HOST_RENAME;
|
||||
import static google.registry.batch.AsyncTaskMetrics.OperationType.DNS_REFRESH;
|
||||
import static google.registry.persistence.transaction.TransactionManagerFactory.tm;
|
||||
|
@ -45,6 +47,7 @@ import static org.mockito.Mockito.when;
|
|||
import com.googlecode.objectify.Key;
|
||||
import google.registry.batch.AsyncTaskMetrics.OperationResult;
|
||||
import google.registry.batch.RefreshDnsOnHostRenameAction.RefreshDnsOnHostRenameReducer;
|
||||
import google.registry.dns.DnsQueue;
|
||||
import google.registry.model.host.HostResource;
|
||||
import google.registry.model.server.Lock;
|
||||
import google.registry.testing.DualDatabaseTest;
|
||||
|
@ -55,6 +58,7 @@ import google.registry.testing.InjectExtension;
|
|||
import google.registry.testing.TaskQueueHelper.TaskMatcher;
|
||||
import google.registry.testing.TestOfyAndSql;
|
||||
import google.registry.testing.TestOfyOnly;
|
||||
import google.registry.testing.TestSqlOnly;
|
||||
import google.registry.testing.mapreduce.MapreduceTestCase;
|
||||
import google.registry.util.AppEngineServiceUtils;
|
||||
import google.registry.util.RequestStatusChecker;
|
||||
|
@ -62,6 +66,7 @@ import google.registry.util.Retrier;
|
|||
import google.registry.util.Sleeper;
|
||||
import google.registry.util.SystemSleeper;
|
||||
import java.util.Optional;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Duration;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
@ -94,6 +99,7 @@ public class RefreshDnsOnHostRenameActionTest
|
|||
action.clock = clock;
|
||||
action.mrRunner = makeDefaultRunner();
|
||||
action.pullQueue = getQueue(QUEUE_ASYNC_HOST_RENAME);
|
||||
action.dnsQueue = DnsQueue.createForTesting(clock);
|
||||
action.requestStatusChecker = requestStatusChecker;
|
||||
action.response = fakeResponse;
|
||||
action.retrier = new Retrier(new FakeSleeper(clock), 1);
|
||||
|
@ -102,7 +108,7 @@ public class RefreshDnsOnHostRenameActionTest
|
|||
.thenThrow(new AssertionError("Should not be called"));
|
||||
}
|
||||
|
||||
private void runMapreduce() throws Exception {
|
||||
private void runAction() throws Exception {
|
||||
clock.advanceOneMilli();
|
||||
// Use hard sleeps to ensure that the tasks are enqueued properly and will be leased.
|
||||
Sleeper sleeper = new SystemSleeper();
|
||||
|
@ -123,10 +129,32 @@ public class RefreshDnsOnHostRenameActionTest
|
|||
tm().clearSessionCache();
|
||||
}
|
||||
|
||||
// TODO(b/181662306) None of the map reduce tests work with SQL since our map-reduce setup is
|
||||
// inherently Datastore oriented, but this is a bigger task.
|
||||
@TestSqlOnly
|
||||
void testFailure_dnsUpdateEnqueueFailed() throws Exception {
|
||||
HostResource host = persistActiveHost("ns1.example.tld");
|
||||
persistResource(newDomainBase("example.tld", host));
|
||||
persistResource(newDomainBase("otherexample.tld", host));
|
||||
persistResource(newDomainBase("untouched.tld", persistActiveHost("ns2.example.tld")));
|
||||
DateTime timeEnqueued = clock.nowUtc();
|
||||
enqueuer.enqueueAsyncDnsRefresh(host, timeEnqueued);
|
||||
DnsQueue mockedQueue = mock(DnsQueue.class);
|
||||
action.dnsQueue = mockedQueue;
|
||||
when(mockedQueue.addDomainRefreshTask(anyString()))
|
||||
.thenThrow(new RuntimeException("Cannot enqueue task."));
|
||||
runAction();
|
||||
assertNoDnsTasksEnqueued();
|
||||
assertTasksEnqueued(
|
||||
QUEUE_ASYNC_HOST_RENAME,
|
||||
new TaskMatcher()
|
||||
.param(PARAM_HOST_KEY, host.createVKey().getOfyKey().getString())
|
||||
.param(PARAM_REQUESTED_TIME, timeEnqueued.toString()));
|
||||
verify(action.asyncTaskMetrics).recordDnsRefreshBatchSize(1L);
|
||||
verifyNoMoreInteractions(action.asyncTaskMetrics);
|
||||
assertThat(fakeResponse.getStatus()).isEqualTo(HttpStatus.SC_INTERNAL_SERVER_ERROR);
|
||||
assertThat(acquireLock()).isPresent();
|
||||
}
|
||||
|
||||
@TestOfyOnly
|
||||
@TestOfyAndSql
|
||||
void testSuccess_dnsUpdateEnqueued() throws Exception {
|
||||
HostResource host = persistActiveHost("ns1.example.tld");
|
||||
persistResource(newDomainBase("example.tld", host));
|
||||
|
@ -134,21 +162,22 @@ public class RefreshDnsOnHostRenameActionTest
|
|||
persistResource(newDomainBase("untouched.tld", persistActiveHost("ns2.example.tld")));
|
||||
DateTime timeEnqueued = clock.nowUtc();
|
||||
enqueuer.enqueueAsyncDnsRefresh(host, timeEnqueued);
|
||||
runMapreduce();
|
||||
runAction();
|
||||
assertDnsTasksEnqueued("example.tld", "otherexample.tld");
|
||||
assertNoTasksEnqueued(QUEUE_ASYNC_HOST_RENAME);
|
||||
verify(action.asyncTaskMetrics).recordDnsRefreshBatchSize(1L);
|
||||
verify(action.asyncTaskMetrics)
|
||||
.recordAsyncFlowResult(DNS_REFRESH, OperationResult.SUCCESS, timeEnqueued);
|
||||
verifyNoMoreInteractions(action.asyncTaskMetrics);
|
||||
assertThat(acquireLock()).isPresent();
|
||||
}
|
||||
|
||||
@TestOfyOnly
|
||||
@TestOfyAndSql
|
||||
void testSuccess_multipleHostsProcessedInBatch() throws Exception {
|
||||
HostResource host1 = persistActiveHost("ns1.example.tld");
|
||||
HostResource host2 = persistActiveHost("ns2.example.tld");
|
||||
HostResource host3 = persistActiveHost("ns3.example.tld");
|
||||
persistResource(newDomainBase("example1.tld", host1));
|
||||
persistResource(newDomainBase("example1.tld", host1, host2));
|
||||
persistResource(newDomainBase("example2.tld", host2));
|
||||
persistResource(newDomainBase("example3.tld", host3));
|
||||
DateTime timeEnqueued = clock.nowUtc();
|
||||
|
@ -156,7 +185,7 @@ public class RefreshDnsOnHostRenameActionTest
|
|||
enqueuer.enqueueAsyncDnsRefresh(host1, timeEnqueued);
|
||||
enqueuer.enqueueAsyncDnsRefresh(host2, timeEnqueued);
|
||||
enqueuer.enqueueAsyncDnsRefresh(host3, laterTimeEnqueued);
|
||||
runMapreduce();
|
||||
runAction();
|
||||
assertDnsTasksEnqueued("example1.tld", "example2.tld", "example3.tld");
|
||||
assertNoTasksEnqueued(QUEUE_ASYNC_HOST_RENAME);
|
||||
verify(action.asyncTaskMetrics).recordDnsRefreshBatchSize(3L);
|
||||
|
@ -165,21 +194,23 @@ public class RefreshDnsOnHostRenameActionTest
|
|||
verify(action.asyncTaskMetrics)
|
||||
.recordAsyncFlowResult(DNS_REFRESH, OperationResult.SUCCESS, laterTimeEnqueued);
|
||||
verifyNoMoreInteractions(action.asyncTaskMetrics);
|
||||
assertThat(acquireLock()).isPresent();
|
||||
}
|
||||
|
||||
@TestOfyOnly
|
||||
@TestOfyAndSql
|
||||
void testSuccess_deletedHost_doesntTriggerDnsRefresh() throws Exception {
|
||||
HostResource host = persistDeletedHost("ns11.fakesss.tld", clock.nowUtc().minusDays(4));
|
||||
persistResource(newDomainBase("example1.tld", host));
|
||||
DateTime timeEnqueued = clock.nowUtc();
|
||||
enqueuer.enqueueAsyncDnsRefresh(host, timeEnqueued);
|
||||
runMapreduce();
|
||||
runAction();
|
||||
assertNoDnsTasksEnqueued();
|
||||
assertNoTasksEnqueued(QUEUE_ASYNC_HOST_RENAME);
|
||||
verify(action.asyncTaskMetrics).recordDnsRefreshBatchSize(1L);
|
||||
verify(action.asyncTaskMetrics)
|
||||
.recordAsyncFlowResult(DNS_REFRESH, OperationResult.STALE, timeEnqueued);
|
||||
verifyNoMoreInteractions(action.asyncTaskMetrics);
|
||||
assertThat(acquireLock()).isPresent();
|
||||
}
|
||||
|
||||
@TestOfyAndSql
|
||||
|
@ -191,9 +222,10 @@ public class RefreshDnsOnHostRenameActionTest
|
|||
.setDeletionTime(START_OF_TIME)
|
||||
.build());
|
||||
enqueuer.enqueueAsyncDnsRefresh(renamedHost, clock.nowUtc());
|
||||
runMapreduce();
|
||||
runAction();
|
||||
assertNoDnsTasksEnqueued();
|
||||
assertNoTasksEnqueued(QUEUE_ASYNC_HOST_RENAME);
|
||||
assertThat(acquireLock()).isPresent();
|
||||
}
|
||||
|
||||
@TestOfyAndSql
|
||||
|
@ -217,9 +249,10 @@ public class RefreshDnsOnHostRenameActionTest
|
|||
enqueueMapreduceOnly();
|
||||
assertThat(fakeResponse.getPayload()).isEqualTo("Can't acquire lock; aborting.");
|
||||
assertNoDnsTasksEnqueued();
|
||||
assertThat(acquireLock()).isEmpty();
|
||||
}
|
||||
|
||||
@TestOfyAndSql
|
||||
@TestOfyOnly
|
||||
void test_mapreduceHasWorkToDo_lockIsAcquired() {
|
||||
HostResource host = persistActiveHost("ns1.example.tld");
|
||||
enqueuer.enqueueAsyncDnsRefresh(host, clock.nowUtc());
|
||||
|
|
|
@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkState;
|
|||
import static com.google.common.base.Suppliers.memoize;
|
||||
import static com.google.common.collect.ImmutableList.toImmutableList;
|
||||
import static com.google.common.collect.ImmutableMap.toImmutableMap;
|
||||
import static com.google.common.collect.ImmutableSet.toImmutableSet;
|
||||
import static com.google.common.collect.Iterables.toArray;
|
||||
import static com.google.common.collect.MoreCollectors.onlyElement;
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
|
@ -110,6 +111,7 @@ import google.registry.model.transfer.TransferStatus;
|
|||
import google.registry.persistence.VKey;
|
||||
import google.registry.schema.tld.PremiumListDao;
|
||||
import google.registry.tmch.LordnTaskUtils;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
|
@ -182,11 +184,10 @@ public class DatabaseHelper {
|
|||
domainName, generateNewDomainRoid(getTldFromDomainName(domainName)), contact);
|
||||
}
|
||||
|
||||
public static DomainBase newDomainBase(String domainName, HostResource host) {
|
||||
return newDomainBase(domainName)
|
||||
.asBuilder()
|
||||
.setNameservers(ImmutableSet.of(host.createVKey()))
|
||||
.build();
|
||||
public static DomainBase newDomainBase(String domainName, HostResource... hosts) {
|
||||
ImmutableSet<VKey<HostResource>> hostKeys =
|
||||
Arrays.stream(hosts).map(HostResource::createVKey).collect(toImmutableSet());
|
||||
return newDomainBase(domainName).asBuilder().setNameservers(hostKeys).build();
|
||||
}
|
||||
|
||||
public static DomainBase newDomainBase(
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue