Improved read-ahead mechanism

This commit is contained in:
Tal Aloni 2017-05-18 22:47:10 +03:00
parent ae8a3d77d3
commit 8c0ff8fdad

View file

@ -1,4 +1,4 @@
/* Copyright (C) 2016 Tal Aloni <tal.aloni.il@gmail.com>. All rights reserved. /* Copyright (C) 2016-2017 Tal Aloni <tal.aloni.il@gmail.com>. All rights reserved.
* *
* You can redistribute this program and/or modify it under the terms of * You can redistribute this program and/or modify it under the terms of
* the GNU Lesser Public License as published by the Free Software Foundation, * the GNU Lesser Public License as published by the Free Software Foundation,
@ -14,81 +14,114 @@ namespace Utilities
{ {
public class PrefetchedStream : Stream public class PrefetchedStream : Stream
{ {
public const int CacheSize = 1048576; // 1 MB public const int CacheSize = 524288; // 512 KB
public const int ReadAheadThershold = 65536; // 64 KB
private long m_cacheOffset; private long m_cacheOffset;
private byte[] m_cache = new byte[0]; private byte[] m_cache = new byte[0];
private Stream m_stream; private Stream m_stream;
private object m_syncLock = new object();
public PrefetchedStream(Stream stream) public PrefetchedStream(Stream stream)
{ {
m_stream = stream; m_stream = stream;
if (m_stream.CanRead) if (m_stream.CanRead)
{
ScheduleReadAhead();
}
}
private void ScheduleReadAhead()
{ {
new Thread(delegate() new Thread(delegate()
{ {
lock (m_syncLock) ReadAhead();
}).Start();
}
private void ReadAhead()
{ {
m_cacheOffset = 0; lock (m_stream)
{
long position = this.Position;
bool isInCache = (position >= m_cacheOffset) && (position < m_cacheOffset + m_cache.Length);
int bytesAlreadyRead;
if (isInCache)
{
int offsetInCache = (int)(position - m_cacheOffset);
bytesAlreadyRead = m_cache.Length - offsetInCache;
byte[] oldCache = m_cache;
m_cache = new byte[CacheSize]; m_cache = new byte[CacheSize];
int bytesRead = m_stream.Read(m_cache, 0, CacheSize); Array.Copy(oldCache, offsetInCache, m_cache, 0, bytesAlreadyRead);
System.Diagnostics.Debug.Print("[{0}] {1} bytes have been prefetched.", DateTime.Now.ToString("HH:mm:ss:ffff"), bytesRead); this.Position = position + bytesAlreadyRead;
this.Position = 0; }
if (bytesRead < CacheSize) else
{
bytesAlreadyRead = 0;
m_cache = new byte[CacheSize];
}
m_cacheOffset = position;
int bytesRead = m_stream.Read(m_cache, bytesAlreadyRead, CacheSize - bytesAlreadyRead);
System.Diagnostics.Debug.Print("[{0}] {1} bytes have been read ahead from offset {2}.", DateTime.Now.ToString("HH:mm:ss:ffff"), bytesRead, position);
if (bytesAlreadyRead + bytesRead < CacheSize)
{ {
// EOF, we must trim the response data array // EOF, we must trim the response data array
m_cache = ByteReader.ReadBytes(m_cache, 0, bytesRead); m_cache = ByteReader.ReadBytes(m_cache, 0, bytesAlreadyRead + bytesRead);
} }
} this.Position = position;
}).Start();
} }
} }
public override int Read(byte[] buffer, int offset, int count) public override int Read(byte[] buffer, int offset, int count)
{ {
long position; int bytesCopied;
lock (m_syncLock) lock (m_stream)
{ {
position = this.Position; long position = this.Position;
bool isInCache = (position >= m_cacheOffset) && (position + count <= m_cacheOffset + m_cache.Length); bool isInCache = (position >= m_cacheOffset) && (position < m_cacheOffset + m_cache.Length);
if (!isInCache) if (isInCache)
{ {
m_cacheOffset = position;
int cacheSize = Math.Max(CacheSize, count);
m_cache = new byte[cacheSize];
int bytesRead = m_stream.Read(m_cache, 0, cacheSize);
if (bytesRead < cacheSize)
{
// EOF, we must trim the response data array
m_cache = ByteReader.ReadBytes(m_cache, 0, bytesRead);
}
}
}
int offsetInCache = (int)(position - m_cacheOffset); int offsetInCache = (int)(position - m_cacheOffset);
int bytesRemained = m_cache.Length - offsetInCache; int bytesAvailableInCache = m_cache.Length - offsetInCache;
int dataLength = Math.Min(count, bytesRemained); bytesCopied = Math.Min(count, bytesAvailableInCache);
Array.Copy(m_cache, offsetInCache, buffer, offset, bytesCopied);
this.Position = position + bytesCopied;
Array.Copy(m_cache, offsetInCache, buffer, offset, dataLength); if (bytesCopied < count)
lock (m_syncLock)
{ {
this.Position = position + dataLength; int bytesMissing = count - bytesCopied;
int bytesRead = m_stream.Read(buffer, offset + bytesCopied, bytesMissing);
} }
return dataLength;
if (offsetInCache + ReadAheadThershold >= m_cache.Length)
{
ScheduleReadAhead();
}
}
else
{
bytesCopied = m_stream.Read(buffer, 0, count);
ScheduleReadAhead();
}
}
return bytesCopied;
} }
public override void Write(byte[] buffer, int offset, int count) public override void Write(byte[] buffer, int offset, int count)
{
lock (m_stream)
{ {
m_cache = new byte[0]; m_cache = new byte[0];
m_stream.Write(buffer, offset, count); m_stream.Write(buffer, offset, count);
} }
}
public override void Close() public override void Close()
{
lock (m_stream)
{ {
m_stream.Close(); m_stream.Close();
}
base.Close(); base.Close();
} }
@ -119,36 +152,54 @@ namespace Utilities
public override long Length public override long Length
{ {
get get
{
lock (m_stream)
{ {
return m_stream.Length; return m_stream.Length;
} }
} }
}
public override long Position public override long Position
{ {
get get
{
lock (m_stream)
{ {
return m_stream.Position; return m_stream.Position;
} }
}
set set
{
lock (m_stream)
{ {
m_stream.Position = value; m_stream.Position = value;
} }
} }
}
public override void Flush() public override void Flush()
{
lock (m_stream)
{ {
m_stream.Flush(); m_stream.Flush();
} }
}
public override long Seek(long offset, SeekOrigin origin) public override long Seek(long offset, SeekOrigin origin)
{
lock (m_stream)
{ {
return m_stream.Seek(offset, origin); return m_stream.Seek(offset, origin);
} }
}
public override void SetLength(long value) public override void SetLength(long value)
{
lock (m_stream)
{ {
m_stream.SetLength(value); m_stream.SetLength(value);
} }
} }
} }
}