* Additional helpers to MenuModule

* Gzip signature (.gz)
* Switch to sha-256 vs sha1 for internal file hashes
* Nearly complete callback / scan update support for scanFile()
* Fix data input issue after performing upload
* Support 'sz' recv (uploads)
Bryan Ashby 2017-01-21 22:09:29 -07:00
parent fb176d3ab3
commit 8d51c7d47c
11 changed files with 349 additions and 195 deletions
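A rough usage sketch of the new iterator-style scanFile() API introduced in this commit (the require path, file path, and area/storage tag values are assumptions; the step names and stepInfo fields are taken from the diff below):

    //  hypothetical caller -- assumes scanFile() is exported from this module
    const fileArea = require('./file_base_area.js');

    fileArea.scanFile(
        '/file_base/uploads/some_file.zip',                 //  placeholder path
        { areaTag : 'uploads', storageTag : 'uploads' },    //  placeholder tags
        (stepInfo, next) => {
            //  invoked once per scan step: 'start', 'hash_update', 'hash_finish',
            //  'archive_list_start', 'desc_files_start', 'desc_files_finish', ...
            if('hash_update' === stepInfo.step) {
                console.log(`hashed ${stepInfo.bytesProcessed} / ${stepInfo.byteSize} bytes of ${stepInfo.fileName}`);
            }
            return next(null);  //  pass an error here to abort the scan
        },
        (err, fileEntry, dupeEntries) => {
            if(err) {
                return console.error(err.message);
            }
            console.log(`sha256=${fileEntry.fileSha256} dupes=${dupeEntries.length}`);
        }
    );

The previous scanFile(filePath, options, cb) and scanFile(filePath, cb) call forms continue to work; in those cases the iterator is simply treated as null.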


@@ -160,14 +160,14 @@ function getFileEntryPath(fileEntry) {
}
}
function getExistingFileEntriesBySha1(sha1, cb) {
function getExistingFileEntriesBySha256(sha256, cb) {
const entries = [];
FileDb.each(
`SELECT file_id, area_tag
FROM file
WHERE file_sha1=?;`,
[ sha1 ],
WHERE file_sha256=?;`,
[ sha256 ],
(err, fileRow) => {
if(fileRow) {
entries.push({
@@ -237,14 +237,38 @@ function attemptSetEstimatedReleaseDate(fileEntry) {
}
}
function populateFileEntryWithArchive(fileEntry, filePath, archiveType, cb) {
const archiveUtil = ArchiveUtil.getInstance();
function populateFileEntryWithArchive(fileEntry, filePath, stepInfo, iterator, cb) {
const archiveUtil = ArchiveUtil.getInstance();
const archiveType = fileEntry.meta.archive_type; // we set this prior to calling populateFileEntryWithArchive()
async.waterfall(
[
function getArchiveFileList(callback) {
archiveUtil.listEntries(filePath, archiveType, (err, entries) => {
return callback(null, entries || []); // ignore any errors here
function getArchiveFileList(callback) {
stepInfo.step = 'archive_list_start';
iterator(err => {
if(err) {
return callback(err);
}
archiveUtil.listEntries(filePath, archiveType, (err, entries) => {
if(err) {
stepInfo.step = 'archive_list_failed';
} else {
stepInfo.step = 'archive_list_finish';
stepInfo.archiveEntries = entries || [];
}
iterator(iterErr => {
return callback( iterErr, entries || [] ); // ignore original |err| here
});
});
});
},
function processDescFilesStart(entries, callback) {
stepInfo.step = 'desc_files_start';
iterator(err => {
return callback(err, entries);
});
},
function extractDescFiles(entries, callback) {
@@ -320,7 +344,11 @@ function populateFileEntryWithArchive(fileEntry, filePath, archiveType, cb) {
function attemptReleaseYearEstimation(callback) {
attemptSetEstimatedReleaseDate(fileEntry);
return callback(null);
}
},
function processDescFilesFinish(callback) {
stepInfo.step = 'desc_files_finish';
return iterator(callback);
},
],
err => {
return cb(err);
@@ -328,7 +356,7 @@ function populateFileEntryWithArchive(fileEntry, filePath, archiveType, cb) {
);
}
function populateFileEntryNonArchive(fileEntry, filePath, archiveType, cb) {
function populateFileEntryNonArchive(fileEntry, filePath, stepInfo, iterator, cb) {
// :TODO: implement me!
return cb(null);
}
@@ -352,11 +380,17 @@ function updateFileEntry(fileEntry, filePath, cb) {
}
function scanFile(filePath, options, cb) {
if(_.isFunction(options) && !cb) {
cb = options;
options = {};
const HASH_NAMES = [ 'sha1', 'sha256', 'md5', 'crc32' ];
function scanFile(filePath, options, iterator, cb) {
if(3 === arguments.length && _.isFunction(iterator)) {
cb = iterator;
iterator = null;
} else if(2 === arguments.length && _.isFunction(options)) {
cb = options;
iterator = null;
options = {};
}
const fileEntry = new FileEntry({
@@ -367,42 +401,96 @@ function scanFile(filePath, options, cb) {
storageTag : options.storageTag,
});
const stepInfo = {
filePath : filePath,
fileName : paths.basename(filePath),
};
function callIter(next) {
if(iterator) {
return iterator(stepInfo, next);
} else {
return next(null);
}
}
function readErrorCallIter(origError, next) {
stepInfo.step = 'read_error';
stepInfo.error = origError.message;
callIter( () => {
return next(origError);
});
}
async.waterfall(
[
function startScan(callback) {
fs.stat(filePath, (err, stats) => {
if(err) {
return readErrorCallIter(err, callback);
}
stepInfo.step = 'start';
stepInfo.byteSize = fileEntry.meta.byte_size = stats.size;
return callIter(callback);
});
},
function processPhysicalFileGeneric(callback) {
let byteSize = 0;
const sha1 = crypto.createHash('sha1');
const sha256 = crypto.createHash('sha256');
const md5 = crypto.createHash('md5');
const crc32 = new CRC32();
stepInfo.bytesProcessed = 0;
const hashes = {
sha1 : crypto.createHash('sha1'),
sha256 : crypto.createHash('sha256'),
md5 : crypto.createHash('md5'),
crc32 : new CRC32(),
};
const stream = fs.createReadStream(filePath);
stream.on('data', data => {
byteSize += data.length;
stream.pause(); // until the iterator completes
sha1.update(data);
sha256.update(data);
md5.update(data);
crc32.update(data);
stepInfo.bytesProcessed += data.length;
stepInfo.step = 'hash_update';
callIter(err => {
if(err) {
stream.destroy(); // cancel read
return callback(err);
}
async.each( HASH_NAMES, (hashName, nextHash) => {
hashes[hashName].update(data);
return nextHash(null);
}, () => {
return stream.resume();
});
});
});
stream.on('end', () => {
fileEntry.meta.byte_size = byteSize;
fileEntry.meta.byte_size = stepInfo.bytesProcessed;
// sha-1 is in basic file entry
fileEntry.fileSha1 = sha1.digest('hex');
async.each(HASH_NAMES, (hashName, nextHash) => {
if('sha256' === hashName) {
stepInfo.sha256 = fileEntry.fileSha256 = hashes.sha256.digest('hex');
} else if('sha1' === hashName || 'md5' === hashName) {
stepInfo[hashName] = fileEntry.meta[`file_${hashName}`] = hashes[hashName].digest('hex');
} else if('crc32' === hashName) {
stepInfo.crc32 = fileEntry.meta.crc32 = hashes.crc32.finalize().toString(16);
}
// others are meta
fileEntry.meta.file_sha256 = sha256.digest('hex');
fileEntry.meta.file_md5 = md5.digest('hex');
fileEntry.meta.file_crc32 = crc32.finalize().toString(16);
return callback(null);
return nextHash(null);
}, () => {
stepInfo.step = 'hash_finish';
return callIter(callback);
});
});
stream.on('error', err => {
return callback(err);
return readErrorCallIter(err, callback);
});
},
function processPhysicalFileByType(callback) {
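The 'hash_update' handling above pauses the read stream until the caller's iterator completes, then resumes it; if the iterator reports an error the read is cancelled. The same pattern in isolation, as a minimal sketch (the function and argument names here are illustrative, not part of this module):

    const crypto = require('crypto');
    const fs = require('fs');

    function hashFileWithProgress(filePath, onProgress, cb) {
        const sha256 = crypto.createHash('sha256');
        let bytesProcessed = 0;

        const stream = fs.createReadStream(filePath);

        stream.on('data', data => {
            stream.pause();             //  hold further chunks until the progress callback finishes
            sha256.update(data);
            bytesProcessed += data.length;

            onProgress(bytesProcessed, err => {
                if(err) {
                    stream.destroy();   //  cancel the read
                    return cb(err);
                }
                return stream.resume();
            });
        });

        stream.on('end', () => {
            return cb(null, sha256.digest('hex'));
        });

        stream.on('error', err => {
            return cb(err);
        });
    }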
@@ -413,9 +501,9 @@ function scanFile(filePath, options, cb) {
// save this off
fileEntry.meta.archive_type = archiveType;
populateFileEntryWithArchive(fileEntry, filePath, archiveType, err => {
populateFileEntryWithArchive(fileEntry, filePath, stepInfo, callIter, err => {
if(err) {
populateFileEntryNonArchive(fileEntry, filePath, err => {
populateFileEntryNonArchive(fileEntry, filePath, stepInfo, callIter, err => {
// :TODO: log err
return callback(null); // ignore err
});
@@ -424,7 +512,7 @@ function scanFile(filePath, options, cb) {
}
});
} else {
populateFileEntryNonArchive(fileEntry, filePath, err => {
populateFileEntryNonArchive(fileEntry, filePath, stepInfo, callIter, err => {
// :TODO: log err
return callback(null); // ignore err
});
@@ -432,92 +520,21 @@ function scanFile(filePath, options, cb) {
});
},
function fetchExistingEntry(callback) {
getExistingFileEntriesBySha1(fileEntry.fileSha1, (err, existingEntries) => {
return callback(err, existingEntries);
getExistingFileEntriesBySha256(fileEntry.fileSha256, (err, dupeEntries) => {
return callback(err, dupeEntries);
});
}
],
(err, existingEntries) => {
(err, dupeEntries) => {
if(err) {
return cb(err);
}
return cb(null, fileEntry, existingEntries);
return cb(null, fileEntry, dupeEntries);
}
);
}
/*
function addOrUpdateFileEntry(areaInfo, storageLocation, fileName, options, cb) {
const fileEntry = new FileEntry({
areaTag : areaInfo.areaTag,
meta : options.meta,
hashTags : options.hashTags, // Set() or Array
fileName : fileName,
storageTag : storageLocation.storageTag,
});
const filePath = paths.join(storageLocation.dir, fileName);
async.waterfall(
[
function processPhysicalFile(callback) {
let byteSize = 0;
const sha1 = crypto.createHash('sha1');
const sha256 = crypto.createHash('sha256');
const md5 = crypto.createHash('md5');
const crc32 = new CRC32();
const stream = fs.createReadStream(filePath);
stream.on('data', data => {
byteSize += data.length;
sha1.update(data);
sha256.update(data);
md5.update(data);
crc32.update(data);
});
stream.on('end', () => {
fileEntry.meta.byte_size = byteSize;
// sha-1 is in basic file entry
fileEntry.fileSha1 = sha1.digest('hex');
// others are meta
fileEntry.meta.file_sha256 = sha256.digest('hex');
fileEntry.meta.file_md5 = md5.digest('hex');
fileEntry.meta.file_crc32 = crc32.finalize().toString(16);
return callback(null);
});
stream.on('error', err => {
return callback(err);
});
},
function fetchExistingEntry(callback) {
getExistingFileEntriesBySha1(fileEntry.fileSha1, (err, existingEntries) => {
return callback(err, existingEntries);
});
},
function addOrUpdate(existingEntries, callback) {
if(existingEntries.length > 0) {
} else {
return addNewFileEntry(fileEntry, filePath, callback);
}
},
],
err => {
return cb(err);
}
);
}
*/
function scanFileAreaForChanges(areaInfo, cb) {
const storageLocations = getAreaStorageLocations(areaInfo);
@@ -551,13 +568,13 @@ function scanFileAreaForChanges(areaInfo, cb) {
areaTag : areaInfo.areaTag,
storageTag : storageLoc.storageTag
},
(err, fileEntry, existingEntries) => {
(err, fileEntry, dupeEntries) => {
if(err) {
// :TODO: Log me!!!
return nextFile(null); // try next anyway
}
if(existingEntries.length > 0) {
if(dupeEntries.length > 0) {
// :TODO: Handle duplicates -- what to do here???
} else {
addNewFileEntry(fileEntry, fullPath, err => {