* WIP on upload scan/processing

* WIP on user add/edit data to uploads
* Add write access (upload) to area ACS
* Add upload collision handling
* Add upload stats
Bryan Ashby 2017-01-11 22:51:00 -07:00
parent 4c1c05e4da
commit e265e3cc97
11 changed files with 479 additions and 133 deletions
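A rough usage sketch of the new scanFile() contract introduced in this commit, as upload collision handling might consume it. The option names and the (err, fileEntry, existingEntries) callback shape come from the diff below; the wrapper function, path, tags, and duplicate policy are hypothetical, since the commit still leaves dupe handling as a :TODO:.

// Hypothetical caller sketch; scanFile(), the areaTag/storageTag options, and the
// (err, fileEntry, existingEntries) callback shape are taken from this commit.
function handleUpload(fullPath, cb) {
	scanFile(
		fullPath,
		{ areaTag : 'uploads', storageTag : 'uploads' },	//	hypothetical tags
		(err, fileEntry, existingEntries) => {
			if(err) {
				return cb(err);
			}
			if(existingEntries.length > 0) {
				//	a file with the same SHA-1 already exists -- collision/dupe
				return cb(new Error('Duplicate upload'));	//	placeholder policy; the commit marks this :TODO:
			}
			return addNewFileEntry(fileEntry, fullPath, cb);
		}
	);
}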

@@ -27,7 +27,7 @@ exports.getDefaultFileAreaTag = getDefaultFileAreaTag;
exports.getFileAreaByTag = getFileAreaByTag;
exports.getFileEntryPath = getFileEntryPath;
exports.changeFileAreaWithOptions = changeFileAreaWithOptions;
//exports.addOrUpdateFileEntry = addOrUpdateFileEntry;
exports.scanFile = scanFile;
exports.scanFileAreaForChanges = scanFileAreaForChanges;
const WellKnownAreaTags = exports.WellKnownAreaTags = {
@@ -43,16 +43,18 @@ function getAvailableFileAreas(client, options) {
options = options || { };
// perform ACS check per conf & omit internal if desired
return _.omit(Config.fileBase.areas, (area, areaTag) => {
if(!options.includeSystemInternal && isInternalArea(areaTag)) {
const allAreas = _.map(Config.fileBase.areas, (areaInfo, areaTag) => Object.assign(areaInfo, { areaTag : areaTag } ));
return _.omit(allAreas, areaInfo => {
if(!options.includeSystemInternal && isInternalArea(areaInfo.areaTag)) {
return true;
}
if(options.writeAcs && !client.acs.hasFileAreaWrite(area)) {
if(options.writeAcs && !client.acs.hasFileAreaWrite(areaInfo)) {
return true; // omit
}
return !client.acs.hasFileAreaRead(area);
return !client.acs.hasFileAreaRead(areaInfo);
});
}
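For context, a hedged example of how calling code might now request only the areas the current user can upload to. getAvailableFileAreas() and the writeAcs option come from the hunk above; the surrounding calling code is an illustrative assumption only.

// Illustrative only: collect the areas the client has write (upload) access to.
// getAvailableFileAreas() and options.writeAcs are from the hunk above.
const writableAreas	= getAvailableFileAreas(client, { writeAcs : true });
const writableTags	= _.map(writableAreas, areaInfo => areaInfo.areaTag);
// e.g. present writableTags to the user as valid upload destinations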
@@ -326,42 +328,16 @@ function populateFileEntryWithArchive(fileEntry, filePath, archiveType, cb) {
);
}
function populateFileEntry(fileEntry, filePath, archiveType, cb) {
function populateFileEntryNonArchive(fileEntry, filePath, archiveType, cb) {
// :TODO: implement me!
return cb(null);
}
function addNewFileEntry(fileEntry, filePath, cb) {
const archiveUtil = ArchiveUtil.getInstance();
// :TODO: Use detectTypeWithBuf() once avail - we *just* read some file data
async.series(
[
function populateInfo(callback) {
archiveUtil.detectType(filePath, (err, archiveType) => {
if(archiveType) {
// save this off
fileEntry.meta.archive_type = archiveType;
populateFileEntryWithArchive(fileEntry, filePath, archiveType, err => {
if(err) {
populateFileEntry(fileEntry, filePath, err => {
// :TODO: log err
return callback(null); // ignore err
});
} else {
return callback(null);
}
});
} else {
populateFileEntry(fileEntry, filePath, err => {
// :TODO: log err
return callback(null); // ignore err
});
}
});
},
function addNewDbRecord(callback) {
return fileEntry.persist(callback);
}
@@ -376,6 +352,102 @@ function updateFileEntry(fileEntry, filePath, cb) {
}
function scanFile(filePath, options, cb) {
if(_.isFunction(options) && !cb) {
cb = options;
options = {};
}
const fileEntry = new FileEntry({
areaTag : options.areaTag,
meta : options.meta,
hashTags : options.hashTags, // Set() or Array
fileName : paths.basename(filePath),
storageTag : options.storageTag,
});
async.waterfall(
[
function processPhysicalFileGeneric(callback) {
let byteSize = 0;
const sha1 = crypto.createHash('sha1');
const sha256 = crypto.createHash('sha256');
const md5 = crypto.createHash('md5');
const crc32 = new CRC32();
const stream = fs.createReadStream(filePath);
stream.on('data', data => {
byteSize += data.length;
sha1.update(data);
sha256.update(data);
md5.update(data);
crc32.update(data);
});
stream.on('end', () => {
fileEntry.meta.byte_size = byteSize;
// sha-1 is in basic file entry
fileEntry.fileSha1 = sha1.digest('hex');
// others are meta
fileEntry.meta.file_sha256 = sha256.digest('hex');
fileEntry.meta.file_md5 = md5.digest('hex');
fileEntry.meta.file_crc32 = crc32.finalize().toString(16);
return callback(null);
});
stream.on('error', err => {
return callback(err);
});
},
function processPhysicalFileByType(callback) {
const archiveUtil = ArchiveUtil.getInstance();
archiveUtil.detectType(filePath, (err, archiveType) => {
if(archiveType) {
// save this off
fileEntry.meta.archive_type = archiveType;
populateFileEntryWithArchive(fileEntry, filePath, archiveType, err => {
if(err) {
populateFileEntryNonArchive(fileEntry, filePath, err => {
// :TODO: log err
return callback(null); // ignore err
});
} else {
return callback(null);
}
});
} else {
populateFileEntryNonArchive(fileEntry, filePath, err => {
// :TODO: log err
return callback(null); // ignore err
});
}
});
},
function fetchExistingEntry(callback) {
getExistingFileEntriesBySha1(fileEntry.fileSha1, (err, existingEntries) => {
return callback(err, existingEntries);
});
}
],
(err, existingEntries) => {
if(err) {
return cb(err);
}
return cb(null, fileEntry, existingEntries);
}
);
}
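The processPhysicalFileGeneric step above computes all four digests in a single pass over one read stream. Below is a minimal standalone sketch of the same pattern using only Node's built-in crypto and fs modules; the CRC-32 digest is omitted here because it relies on the project's own CRC32 utility class.

// Single-pass hashing sketch (built-in modules only; project CRC32 util omitted).
const crypto	= require('crypto');
const fs		= require('fs');

function hashFile(filePath, cb) {
	const sha1		= crypto.createHash('sha1');
	const sha256	= crypto.createHash('sha256');
	const md5		= crypto.createHash('md5');
	let byteSize	= 0;

	const stream = fs.createReadStream(filePath);
	stream.on('data', data => {
		byteSize += data.length;
		sha1.update(data);
		sha256.update(data);
		md5.update(data);
	});
	stream.on('end', () => {
		return cb(null, {
			byteSize	: byteSize,
			sha1		: sha1.digest('hex'),
			sha256		: sha256.digest('hex'),
			md5			: md5.digest('hex'),
		});
	});
	stream.on('error', err => cb(err) );
}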
/*
function addOrUpdateFileEntry(areaInfo, storageLocation, fileName, options, cb) {
const fileEntry = new FileEntry({
@@ -444,6 +516,7 @@ function addOrUpdateFileEntry(areaInfo, storageLocation, fileName, options, cb)
}
);
}
*/
function scanFileAreaForChanges(areaInfo, cb) {
const storageLocations = getAreaStorageLocations(areaInfo);
@@ -472,9 +545,28 @@ function scanFileAreaForChanges(areaInfo, cb) {
return nextFile(null);
}
addOrUpdateFileEntry(areaInfo, storageLoc, fileName, { }, err => {
return nextFile(err);
});
scanFile(
fullPath,
{
areaTag : areaInfo.areaTag,
storageTag : storageLoc.storageTag
},
(err, fileEntry, existingEntries) => {
if(err) {
// :TODO: Log me!!!
return nextFile(null); // try next anyway
}
if(existingEntries.length > 0) {
// :TODO: Handle duplicates -- what to do here???
} else {
addNewFileEntry(fileEntry, fullPath, err => {
// pass along error; we failed to insert a record in our DB or something else bad
return nextFile(err);
});
}
}
);
});
}, err => {
return callback(err);
@@ -495,49 +587,3 @@ function scanFileAreaForChanges(areaInfo, cb) {
return cb(err);
});
}
/*
function scanFileAreaForChanges2(areaInfo, cb) {
const areaPhysDir = getAreaStorageDirectory(areaInfo);
async.series(
[
function scanPhysFiles(callback) {
fs.readdir(areaPhysDir, (err, files) => {
if(err) {
return callback(err);
}
async.eachSeries(files, (fileName, next) => {
const fullPath = paths.join(areaPhysDir, fileName);
fs.stat(fullPath, (err, stats) => {
if(err) {
// :TODO: Log me!
return next(null); // always try next file
}
if(!stats.isFile()) {
return next(null);
}
addOrUpdateFileEntry(areaInfo, fileName, { areaTag : areaInfo.areaTag }, err => {
return next(err);
});
});
}, err => {
return callback(err);
});
});
},
function scanDbEntries(callback) {
// :TODO: Look @ db entries for area that were *not* processed above
return callback(null);
}
],
err => {
return cb(err);
}
);
}
*/