From 3947ac6ef716f54db1d8ee10b2be5f30a70ca188 Mon Sep 17 00:00:00 2001
From: Weimin Yu
Date: Wed, 20 May 2020 10:26:34 -0400
Subject: [PATCH] Read LevelDb incrementally (#593)

* Read LevelDb incrementally

Made LevelDbLogReader an iterator over a LevelDb data stream,
reducing the memory footprint, which is important when it is used in
a Dataflow pipeline.
---
 .../registry/tools/CompareDbBackups.java      |   6 +-
 .../registry/tools/LevelDbLogReader.java      | 115 ++++++++++++------
 .../registry/tools/RecordAccumulator.java     |  19 +--
 .../tools/LevelDbFileBuilderTest.java         |  11 +-
 .../registry/tools/LevelDbLogReaderTest.java  |  61 ++++++----
 .../registry/tools/RecordAccumulatorTest.java |   2 +-
 6 files changed, 134 insertions(+), 80 deletions(-)

diff --git a/core/src/main/java/google/registry/tools/CompareDbBackups.java b/core/src/main/java/google/registry/tools/CompareDbBackups.java
index c0595d794..5f1766153 100644
--- a/core/src/main/java/google/registry/tools/CompareDbBackups.java
+++ b/core/src/main/java/google/registry/tools/CompareDbBackups.java
@@ -40,12 +40,10 @@ class CompareDbBackups {
     }
 
     ImmutableSet<ComparableEntity> entities1 =
-        new RecordAccumulator()
-            .readDirectory(new File(args[0]), DATA_FILE_MATCHER)
+        RecordAccumulator.readDirectory(new File(args[0]), DATA_FILE_MATCHER)
             .getComparableEntitySet();
     ImmutableSet<ComparableEntity> entities2 =
-        new RecordAccumulator()
-            .readDirectory(new File(args[1]), DATA_FILE_MATCHER)
+        RecordAccumulator.readDirectory(new File(args[1]), DATA_FILE_MATCHER)
             .getComparableEntitySet();
 
     // Calculate the entities added and removed.
diff --git a/core/src/main/java/google/registry/tools/LevelDbLogReader.java b/core/src/main/java/google/registry/tools/LevelDbLogReader.java
index 6562acdbc..8f6b26a7b 100644
--- a/core/src/main/java/google/registry/tools/LevelDbLogReader.java
+++ b/core/src/main/java/google/registry/tools/LevelDbLogReader.java
@@ -14,17 +14,25 @@

 package google.registry.tools;

+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Optional;

 /**
- * Reads records from a set of LevelDB files and builds a gigantic ImmutableList from them.
+ * Iterator that incrementally parses binary data in LevelDb format into records.
  *
  * <p>See log_format.md for the LevelDb log format.
@@ -32,19 +40,72 @@ import java.nio.file.Path;
  *
  * <p>There are several other implementations of this, none of which appeared suitable for our use
  * case: The original C++ implementation.
- * com.google.appengine.api.files.RecordWriteChannel - Exactly what we need but deprecated. The
+ * <a
+ * href="https://cloud.google.com/appengine/docs/standard/java/javadoc/com/google/appengine/api/files/RecordReadChannel">
+ * com.google.appengine.api.files.RecordReadChannel</a> - Exactly what we need but deprecated. The
+ * referenced replacement: The App Engine GCS
  * Client - Does not appear to have any support for working with LevelDB.
  */
-public final class LevelDbLogReader {
+public final class LevelDbLogReader implements Iterator<byte[]> {
   @VisibleForTesting static final int BLOCK_SIZE = 32 * 1024;
   @VisibleForTesting static final int HEADER_SIZE = 7;

   private final ByteArrayOutputStream recordContents = new ByteArrayOutputStream();
-  private final ImmutableList.Builder<byte[]> recordListBuilder = new ImmutableList.Builder<>();
+  private final LinkedList<byte[]> recordList = Lists.newLinkedList();
+
+  private final ByteBuffer byteBuffer = ByteBuffer.allocate(BLOCK_SIZE);
+  private final ReadableByteChannel channel;
+
+  LevelDbLogReader(ReadableByteChannel channel) {
+    this.channel = channel;
+  }
+
+  @Override
+  public boolean hasNext() {
+    while (recordList.isEmpty()) {
+      try {
+        Optional<byte[]> block = readFromChannel();
+        if (!block.isPresent()) {
+          return false;
+        }
+        if (block.get().length != BLOCK_SIZE) {
+          throw new IllegalStateException("Data size is not multiple of " + BLOCK_SIZE);
+        }
+        processBlock(block.get());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public byte[] next() {
+    checkState(hasNext(), "The next() method called on empty iterator.");
+    return recordList.removeFirst();
+  }
+
+  /**
+   * Returns the next {@link #BLOCK_SIZE} bytes from the input channel, or {@link
+   * Optional#empty()} if there is no more data.
+   */
+  // TODO(weiminyu): use ByteBuffer directly.
+  private Optional<byte[]> readFromChannel() throws IOException {
+    while (true) {
+      int bytesRead = channel.read(byteBuffer);
+      if (!byteBuffer.hasRemaining() || bytesRead < 0) {
+        byteBuffer.flip();
+        if (!byteBuffer.hasRemaining()) {
+          return Optional.empty();
+        }
+        byte[] result = new byte[byteBuffer.remaining()];
+        byteBuffer.get(result);
+        byteBuffer.clear();
+        return Optional.of(result);
+      }
+    }
+  }

   /** Read a complete block, which must be exactly 32 KB. */
   private void processBlock(byte[] block) {
@@ -63,7 +124,7 @@ public final class LevelDbLogReader {

     // If this is the last (or only) chunk in the record, store the full contents into the List.
     if (recordHeader.type == ChunkType.FULL || recordHeader.type == ChunkType.LAST) {
-      recordListBuilder.add(recordContents.toByteArray());
+      recordList.add(recordContents.toByteArray());
       recordContents.reset();
     }

@@ -96,40 +157,24 @@ public final class LevelDbLogReader {
     return new RecordHeader(checksum, size, ChunkType.fromCode(type));
   }

-  /** Reads all records in the Reader into the record set. */
-  public void readFrom(InputStream source) throws IOException {
-    byte[] block = new byte[BLOCK_SIZE];
-
-    // read until we have no more.
-    while (true) {
-      int amountRead = source.read(block, 0, BLOCK_SIZE);
-      if (amountRead <= 0) {
-        break;
-      }
-      assert amountRead == BLOCK_SIZE;
-
-      processBlock(block);
-    }
+  /** Returns a {@link LevelDbLogReader} over a {@link ReadableByteChannel}. */
+  public static LevelDbLogReader from(ReadableByteChannel channel) {
+    return new LevelDbLogReader(channel);
   }

-  /** Reads all records from the file specified by "path" into the record set. */
-  public void readFrom(Path path) throws IOException {
-    readFrom(Files.newInputStream(path));
+  /** Returns a {@link LevelDbLogReader} over an {@link InputStream}. */
+  public static LevelDbLogReader from(InputStream source) {
+    return new LevelDbLogReader(Channels.newChannel(source));
   }

-  /** Reads all records from the specified file into the record set. */
-  public void readFrom(String filename) throws IOException {
-    readFrom(FileSystems.getDefault().getPath(filename));
+  /** Returns a {@link LevelDbLogReader} over a file specified by {@link Path}. */
+  public static LevelDbLogReader from(Path path) throws IOException {
+    return from(Files.newInputStream(path));
   }

-  /**
-   * Gets the list of records constructed so far.
-   *
-   * <p>Note that this does not invalidate the internal state of the object: we return a copy and
-   * this can be called multiple times.
-   */
-  ImmutableList<byte[]> getRecords() {
-    return recordListBuilder.build();
+  /** Returns a {@link LevelDbLogReader} over a file specified by {@code filename}. */
+  public static LevelDbLogReader from(String filename) throws IOException {
+    return from(FileSystems.getDefault().getPath(filename));
   }

   /** Aggregates the fields in a record header. */
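Editor's note on usage, not part of the patch: with readFrom() and getRecords() removed, callers either pull records lazily through the Iterator contract or copy them eagerly when a full list is still wanted. A minimal sketch under that assumption; everything except LevelDbLogReader.from(), hasNext(), and next() is an illustrative name:

    import com.google.common.collect.ImmutableList;
    import google.registry.tools.LevelDbLogReader;
    import java.io.IOException;
    import java.nio.file.Path;

    class LevelDbLogReaderUsageSketch {
      // Lazy consumption: only one 32 KB block is buffered at a time.
      static long countRecords(Path logPath) throws IOException {
        LevelDbLogReader reader = LevelDbLogReader.from(logPath);
        long count = 0;
        while (reader.hasNext()) {
          reader.next();
          count++;
        }
        return count;
      }

      // Eager consumption, as the updated tests do, when the whole list is still needed.
      static ImmutableList<byte[]> readAll(Path logPath) throws IOException {
        return ImmutableList.copyOf(LevelDbLogReader.from(logPath));
      }
    }
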
diff --git a/core/src/main/java/google/registry/tools/RecordAccumulator.java b/core/src/main/java/google/registry/tools/RecordAccumulator.java
index 676c88f9a..855656b96 100644
--- a/core/src/main/java/google/registry/tools/RecordAccumulator.java
+++ b/core/src/main/java/google/registry/tools/RecordAccumulator.java
@@ -15,38 +15,43 @@
 package google.registry.tools;

 import com.google.appengine.api.datastore.EntityTranslator;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.function.Predicate;

 /** Accumulates Entity records from level db files under a directory hierarchy. */
 class RecordAccumulator {
-  private final LevelDbLogReader reader = new LevelDbLogReader();
+  private final ImmutableList<byte[]> records;
+
+  RecordAccumulator(ImmutableList<byte[]> records) {
+    this.records = records;
+  }

   /** Recursively reads all records in the directory. */
-  public final RecordAccumulator readDirectory(File dir, Predicate<File> fileMatcher) {
+  public static RecordAccumulator readDirectory(File dir, Predicate<File> fileMatcher) {
+    ImmutableList.Builder<byte[]> builder = new ImmutableList.Builder<>();
     for (File child : dir.listFiles()) {
       if (child.isDirectory()) {
-        readDirectory(child, fileMatcher);
+        builder.addAll(readDirectory(child, fileMatcher).records);
       } else if (fileMatcher.test(child)) {
         try {
-          reader.readFrom(new FileInputStream(child));
+          builder.addAll(LevelDbLogReader.from(child.getPath()));
         } catch (IOException e) {
           throw new RuntimeException("IOException reading from file: " + child, e);
         }
       }
     }

-    return this;
+    return new RecordAccumulator(builder.build());
   }

   /** Creates an entity set from the current set of raw records. */
   ImmutableSet<ComparableEntity> getComparableEntitySet() {
     ImmutableSet.Builder<ComparableEntity> builder = new ImmutableSet.Builder<>();
-    for (byte[] rawRecord : reader.getRecords()) {
+    for (byte[] rawRecord : records) {
       // Parse the entity proto and create an Entity object from it.
       EntityProto proto = new EntityProto();
       proto.parseFrom(rawRecord);
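For orientation, also not part of the patch: each raw record in a Datastore backup file is a serialized EntityProto, which is what lets getComparableEntitySet() rebuild entities. A sketch of the same decoding path applied to a single file; the class name, method name, and backupFile parameter are illustrative:

    import com.google.appengine.api.datastore.Entity;
    import com.google.appengine.api.datastore.EntityTranslator;
    import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
    import google.registry.tools.LevelDbLogReader;
    import java.io.IOException;

    class BackupRecordDecodingSketch {
      // Prints the key of every entity in one backup file, decoding records as they are read.
      static void printEntityKeys(String backupFile) throws IOException {
        LevelDbLogReader reader = LevelDbLogReader.from(backupFile);
        while (reader.hasNext()) {
          EntityProto proto = new EntityProto();
          proto.parseFrom(reader.next()); // same parsing step RecordAccumulator uses
          Entity entity = EntityTranslator.createFromPb(proto);
          System.out.println(entity.getKey());
        }
      }
    }
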
diff --git a/core/src/test/java/google/registry/tools/LevelDbFileBuilderTest.java b/core/src/test/java/google/registry/tools/LevelDbFileBuilderTest.java
index 624d7bcba..75c876efd 100644
--- a/core/src/test/java/google/registry/tools/LevelDbFileBuilderTest.java
+++ b/core/src/test/java/google/registry/tools/LevelDbFileBuilderTest.java
@@ -23,7 +23,6 @@ import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
 import google.registry.testing.AppEngineRule;
 import google.registry.tools.LevelDbFileBuilder.Property;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import org.junit.Rule;
 import org.junit.Test;
@@ -51,10 +50,7 @@ public class LevelDbFileBuilderTest {
             BASE_ID, Property.create("first", 100L), Property.create("second", 200L));
     builder.build();

-    LevelDbLogReader reader = new LevelDbLogReader();
-    reader.readFrom(new FileInputStream(logFile));
-
-    ImmutableList<byte[]> records = reader.getRecords();
+    ImmutableList<byte[]> records = ImmutableList.copyOf(LevelDbLogReader.from(logFile.getPath()));
     assertThat(records).hasSize(1);

     // Reconstitute an entity, make sure that what we've got is the same as what we started with.
@@ -82,10 +78,7 @@ public class LevelDbFileBuilderTest {
     builder.build();
     ImmutableList<ComparableEntity> originalEntities = originalEntitiesBuilder.build();

-    LevelDbLogReader reader = new LevelDbLogReader();
-    reader.readFrom(new FileInputStream(logFile));
-
-    ImmutableList<byte[]> records = reader.getRecords();
+    ImmutableList<byte[]> records = ImmutableList.copyOf(LevelDbLogReader.from(logFile.getPath()));
     assertThat(records).hasSize(1000);
     int index = 0;
     for (byte[] record : records) {
diff --git a/core/src/test/java/google/registry/tools/LevelDbLogReaderTest.java b/core/src/test/java/google/registry/tools/LevelDbLogReaderTest.java
index ca6daaf7d..1be5cb562 100644
--- a/core/src/test/java/google/registry/tools/LevelDbLogReaderTest.java
+++ b/core/src/test/java/google/registry/tools/LevelDbLogReaderTest.java
@@ -17,6 +17,7 @@ package google.registry.tools;
 import static com.google.common.truth.Truth.assertThat;
 import static google.registry.tools.LevelDbUtil.MAX_RECORD;
 import static google.registry.tools.LevelDbUtil.addRecord;
+import static org.junit.jupiter.api.Assertions.assertThrows;

 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
@@ -24,12 +25,9 @@ import google.registry.tools.LevelDbLogReader.ChunkType;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.List;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.jupiter.api.Test;

-/** LevelDbLogReader tests. */
-@RunWith(JUnit4.class)
+/** Unit tests of {@link LevelDbLogReader}. */
 public final class LevelDbLogReaderTest {

   // Size of the test record. Any value < 256 will do.
@@ -54,28 +52,23 @@ public final class LevelDbLogReaderTest {
   @Test
   public void testSimpleBlock() throws IOException {
     TestBlock block = makeBlockOfRepeatingBytes(0);
-    LevelDbLogReader reader = new LevelDbLogReader();
-    reader.readFrom(new ByteArrayInputStream(block.data));
-    ImmutableList<byte[]> records = reader.getRecords();
-    assertThat(records).hasSize(block.recordCount);
+    assertThat(readIncrementally(block.data)).hasSize(block.recordCount);
   }

   @Test
   public void testLargeRecord() throws IOException {
-    byte[] block = new byte[LevelDbLogReader.BLOCK_SIZE];
-    addRecord(block, 0, ChunkType.FIRST, MAX_RECORD, (byte) 1);
-    LevelDbLogReader reader = new LevelDbLogReader();
-    reader.readFrom(new ByteArrayInputStream(block));
-    assertThat(reader.getRecords()).isEmpty();
+    byte[] block0 = new byte[LevelDbLogReader.BLOCK_SIZE];
+    addRecord(block0, 0, ChunkType.FIRST, MAX_RECORD, (byte) 1);
+    assertThat(readIncrementally(block0)).isEmpty();

-    addRecord(block, 0, ChunkType.MIDDLE, MAX_RECORD, (byte) 2);
-    reader.readFrom(new ByteArrayInputStream(block));
-    assertThat(reader.getRecords()).isEmpty();
+    byte[] block1 = new byte[LevelDbLogReader.BLOCK_SIZE];
+    addRecord(block1, 0, ChunkType.MIDDLE, MAX_RECORD, (byte) 2);
+    assertThat(readIncrementally(block0, block1)).isEmpty();

-    addRecord(block, 0, ChunkType.LAST, MAX_RECORD, (byte) 3);
-    reader.readFrom(new ByteArrayInputStream(block));
+    byte[] block2 = new byte[LevelDbLogReader.BLOCK_SIZE];
+    addRecord(block2, 0, ChunkType.LAST, MAX_RECORD, (byte) 3);

-    List<byte[]> records = reader.getRecords();
+    List<byte[]> records = readIncrementally(block0, block1, block2);
     assertThat(records).hasSize(1);

     byte[] record = records.get(0);
@@ -95,11 +88,24 @@ public final class LevelDbLogReaderTest {
   public void readFromMultiBlockStream() throws IOException {
     TestBlock block0 = makeBlockOfRepeatingBytes(0);
     TestBlock block1 = makeBlockOfRepeatingBytes(138);
-    ByteArrayInputStream source = new ByteArrayInputStream(Bytes.concat(block0.data, block1.data));
+    assertThat(readIncrementally(block0.data, block1.data))
+        .hasSize(block0.recordCount + block1.recordCount);
+  }

-    LevelDbLogReader reader = new LevelDbLogReader();
-    reader.readFrom(source);
-    assertThat(reader.getRecords()).hasSize(block0.recordCount + block1.recordCount);
+  @Test
+  void read_noData() {
+    assertThat(readIncrementally(new byte[0])).isEmpty();
+  }
+
+  @Test
+  void read_failBadFirstBlock() {
+    assertThrows(IllegalStateException.class, () -> readIncrementally(new byte[1]));
+  }
+
+  @Test
+  void read_failBadTrailingBlock() {
+    TestBlock block = makeBlockOfRepeatingBytes(0);
+    assertThrows(IllegalStateException.class, () -> readIncrementally(block.data, new byte[2]));
   }

   @Test
@@ -112,6 +118,13 @@ public final class LevelDbLogReaderTest {
     assertThat(ChunkType.fromCode(ChunkType.LAST.getCode())).isEqualTo(ChunkType.LAST);
   }

+  @SafeVarargs
+  private static ImmutableList<byte[]> readIncrementally(byte[]... blocks) {
+    LevelDbLogReader recordReader =
+        LevelDbLogReader.from(new ByteArrayInputStream(Bytes.concat(blocks)));
+    return ImmutableList.copyOf(recordReader);
+  }
+
   /** Aggregates the bytes of a test block with the record count. */
   private static final class TestBlock {
     final byte[] data;
diff --git a/core/src/test/java/google/registry/tools/RecordAccumulatorTest.java b/core/src/test/java/google/registry/tools/RecordAccumulatorTest.java
index 9073176eb..853807029 100644
--- a/core/src/test/java/google/registry/tools/RecordAccumulatorTest.java
+++ b/core/src/test/java/google/registry/tools/RecordAccumulatorTest.java
@@ -76,7 +76,7 @@ public class RecordAccumulatorTest {
     builder.build();

     ImmutableSet<ComparableEntity> entities =
-        new RecordAccumulator().readDirectory(subdir, any -> true).getComparableEntitySet();
+        RecordAccumulator.readDirectory(subdir, any -> true).getComparableEntitySet();
     assertThat(entities).containsExactly(e1, e2, e3);
   }
 }
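
A closing note on the Dataflow motivation in the commit message, again an illustration rather than part of this change: because the reader is now an Iterator, a pipeline worker can emit records as it reads them instead of holding a whole file's worth of records in memory. A rough sketch assuming Apache Beam; ReadBackupFileFn and the use of file names as input elements are assumptions, not code from this patch:

    import google.registry.tools.LevelDbLogReader;
    import java.io.IOException;
    import org.apache.beam.sdk.transforms.DoFn;

    // Expands each LevelDb file name into its individual records.
    class ReadBackupFileFn extends DoFn<String, byte[]> {
      @ProcessElement
      public void processElement(@Element String filename, OutputReceiver<byte[]> out)
          throws IOException {
        LevelDbLogReader reader = LevelDbLogReader.from(filename);
        while (reader.hasNext()) {
          out.output(reader.next()); // records stream out one at a time; no per-file buffering
        }
      }
    }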