Fix hanging threads in GcsDiffFileLister (#1243)

* Fix hanging threads in GcsDiffFileLister

Basically, whenever we request threads using the request thread factory,
we must be on the request thread itself. Dagger doesn't guarantee this
for us if we provide the ExecutorService directly in the action (or in
the GcsDiffFileLister), but we can guarantee that we're on the request
thread itself by simply injecting a Lazy, so that the executor is
instantiated inside the request itself.

In addition, add a timeout on the futures just in case.
This commit is contained in:
gbrodman 2021-07-16 14:13:20 -04:00 committed by GitHub
parent bb5d2dcf0a
commit 34f3823960
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 91 additions and 55 deletions

View file

@ -35,6 +35,8 @@ import google.registry.request.HttpException.BadRequestException;
import google.registry.request.Parameter; import google.registry.request.Parameter;
import java.lang.annotation.Documented; import java.lang.annotation.Documented;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Qualifier; import javax.inject.Qualifier;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -101,4 +103,9 @@ public final class BackupModule {
static ListeningExecutorService provideListeningExecutorService() { static ListeningExecutorService provideListeningExecutorService() {
return listeningDecorator(newFixedThreadPool(NUM_THREADS, currentRequestThreadFactory())); return listeningDecorator(newFixedThreadPool(NUM_THREADS, currentRequestThreadFactory()));
} }
@Provides
static ScheduledExecutorService provideScheduledExecutorService() {
return Executors.newSingleThreadScheduledExecutor();
}
} }

View file

