Read LevelDb incrementally (#593)

* Read LevelDb incrementally

Made LevelDbLogReader an iterator over a LevelDb data stream,
reducing the memory footprint, which is important when it is used in a
Dataflow pipeline.
This commit is contained in:
Weimin Yu 2020-05-20 10:26:34 -04:00 committed by GitHub
parent 579a3d0ac1
commit 3947ac6ef7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 134 additions and 80 deletions

View file

@@ -40,12 +40,10 @@ class CompareDbBackups {
} }
ImmutableSet<ComparableEntity> entities1 = ImmutableSet<ComparableEntity> entities1 =
new RecordAccumulator() RecordAccumulator.readDirectory(new File(args[0]), DATA_FILE_MATCHER)
.readDirectory(new File(args[0]), DATA_FILE_MATCHER)
.getComparableEntitySet(); .getComparableEntitySet();
ImmutableSet<ComparableEntity> entities2 = ImmutableSet<ComparableEntity> entities2 =
new RecordAccumulator() RecordAccumulator.readDirectory(new File(args[1]), DATA_FILE_MATCHER)
.readDirectory(new File(args[1]), DATA_FILE_MATCHER)
.getComparableEntitySet(); .getComparableEntitySet();
// Calculate the entities added and removed. // Calculate the entities added and removed.

View file

