From 8c0ff8fdadb7819b99b17b72be14d256782d2a09 Mon Sep 17 00:00:00 2001 From: Tal Aloni Date: Thu, 18 May 2017 22:47:10 +0300 Subject: [PATCH] Improved read-ahead mechanism --- SMBLibrary/Utilities/PrefetchedStream.cs | 153 +++++++++++++++-------- 1 file changed, 102 insertions(+), 51 deletions(-) diff --git a/SMBLibrary/Utilities/PrefetchedStream.cs b/SMBLibrary/Utilities/PrefetchedStream.cs index f5b842e..f03e381 100644 --- a/SMBLibrary/Utilities/PrefetchedStream.cs +++ b/SMBLibrary/Utilities/PrefetchedStream.cs @@ -1,4 +1,4 @@ -/* Copyright (C) 2016 Tal Aloni . All rights reserved. +/* Copyright (C) 2016-2017 Tal Aloni . All rights reserved. * * You can redistribute this program and/or modify it under the terms of * the GNU Lesser Public License as published by the Free Software Foundation, @@ -14,81 +14,114 @@ namespace Utilities { public class PrefetchedStream : Stream { - public const int CacheSize = 1048576; // 1 MB + public const int CacheSize = 524288; // 512 KB + public const int ReadAheadThershold = 65536; // 64 KB private long m_cacheOffset; private byte[] m_cache = new byte[0]; private Stream m_stream; - private object m_syncLock = new object(); public PrefetchedStream(Stream stream) { m_stream = stream; if (m_stream.CanRead) { - new Thread(delegate() + ScheduleReadAhead(); + } + } + + private void ScheduleReadAhead() + { + new Thread(delegate() + { + ReadAhead(); + }).Start(); + } + + private void ReadAhead() + { + lock (m_stream) + { + long position = this.Position; + bool isInCache = (position >= m_cacheOffset) && (position < m_cacheOffset + m_cache.Length); + int bytesAlreadyRead; + if (isInCache) { - lock (m_syncLock) - { - m_cacheOffset = 0; - m_cache = new byte[CacheSize]; - int bytesRead = m_stream.Read(m_cache, 0, CacheSize); - System.Diagnostics.Debug.Print("[{0}] {1} bytes have been prefetched.", DateTime.Now.ToString("HH:mm:ss:ffff"), bytesRead); - this.Position = 0; - if (bytesRead < CacheSize) - { - // EOF, we must trim the response data array - m_cache = ByteReader.ReadBytes(m_cache, 0, bytesRead); - } - } - }).Start(); + int offsetInCache = (int)(position - m_cacheOffset); + bytesAlreadyRead = m_cache.Length - offsetInCache; + byte[] oldCache = m_cache; + m_cache = new byte[CacheSize]; + Array.Copy(oldCache, offsetInCache, m_cache, 0, bytesAlreadyRead); + this.Position = position + bytesAlreadyRead; + } + else + { + bytesAlreadyRead = 0; + m_cache = new byte[CacheSize]; + } + m_cacheOffset = position; + int bytesRead = m_stream.Read(m_cache, bytesAlreadyRead, CacheSize - bytesAlreadyRead); + System.Diagnostics.Debug.Print("[{0}] {1} bytes have been read ahead from offset {2}.", DateTime.Now.ToString("HH:mm:ss:ffff"), bytesRead, position); + if (bytesAlreadyRead + bytesRead < CacheSize) + { + // EOF, we must trim the response data array + m_cache = ByteReader.ReadBytes(m_cache, 0, bytesAlreadyRead + bytesRead); + } + this.Position = position; } } public override int Read(byte[] buffer, int offset, int count) { - long position; - lock (m_syncLock) + int bytesCopied; + lock (m_stream) { - position = this.Position; - bool isInCache = (position >= m_cacheOffset) && (position + count <= m_cacheOffset + m_cache.Length); - if (!isInCache) + long position = this.Position; + bool isInCache = (position >= m_cacheOffset) && (position < m_cacheOffset + m_cache.Length); + if (isInCache) { - m_cacheOffset = position; - int cacheSize = Math.Max(CacheSize, count); - m_cache = new byte[cacheSize]; - int bytesRead = m_stream.Read(m_cache, 0, cacheSize); - - if (bytesRead < cacheSize) + int offsetInCache = (int)(position - m_cacheOffset); + int bytesAvailableInCache = m_cache.Length - offsetInCache; + bytesCopied = Math.Min(count, bytesAvailableInCache); + Array.Copy(m_cache, offsetInCache, buffer, offset, bytesCopied); + this.Position = position + bytesCopied; + + if (bytesCopied < count) { - // EOF, we must trim the response data array - m_cache = ByteReader.ReadBytes(m_cache, 0, bytesRead); + int bytesMissing = count - bytesCopied; + int bytesRead = m_stream.Read(buffer, offset + bytesCopied, bytesMissing); + } + + if (offsetInCache + ReadAheadThershold >= m_cache.Length) + { + ScheduleReadAhead(); } } + else + { + bytesCopied = m_stream.Read(buffer, 0, count); + ScheduleReadAhead(); + } } - - int offsetInCache = (int)(position - m_cacheOffset); - int bytesRemained = m_cache.Length - offsetInCache; - int dataLength = Math.Min(count, bytesRemained); - - Array.Copy(m_cache, offsetInCache, buffer, offset, dataLength); - lock (m_syncLock) - { - this.Position = position + dataLength; - } - return dataLength; + return bytesCopied; } public override void Write(byte[] buffer, int offset, int count) { - m_cache = new byte[0]; - m_stream.Write(buffer, offset, count); + lock (m_stream) + { + m_cache = new byte[0]; + m_stream.Write(buffer, offset, count); + } } public override void Close() { - m_stream.Close(); + lock (m_stream) + { + m_stream.Close(); + } base.Close(); } @@ -120,7 +153,10 @@ namespace Utilities { get { - return m_stream.Length; + lock (m_stream) + { + return m_stream.Length; + } } } @@ -128,27 +164,42 @@ namespace Utilities { get { - return m_stream.Position; + lock (m_stream) + { + return m_stream.Position; + } } set { - m_stream.Position = value; + lock (m_stream) + { + m_stream.Position = value; + } } } public override void Flush() { - m_stream.Flush(); + lock (m_stream) + { + m_stream.Flush(); + } } public override long Seek(long offset, SeekOrigin origin) { - return m_stream.Seek(offset, origin); + lock (m_stream) + { + return m_stream.Seek(offset, origin); + } } public override void SetLength(long value) { - m_stream.SetLength(value); + lock (m_stream) + { + m_stream.SetLength(value); + } } } }