@ -28,11 +28,15 @@ import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.UncheckedExecutionException;
import dagger.Lazy;
import google.registry.backup.BackupModule.Backups; import google.registry.backup.BackupModule.Backups;
import google.registry.gcs.GcsUtils; import google.registry.gcs.GcsUtils;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.inject.Inject; import javax.inject.Inject;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -42,15 +46,25 @@ class GcsDiffFileLister {
private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private static final FluentLogger logger = FluentLogger.forEnclosingClass();
/** Timeout for retrieving per-file information from GCS. */
private static final Duration FILE_INFO_TIMEOUT_DURATION = Duration.ofMinutes(1);
@Inject GcsUtils gcsUtils; @Inject GcsUtils gcsUtils;
@Inject @Backups ListeningExecutorService executor; @Inject @Backups Lazy<ListeningExecutorService> lazyExecutor;
@Inject GcsDiffFileLister() {} @Inject ScheduledExecutorService scheduledExecutorService;
@Inject
GcsDiffFileLister() {}
/** /**
* Traverses the sequence of diff files backwards from checkpointTime and inserts the file * Traverses the sequence of diff files backwards from checkpointTime and inserts the file
* metadata into "sequence". Returns true if a complete sequence was discovered, false if one or * metadata into "sequence". Returns true if a complete sequence was discovered, false if one or
* more files are missing. * more files are missing.
*
* @throws UncheckedExecutionException wrapping a {@link java.util.concurrent.TimeoutException} if
* the GCS call fails to finish within one minute, or wrapping any other exception if
* something else goes wrong.
*/ */
private boolean constructDiffSequence( private boolean constructDiffSequence(
String gcsBucket, String gcsBucket,
@ -62,7 +76,12 @@ class GcsDiffFileLister {
while (isBeforeOrAt(fromTime, checkpointTime)) { while (isBeforeOrAt(fromTime, checkpointTime)) {
BlobInfo blobInfo; BlobInfo blobInfo;
if (upperBoundTimesToBlobInfo.containsKey(checkpointTime)) { if (upperBoundTimesToBlobInfo.containsKey(checkpointTime)) {
blobInfo = Futures.getUnchecked(upperBoundTimesToBlobInfo.get(checkpointTime)); blobInfo =
Futures.getUnchecked(
Futures.withTimeout(
upperBoundTimesToBlobInfo.get(checkpointTime),
FILE_INFO_TIMEOUT_DURATION,
scheduledExecutorService));
} else { } else {
String filename = DIFF_FILE_PREFIX + checkpointTime; String filename = DIFF_FILE_PREFIX + checkpointTime;
logger.atInfo().log("Patching GCS list; discovered file: %s", filename); logger.atInfo().log("Patching GCS list; discovered file: %s", filename);
@ -102,12 +121,14 @@ class GcsDiffFileLister {
} }
DateTime lastUpperBoundTime = START_OF_TIME; DateTime lastUpperBoundTime = START_OF_TIME;
TreeMap<DateTime, BlobInfo> sequence = new TreeMap<>();
try {
for (String strippedFilename : strippedFilenames) { for (String strippedFilename : strippedFilenames) {
final String filename = DIFF_FILE_PREFIX + strippedFilename; final String filename = DIFF_FILE_PREFIX + strippedFilename;
DateTime upperBoundTime = DateTime.parse(strippedFilename); DateTime upperBoundTime = DateTime.parse(strippedFilename);
if (isInRange(upperBoundTime, fromTime, toTime)) { if (isInRange(upperBoundTime, fromTime, toTime)) {
upperBoundTimesToBlobInfo.put( upperBoundTimesToBlobInfo.put(
upperBoundTime, executor.submit(() -> getBlobInfo(gcsBucket, filename))); upperBoundTime, lazyExecutor.get().submit(() -> getBlobInfo(gcsBucket, filename)));
lastUpperBoundTime = latestOf(upperBoundTime, lastUpperBoundTime); lastUpperBoundTime = latestOf(upperBoundTime, lastUpperBoundTime);
} }
} }
@ -116,15 +137,14 @@ class GcsDiffFileLister {
return ImmutableList.of(); return ImmutableList.of();
} }
// Reconstruct the sequence of files by traversing backwards from "lastUpperBoundTime" (i.e. the // Reconstruct the sequence of files by traversing backwards from "lastUpperBoundTime" (i.e.
// last file that we found) and finding its previous file until we either run out of files or // the last file that we found) and finding its previous file until we either run out of files
// get to one that precedes "fromTime". // or get to one that precedes "fromTime".
// //
// GCS file listing is eventually consistent, so it's possible that we are missing a file. The // GCS file listing is eventually consistent, so it's possible that we are missing a file. The
// metadata of a file is sufficient to identify the preceding file, so if we start from the // metadata of a file is sufficient to identify the preceding file, so if we start from the
// last file and work backwards we can verify that we have no holes in our chain (although we // last file and work backwards we can verify that we have no holes in our chain (although we
// may be missing files at the end). // may be missing files at the end).
TreeMap<DateTime, BlobInfo> sequence = new TreeMap<>();
logger.atInfo().log("Restoring until: %s", lastUpperBoundTime); logger.atInfo().log("Restoring until: %s", lastUpperBoundTime);
boolean inconsistentFileSet = boolean inconsistentFileSet =
!constructDiffSequence( !constructDiffSequence(
@ -152,6 +172,9 @@ class GcsDiffFileLister {
!inconsistentFileSet, !inconsistentFileSet,
"Unable to compute commit diff history, there are either gaps or forks in the history " "Unable to compute commit diff history, there are either gaps or forks in the history "
+ "file set. Check log for details."); + "file set. Check log for details.");
} finally {
lazyExecutor.get().shutdown();
}
logger.atInfo().log( logger.atInfo().log(
"Actual restore from time: %s", getLowerBoundTime(sequence.firstEntry().getValue())); "Actual restore from time: %s", getLowerBoundTime(sequence.firstEntry().getValue()));

View file

@ -16,7 +16,6 @@ package google.registry.backup;
import static com.google.common.collect.Iterables.transform; import static com.google.common.collect.Iterables.transform;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static google.registry.backup.BackupUtils.GcsMetadataKeys.LOWER_BOUND_CHECKPOINT; import static google.registry.backup.BackupUtils.GcsMetadataKeys.LOWER_BOUND_CHECKPOINT;
import static google.registry.backup.ExportCommitLogDiffAction.DIFF_FILE_PREFIX; import static google.registry.backup.ExportCommitLogDiffAction.DIFF_FILE_PREFIX;
import static org.joda.time.DateTimeZone.UTC; import static org.joda.time.DateTimeZone.UTC;
@ -33,10 +32,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.flogger.LoggerConfig; import com.google.common.flogger.LoggerConfig;
import com.google.common.testing.TestLogHandler; import com.google.common.testing.TestLogHandler;
import com.google.common.util.concurrent.MoreExecutors;
import google.registry.gcs.GcsUtils; import google.registry.gcs.GcsUtils;
import google.registry.gcs.backport.LocalStorageHelper; import google.registry.gcs.backport.LocalStorageHelper;
import google.registry.testing.AppEngineExtension; import google.registry.testing.AppEngineExtension;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.logging.LogRecord; import java.util.logging.LogRecord;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -60,7 +61,8 @@ public class GcsDiffFileListerTest {
@BeforeEach @BeforeEach
void beforeEach() throws Exception { void beforeEach() throws Exception {
diffLister.gcsUtils = gcsUtils; diffLister.gcsUtils = gcsUtils;
diffLister.executor = newDirectExecutorService(); diffLister.lazyExecutor = MoreExecutors::newDirectExecutorService;
diffLister.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
addGcsFile(i, i + 1); addGcsFile(i, i + 1);
} }

View file

@ -16,7 +16,6 @@ package google.registry.backup;
import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static google.registry.backup.RestoreCommitLogsActionTest.createCheckpoint; import static google.registry.backup.RestoreCommitLogsActionTest.createCheckpoint;
import static google.registry.backup.RestoreCommitLogsActionTest.saveDiffFile; import static google.registry.backup.RestoreCommitLogsActionTest.saveDiffFile;
import static google.registry.backup.RestoreCommitLogsActionTest.saveDiffFileNotToRestore; import static google.registry.backup.RestoreCommitLogsActionTest.saveDiffFileNotToRestore;
@ -42,6 +41,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedMap;
import com.google.common.truth.Truth8; import com.google.common.truth.Truth8;
import com.google.common.util.concurrent.MoreExecutors;
import com.googlecode.objectify.Key; import com.googlecode.objectify.Key;
import google.registry.gcs.GcsUtils; import google.registry.gcs.GcsUtils;
import google.registry.gcs.backport.LocalStorageHelper; import google.registry.gcs.backport.LocalStorageHelper;
@ -72,6 +72,7 @@ import google.registry.testing.FakeResponse;
import google.registry.testing.TestObject; import google.registry.testing.TestObject;
import google.registry.util.RequestStatusChecker; import google.registry.util.RequestStatusChecker;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Executors;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -131,7 +132,8 @@ public class ReplayCommitLogsToSqlActionTest {
action.gcsBucket = "gcs bucket"; action.gcsBucket = "gcs bucket";
action.diffLister = new GcsDiffFileLister(); action.diffLister = new GcsDiffFileLister();
action.diffLister.gcsUtils = gcsUtils; action.diffLister.gcsUtils = gcsUtils;
action.diffLister.executor = newDirectExecutorService(); action.diffLister.lazyExecutor = MoreExecutors::newDirectExecutorService;
action.diffLister.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
ofyTm() ofyTm()
.transact( .transact(
() -> () ->

View file

@ -17,7 +17,6 @@ package google.registry.backup;
import static com.google.common.collect.Iterables.transform; import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Maps.toMap; import static com.google.common.collect.Maps.toMap;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static google.registry.backup.BackupUtils.GcsMetadataKeys.LOWER_BOUND_CHECKPOINT; import static google.registry.backup.BackupUtils.GcsMetadataKeys.LOWER_BOUND_CHECKPOINT;
import static google.registry.backup.BackupUtils.serializeEntity; import static google.registry.backup.BackupUtils.serializeEntity;
import static google.registry.backup.ExportCommitLogDiffAction.DIFF_FILE_PREFIX; import static google.registry.backup.ExportCommitLogDiffAction.DIFF_FILE_PREFIX;
@ -34,6 +33,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.Resources; import com.google.common.io.Resources;
import com.google.common.primitives.Longs; import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.MoreExecutors;
import com.googlecode.objectify.Key; import com.googlecode.objectify.Key;
import google.registry.gcs.GcsUtils; import google.registry.gcs.GcsUtils;
import google.registry.gcs.backport.LocalStorageHelper; import google.registry.gcs.backport.LocalStorageHelper;
@ -55,6 +55,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.Executors;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -87,7 +88,8 @@ public class RestoreCommitLogsActionTest {
action.gcsBucketOverride = Optional.empty(); action.gcsBucketOverride = Optional.empty();
action.diffLister = new GcsDiffFileLister(); action.diffLister = new GcsDiffFileLister();
action.diffLister.gcsUtils = gcsUtils; action.diffLister.gcsUtils = gcsUtils;
action.diffLister.executor = newDirectExecutorService(); action.diffLister.lazyExecutor = MoreExecutors::newDirectExecutorService;
action.diffLister.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
} }
@Test @Test