@@ -14,17 +14,25 @@
package google.registry.tools; package google.registry.tools;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.FileSystems; import java.nio.file.FileSystems;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Optional;
/** /**
* Reads records from a set of LevelDB files and builds a gigantic ImmutableList from them. * Iterator that incrementally parses binary data in LevelDb format into records.
* *
* <p>See <a * <p>See <a
* href="https://github.com/google/leveldb/blob/master/doc/log_format.md">log_format.md</a> for the * href="https://github.com/google/leveldb/blob/master/doc/log_format.md">log_format.md</a> for the
@@ -32,19 +40,72 @@ import java.nio.file.Path;
* *
* <p>There are several other implementations of this, none of which appeared suitable for our use * <p>There are several other implementations of this, none of which appeared suitable for our use
* case: <a href="https://github.com/google/leveldb">The original C++ implementation</a>. <a * case: <a href="https://github.com/google/leveldb">The original C++ implementation</a>. <a
* href="https://cloud.google.com/appengine/docs/standard/java/javadoc/com/google/appengine/api/files/RecordWriteChannel"> * href="https://cloud.google.com/appengine/docs/standard/java/javadoc/com/google/appengine/api/files/RecordReadChannel">
* com.google.appengine.api.files.RecordWriteChannel</a> - Exactly what we need but deprecated. The * com.google.appengine.api.files.RecordReadChannel</a> - Exactly what we need but deprecated. The
* referenced replacement: <a * referenced replacement: <a
* href="https://github.com/GoogleCloudPlatform/appengine-gcs-client.git">The App Engine GCS * href="https://github.com/GoogleCloudPlatform/appengine-gcs-client.git">The App Engine GCS
* Client</a> - Does not appear to have any support for working with LevelDB. * Client</a> - Does not appear to have any support for working with LevelDB.
*/ */
public final class LevelDbLogReader { public final class LevelDbLogReader implements Iterator<byte[]> {
@VisibleForTesting static final int BLOCK_SIZE = 32 * 1024; @VisibleForTesting static final int BLOCK_SIZE = 32 * 1024;
@VisibleForTesting static final int HEADER_SIZE = 7; @VisibleForTesting static final int HEADER_SIZE = 7;
private final ByteArrayOutputStream recordContents = new ByteArrayOutputStream(); private final ByteArrayOutputStream recordContents = new ByteArrayOutputStream();
private final ImmutableList.Builder<byte[]> recordListBuilder = new ImmutableList.Builder<>(); private final LinkedList<byte[]> recordList = Lists.newLinkedList();
private final ByteBuffer byteBuffer = ByteBuffer.allocate(BLOCK_SIZE);
private final ReadableByteChannel channel;
/**
 * Creates a reader over the given channel. Callers are expected to use the static {@code from}
 * factory methods rather than invoking this constructor directly.
 *
 * @param channel source of LevelDb log-format data; NOTE(review): this class never closes the
 *     channel, so ownership appears to stay with the caller — confirm at call sites
 */
LevelDbLogReader(ReadableByteChannel channel) {
this.channel = channel;
}
/**
 * Returns true if at least one more record is available.
 *
 * <p>Reads and parses whole {@link #BLOCK_SIZE} blocks from the channel until a complete record
 * is buffered in {@code recordList} or the stream is exhausted.
 *
 * @throws IllegalStateException if the stream ends with a partial block, i.e. the input size is
 *     not a multiple of {@link #BLOCK_SIZE}
 * @throws UncheckedIOException if reading from the underlying channel fails
 */
@Override
public boolean hasNext() {
  while (recordList.isEmpty()) {
    try {
      Optional<byte[]> block = readFromChannel();
      if (!block.isPresent()) {
        // End of stream and no complete record pending.
        return false;
      }
      if (block.get().length != BLOCK_SIZE) {
        throw new IllegalStateException("Data size is not multiple of " + BLOCK_SIZE);
      }
      processBlock(block.get());
    } catch (IOException e) {
      // Iterator methods cannot throw checked exceptions. UncheckedIOException is the
      // idiomatic wrapper (it extends RuntimeException, so existing callers still work)
      // and preserves the cause for callers that need the original IOException.
      throw new UncheckedIOException(e);
    }
  }
  return true;
}
/**
 * Returns the next record in the stream.
 *
 * @throws NoSuchElementException if no more records remain — the exception type mandated by the
 *     {@link Iterator#next()} contract (the previous {@code checkState} call threw
 *     {@code IllegalStateException}, which violates it)
 */
@Override
public byte[] next() {
  if (!hasNext()) {
    throw new NoSuchElementException("The next() method called on empty iterator.");
  }
  return recordList.removeFirst();
}
/**
 * Returns the next {@link #BLOCK_SIZE} bytes from the input channel, or {@link
 * Optional#empty()} if there is no more data.
 *
 * <p>A trailing partial block (stream size not a multiple of {@link #BLOCK_SIZE}) is returned
 * as-is; the caller ({@code hasNext}) rejects it with {@code IllegalStateException}.
 */
// TODO(weiminyu): use ByteBuffer directly.
private Optional<byte[]> readFromChannel() throws IOException {
while (true) {
// Keep reading until the buffer holds a whole block or the channel signals EOF (read < 0).
int bytesRead = channel.read(byteBuffer);
if (!byteBuffer.hasRemaining() || bytesRead < 0) {
byteBuffer.flip();
// After flip(), hasRemaining() is false only if nothing at all was buffered,
// i.e. the channel was already at EOF.
if (!byteBuffer.hasRemaining()) {
return Optional.empty();
}
// Copy the buffered bytes out and reset the buffer for the next block.
byte[] result = new byte[byteBuffer.remaining()];
byteBuffer.get(result);
byteBuffer.clear();
return Optional.of(result);
}
}
}
/** Read a complete block, which must be exactly 32 KB. */ /** Read a complete block, which must be exactly 32 KB. */
private void processBlock(byte[] block) { private void processBlock(byte[] block) {
@@ -63,7 +124,7 @@ public final class LevelDbLogReader {
// If this is the last (or only) chunk in the record, store the full contents into the List. // If this is the last (or only) chunk in the record, store the full contents into the List.
if (recordHeader.type == ChunkType.FULL || recordHeader.type == ChunkType.LAST) { if (recordHeader.type == ChunkType.FULL || recordHeader.type == ChunkType.LAST) {
recordListBuilder.add(recordContents.toByteArray()); recordList.add(recordContents.toByteArray());
recordContents.reset(); recordContents.reset();
} }
@@ -96,40 +157,24 @@
return new RecordHeader(checksum, size, ChunkType.fromCode(type)); return new RecordHeader(checksum, size, ChunkType.fromCode(type));
} }
/** Reads all records in the Reader into the record set. */ /** Returns a {@link LevelDbLogReader} over a {@link ReadableByteChannel}. */
public void readFrom(InputStream source) throws IOException { public static LevelDbLogReader from(ReadableByteChannel channel) {
byte[] block = new byte[BLOCK_SIZE]; return new LevelDbLogReader(channel);
// read until we have no more.
while (true) {
int amountRead = source.read(block, 0, BLOCK_SIZE);
if (amountRead <= 0) {
break;
}
assert amountRead == BLOCK_SIZE;
processBlock(block);
}
} }
/** Reads all records from the file specified by "path" into the record set. */ /** Returns a {@link LevelDbLogReader} over an {@link InputStream}. */
public void readFrom(Path path) throws IOException { public static LevelDbLogReader from(InputStream source) {
readFrom(Files.newInputStream(path)); return new LevelDbLogReader(Channels.newChannel(source));
} }
/** Reads all records from the specified file into the record set. */ /** Returns a {@link LevelDbLogReader} over a file specified by {@link Path}. */
public void readFrom(String filename) throws IOException { public static LevelDbLogReader from(Path path) throws IOException {
readFrom(FileSystems.getDefault().getPath(filename)); return from(Files.newInputStream(path));
} }
/** /** Returns a {@link LevelDbLogReader} over a file specified by {@code filename}. */
* Gets the list of records constructed so far. public static LevelDbLogReader from(String filename) throws IOException {
* return from(FileSystems.getDefault().getPath(filename));
* <p>Note that this does not invalidate the internal state of the object: we return a copy and
* this can be called multiple times.
*/
ImmutableList<byte[]> getRecords() {
return recordListBuilder.build();
} }
/** Aggregates the fields in a record header. */ /** Aggregates the fields in a record header. */

View file

@@ -15,38 +15,43 @@
package google.registry.tools; package google.registry.tools;
import com.google.appengine.api.datastore.EntityTranslator; import com.google.appengine.api.datastore.EntityTranslator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.storage.onestore.v3.OnestoreEntity.EntityProto; import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.function.Predicate; import java.util.function.Predicate;
/** Accumulates Entity records from level db files under a directory hierarchy. */ /** Accumulates Entity records from level db files under a directory hierarchy. */
class RecordAccumulator { class RecordAccumulator {
private final LevelDbLogReader reader = new LevelDbLogReader(); private final ImmutableList<byte[]> records;
RecordAccumulator(ImmutableList<byte[]> records) {
this.records = records;
}
/** Recursively reads all records in the directory. */ /** Recursively reads all records in the directory. */
public final RecordAccumulator readDirectory(File dir, Predicate<File> fileMatcher) { public static RecordAccumulator readDirectory(File dir, Predicate<File> fileMatcher) {
ImmutableList.Builder<byte[]> builder = new ImmutableList.Builder<>();
for (File child : dir.listFiles()) { for (File child : dir.listFiles()) {
if (child.isDirectory()) { if (child.isDirectory()) {
readDirectory(child, fileMatcher); builder.addAll(readDirectory(child, fileMatcher).records);
} else if (fileMatcher.test(child)) { } else if (fileMatcher.test(child)) {
try { try {
reader.readFrom(new FileInputStream(child)); builder.addAll(LevelDbLogReader.from(child.getPath()));
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("IOException reading from file: " + child, e); throw new RuntimeException("IOException reading from file: " + child, e);
} }
} }
} }
return this; return new RecordAccumulator(builder.build());
} }
/** Creates an entity set from the current set of raw records. */ /** Creates an entity set from the current set of raw records. */
ImmutableSet<ComparableEntity> getComparableEntitySet() { ImmutableSet<ComparableEntity> getComparableEntitySet() {
ImmutableSet.Builder<ComparableEntity> builder = new ImmutableSet.Builder<>(); ImmutableSet.Builder<ComparableEntity> builder = new ImmutableSet.Builder<>();
for (byte[] rawRecord : reader.getRecords()) { for (byte[] rawRecord : records) {
// Parse the entity proto and create an Entity object from it. // Parse the entity proto and create an Entity object from it.
EntityProto proto = new EntityProto(); EntityProto proto = new EntityProto();
proto.parseFrom(rawRecord); proto.parseFrom(rawRecord);

View file

@@ -23,7 +23,6 @@ import com.google.storage.onestore.v3.OnestoreEntity.EntityProto;
import google.registry.testing.AppEngineRule; import google.registry.testing.AppEngineRule;
import google.registry.tools.LevelDbFileBuilder.Property; import google.registry.tools.LevelDbFileBuilder.Property;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@@ -51,10 +50,7 @@ public class LevelDbFileBuilderTest {
BASE_ID, Property.create("first", 100L), Property.create("second", 200L)); BASE_ID, Property.create("first", 100L), Property.create("second", 200L));
builder.build(); builder.build();
LevelDbLogReader reader = new LevelDbLogReader(); ImmutableList<byte[]> records = ImmutableList.copyOf(LevelDbLogReader.from(logFile.getPath()));
reader.readFrom(new FileInputStream(logFile));
ImmutableList<byte[]> records = reader.getRecords();
assertThat(records).hasSize(1); assertThat(records).hasSize(1);
// Reconstitute an entity, make sure that what we've got is the same as what we started with. // Reconstitute an entity, make sure that what we've got is the same as what we started with.
@@ -82,10 +78,7 @@
builder.build(); builder.build();
ImmutableList<ComparableEntity> originalEntities = originalEntitiesBuilder.build(); ImmutableList<ComparableEntity> originalEntities = originalEntitiesBuilder.build();
LevelDbLogReader reader = new LevelDbLogReader(); ImmutableList<byte[]> records = ImmutableList.copyOf(LevelDbLogReader.from(logFile.getPath()));
reader.readFrom(new FileInputStream(logFile));
ImmutableList<byte[]> records = reader.getRecords();
assertThat(records).hasSize(1000); assertThat(records).hasSize(1000);
int index = 0; int index = 0;
for (byte[] record : records) { for (byte[] record : records) {

View file

@@ -17,6 +17,7 @@ package google.registry.tools;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static google.registry.tools.LevelDbUtil.MAX_RECORD; import static google.registry.tools.LevelDbUtil.MAX_RECORD;
import static google.registry.tools.LevelDbUtil.addRecord; import static google.registry.tools.LevelDbUtil.addRecord;
import static org.junit.jupiter.api.Assertions.assertThrows;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Bytes; import com.google.common.primitives.Bytes;
@@ -24,12 +25,9 @@ import google.registry.tools.LevelDbLogReader.ChunkType;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.junit.Test; import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** LevelDbLogReader tests. */ /** Unit tests of {@link LevelDbLogReader}. */
@RunWith(JUnit4.class)
public final class LevelDbLogReaderTest { public final class LevelDbLogReaderTest {
// Size of the test record. Any value < 256 will do. // Size of the test record. Any value < 256 will do.
@@ -54,28 +52,23 @@ public final class LevelDbLogReaderTest {
@Test @Test
public void testSimpleBlock() throws IOException { public void testSimpleBlock() throws IOException {
TestBlock block = makeBlockOfRepeatingBytes(0); TestBlock block = makeBlockOfRepeatingBytes(0);
LevelDbLogReader reader = new LevelDbLogReader(); assertThat(readIncrementally(block.data)).hasSize(block.recordCount);
reader.readFrom(new ByteArrayInputStream(block.data));
ImmutableList<byte[]> records = reader.getRecords();
assertThat(records).hasSize(block.recordCount);
} }
@Test @Test
public void testLargeRecord() throws IOException { public void testLargeRecord() throws IOException {
byte[] block = new byte[LevelDbLogReader.BLOCK_SIZE]; byte[] block0 = new byte[LevelDbLogReader.BLOCK_SIZE];
addRecord(block, 0, ChunkType.FIRST, MAX_RECORD, (byte) 1); addRecord(block0, 0, ChunkType.FIRST, MAX_RECORD, (byte) 1);
LevelDbLogReader reader = new LevelDbLogReader(); assertThat(readIncrementally(block0)).isEmpty();
reader.readFrom(new ByteArrayInputStream(block));
assertThat(reader.getRecords()).isEmpty();
addRecord(block, 0, ChunkType.MIDDLE, MAX_RECORD, (byte) 2); byte[] block1 = new byte[LevelDbLogReader.BLOCK_SIZE];
reader.readFrom(new ByteArrayInputStream(block)); addRecord(block1, 0, ChunkType.MIDDLE, MAX_RECORD, (byte) 2);
assertThat(reader.getRecords()).isEmpty(); assertThat(readIncrementally(block0, block1)).isEmpty();
addRecord(block, 0, ChunkType.LAST, MAX_RECORD, (byte) 3); byte[] block2 = new byte[LevelDbLogReader.BLOCK_SIZE];
reader.readFrom(new ByteArrayInputStream(block)); addRecord(block2, 0, ChunkType.LAST, MAX_RECORD, (byte) 3);
List<byte[]> records = reader.getRecords(); List<byte[]> records = readIncrementally(block0, block1, block2);
assertThat(records).hasSize(1); assertThat(records).hasSize(1);
byte[] record = records.get(0); byte[] record = records.get(0);
@@ -95,11 +88,24 @@ public final class LevelDbLogReaderTest {
public void readFromMultiBlockStream() throws IOException { public void readFromMultiBlockStream() throws IOException {
TestBlock block0 = makeBlockOfRepeatingBytes(0); TestBlock block0 = makeBlockOfRepeatingBytes(0);
TestBlock block1 = makeBlockOfRepeatingBytes(138); TestBlock block1 = makeBlockOfRepeatingBytes(138);
ByteArrayInputStream source = new ByteArrayInputStream(Bytes.concat(block0.data, block1.data)); assertThat(readIncrementally(block0.data, block1.data))
.hasSize(block0.recordCount + block1.recordCount);
}
LevelDbLogReader reader = new LevelDbLogReader(); @Test
reader.readFrom(source); void read_noData() {
assertThat(reader.getRecords()).hasSize(block0.recordCount + block1.recordCount); assertThat(readIncrementally(new byte[0])).isEmpty();
}
@Test
void read_failBadFirstBlock() {
assertThrows(IllegalStateException.class, () -> readIncrementally(new byte[1]));
}
@Test
void read_failBadTrailingBlock() {
TestBlock block = makeBlockOfRepeatingBytes(0);
assertThrows(IllegalStateException.class, () -> readIncrementally(block.data, new byte[2]));
} }
@Test @Test
@@ -112,6 +118,13 @@
assertThat(ChunkType.fromCode(ChunkType.LAST.getCode())).isEqualTo(ChunkType.LAST); assertThat(ChunkType.fromCode(ChunkType.LAST.getCode())).isEqualTo(ChunkType.LAST);
} }
// Drains a LevelDbLogReader over the concatenation of the given blocks into a list,
// exercising the new Iterator-based incremental read path end to end.
// NOTE(review): @SafeVarargs is redundant here — byte[] is a reifiable element type.
@SafeVarargs
private static ImmutableList<byte[]> readIncrementally(byte[]... blocks) {
LevelDbLogReader recordReader =
LevelDbLogReader.from(new ByteArrayInputStream(Bytes.concat(blocks)));
return ImmutableList.copyOf(recordReader);
}
/** Aggregates the bytes of a test block with the record count. */ /** Aggregates the bytes of a test block with the record count. */
private static final class TestBlock { private static final class TestBlock {
final byte[] data; final byte[] data;

View file

@@ -76,7 +76,7 @@ public class RecordAccumulatorTest {
builder.build(); builder.build();
ImmutableSet<ComparableEntity> entities = ImmutableSet<ComparableEntity> entities =
new RecordAccumulator().readDirectory(subdir, any -> true).getComparableEntitySet(); RecordAccumulator.readDirectory(subdir, any -> true).getComparableEntitySet();
assertThat(entities).containsExactly(e1, e2, e3); assertThat(entities).containsExactly(e1, e2, e3);
} }
} }