diff --git a/core/archive_util.js b/core/archive_util.js index f4a3a168..914e5e47 100644 --- a/core/archive_util.js +++ b/core/archive_util.js @@ -51,13 +51,17 @@ module.exports = class ArchiveUtil { } } - haveArchiver(archType) { + getArchiver(archType) { if(!archType) { - return false; + return; } archType = archType.toLowerCase(); - return archType in this.archivers; + return this.archivers[archType]; + } + + haveArchiver(archType) { + return this.getArchiver(archType) ? true : false; } detectType(path, cb) { @@ -92,15 +96,13 @@ module.exports = class ArchiveUtil { } compressTo(archType, archivePath, files, cb) { - archType = archType.toLowerCase(); - const archiver = this.archivers[archType]; + const archiver = this.getArchiver(archType); if(!archiver) { - cb(new Error('Unknown archive type: ' + archType)); - return; + return cb(new Error(`Unknown archive type: ${archType}`)); } - let args = _.clone(archiver.compressArgs); // don't much with orig + let args = _.clone(archiver.compressArgs); // don't muck with orig for(let i = 0; i < args.length; ++i) { args[i] = args[i].format({ archivePath : archivePath, @@ -108,18 +110,42 @@ module.exports = class ArchiveUtil { }); } - let comp = pty.spawn(archiver.compressCmd, args, { - cols : 80, - rows : 24, - // :TODO: cwd - }); + let comp = pty.spawn(archiver.compressCmd, args, this.getPtyOpts()); - comp.on('exit', exitCode => { - cb(exitCode ? new Error('Compression failed with exit code: ' + exitCode) : null); + comp.once('exit', exitCode => { + cb(exitCode ? new Error(`Compression failed with exit code: ${exitCode}`) : null); }); } extractTo(archivePath, extractPath, archType, cb) { - + const archiver = this.getArchiver(archType); + + if(!archiver) { + return cb(new Error(`Unknown archive type: ${archType}`)); + } + + let args = _.clone(archiver.decompressArgs); // don't muck with orig + for(let i = 0; i < args.length; ++i) { + args[i] = args[i].format({ + archivePath : archivePath, + extractPath : extractPath, + }); + } + + let comp = pty.spawn(archiver.decompressCmd, args, this.getPtyOpts()); + + comp.once('exit', exitCode => { + cb(exitCode ? new Error(`Decompression failed with exit code: ${exitCode}`) : null); + }); + } + + getPtyOpts() { + return { + // :TODO: cwd + name : 'enigma-archiver', + cols : 80, + rows : 24, + env : process.env, + }; } } diff --git a/core/config.js b/core/config.js index 33a55d09..7525f147 100644 --- a/core/config.js +++ b/core/config.js @@ -216,7 +216,7 @@ function getDefaultConfig() { compressCmd : "7z", compressArgs : [ "a", "-tzip", "{archivePath}", "{fileList}" ], decompressCmd : "7z", - decompressArgs : [ "e", "-o{extractDir}", "{archivePath}" ] + decompressArgs : [ "e", "-o{extractPath}", "{archivePath}" ] } }, @@ -245,9 +245,6 @@ function getDefaultConfig() { outbound : paths.join(__dirname, './../mail/ftn_out/'), inbound : paths.join(__dirname, './../mail/ftn_in/'), secInbound : paths.join(__dirname, './../mail/ftn_secin/'), - - // :TODO: use general temp path - system temp by default...or just always system temp? - temp : paths.join(__dirname, './../mail/ftn_temp'), }, // diff --git a/core/scanner_tossers/ftn_bso.js b/core/scanner_tossers/ftn_bso.js index b6dcbc28..41a87f93 100644 --- a/core/scanner_tossers/ftn_bso.js +++ b/core/scanner_tossers/ftn_bso.js @@ -19,6 +19,7 @@ let mkdirp = require('mkdirp'); let async = require('async'); let fs = require('fs'); let later = require('later'); +let temp = require('temp').track(); // track() cleans up temp dir/files for us exports.moduleInfo = { name : 'FTN BSO', @@ -69,7 +70,14 @@ function FTNMessageScanTossModule() { this.isDefaultDomainZone = function(networkName, address) { const defaultNetworkName = this.getDefaultNetworkName(); return(networkName === defaultNetworkName && address.zone === this.moduleConfig.defaultZone); - } + }; + + this.getNetworkNameByAddress = function(address) { + return _.findKey(Config.messageNetworks.ftn.networks, network => { + const networkAddress = Address.fromString(network.localAddress); + return !_.isUndefined(networkAddress) && address.isEqual(networkAddress); + }); + }; this.getOutgoingPacketDir = function(networkName, destAddress) { let dir = this.moduleConfig.paths.outbound; @@ -306,6 +314,10 @@ function FTNMessageScanTossModule() { }; this.parseScheduleString = function(schedStr) { + if(!schedStr) { + return; // nothing to parse! + } + let schedule = {}; const m = SCHEDULE_REGEXP.exec(schedStr); @@ -413,7 +425,7 @@ function FTNMessageScanTossModule() { packetHeader.password = exportOpts.nodeConfig.packetPassword || ''; // use current message ID for filename seed - const pktFileName = self.getOutgoingPacketFileName(exportOpts.exportDir, message.messageId); + const pktFileName = self.getOutgoingPacketFileName(exportOpts.tempDir, message.messageId); exportedFiles.push(pktFileName); ws = fs.createWriteStream(pktFileName); @@ -475,7 +487,7 @@ function FTNMessageScanTossModule() { packetHeader.password = exportOpts.nodeConfig.packetPassword || ''; // use current message ID for filename seed - const pktFileName = self.getOutgoingPacketFileName(exportOpts.exportDir, remainMessageId); + const pktFileName = self.getOutgoingPacketFileName(exportOpts.tempDir, remainMessageId); exportedFiles.push(pktFileName); ws = fs.createWriteStream(pktFileName); @@ -512,7 +524,6 @@ function FTNMessageScanTossModule() { network : Config.messageNetworks.ftn.networks[areaConfig.network], destAddress : Address.fromString(uplink), networkName : areaConfig.network, - exportDir : self.moduleConfig.paths.temp, }; if(_.isString(exportOpts.network.localAddress)) { @@ -524,7 +535,8 @@ function FTNMessageScanTossModule() { async.waterfall( [ function createTempDir(callback) { - mkdirp(exportOpts.exportDir, err => { + temp.mkdir('enigftnexport--', (err, tempDir) => { + exportOpts.tempDir = tempDir; callback(err); }); }, @@ -551,7 +563,7 @@ function FTNMessageScanTossModule() { } // adjust back to temp path - const tempBundlePath = paths.join(exportOpts.exportDir, paths.basename(bundlePath)); + const tempBundlePath = paths.join(exportOpts.tempDir, paths.basename(bundlePath)); self.archUtil.compressTo( exportOpts.nodeConfig.archiveType, @@ -579,71 +591,164 @@ function FTNMessageScanTossModule() { fs.rename(oldPath, newPath, nextFile); } }, callback); + }, + function cleanUpTempDir(callback) { + temp.cleanup((err, stats) => { + Log.trace( + Object.assign(stats, { tempDir : exportOpts.tempDir }), + 'Temporary directory cleaned up'); + }); } ], err => { - nextUplink(); + nextUplink(); } ); }, cb); // complete }; - this.importMessagesFromPacketFile = function(path, packetFileName, cb) { - cb(null); + this.importMessagesFromPacketFile = function(packetPath, cb) { + const packet = new ftnMailPacket.Packet(); + + // :TODO: packet.read() should have a way to cancel iteration... + let localNetworkName; + packet.read(packetPath, (entryType, entryData) => { + if('header' === entryType) { + // + // Discover if this packet is for one of our network(s) + // + localNetworkName = self.getNetworkNameByAddress(entryData.destAddress); + + } else if(localNetworkName && 'message' === entryType) { + const message = entryData; // so we ref something reasonable :) + const areaTag = message.meta.FtnProperty.ftn_area; + + // :TODO: we need to know if this message is a dupe - UUID will be the same if MSGID, but if not present... what to do? + // :TODO: lookup and set message.areaTag if match + // :TODO: check SEEN-BY for echo + // :TODO: Handle area vs Netmail - Via, etc. + // :TODO: Handle PATH + // :TODO: handle REPLY kludges... set local ID when possible + if(areaTag) { + // + // Find local area tag + // + } + } + }, err => { + cb(err); + }); }; + + this.importPacketFilesFromDirectory = function(importDir, cb) { + async.waterfall( + [ + function getPacketFiles(callback) { + fs.readdir(importDir, (err, files) => { + if(err) { + return callback(err); + } + callback(null, files.filter(f => '.pkt' === paths.extname(f))); + }); + }, + function importPacketFiles(packetFiles, callback) { + let rejects = []; + async.each(packetFiles, (packetFile, nextFile) => { + self.importMessagesFromPacketFile(paths.join(importDir, packetFile), err => { + // :TODO: check err -- log / track rejects, etc. + if(err) { + rejects.push(packetFile); + } + nextFile(); + }); + }, err => { + // :TODO: Handle err! we should try to keep going though... + callback(err, packetFiles, rejects); + }); + }, + function handleProcessedFiles(packetFiles, rejects, callback) { + async.each(packetFiles, (packetFile, nextFile) => { + const fullPath = paths.join(importDir, packetFile); + if(rejects.indexOf(packetFile) > -1) { + // :TODO: rename to .bad, perhaps move to a rejects dir + log + nextFile(); + } else { + //fs.unlink(fullPath, err => { + nextFile(); + //}); + } + }, err => { + callback(err); + }); + } + ], + err => { + cb(err); + } + ); }; this.importMessagesFromDirectory = function(importDir, cb) { async.waterfall( [ - function getPossibleFiles(callback) { - fs.readdir(importDir, (err, files) => { - callback(err, files); + // start with .pkt files + function importPacketFiles(callback) { + self.importPacketFilesFromDirectory(importDir, err => { + callback(err); }); }, - function identify(files, callback) { - async.map(files, (f, transform) => { - let entry = { file : f }; + function discoverBundles(callback) { + fs.readdir(importDir, (err, files) => { + files = files.filter(f => '.pkt' !== paths.extname(f)); - if('.pkt' === paths.extname(f)) { - entry.type = 'packet'; - transform(null, entry); - } else { - const fullPath = paths.join(importDir, f); + async.map(files, (file, transform) => { + const fullPath = paths.join(importDir, file); self.archUtil.detectType(fullPath, (err, archName) => { - entry.type = archName; - transform(null, entry); + transform(null, { path : fullPath, archName : archName } ); }); - } - }, (err, identifiedFiles) => { + }, (err, bundleFiles) => { + callback(err, bundleFiles); + }); + }); + }, + function createTempDir(bundleFiles, callback) { + temp.mkdir('enigftnimport-', (err, tempDir) => { + callback(err, bundleFiles, tempDir); + }); + }, + function importBundles(bundleFiles, tempDir, callback) { + async.each(bundleFiles, (bundleFile, nextFile) => { + if(_.isUndefined(bundleFile.archName)) { + // :TODO: log? + return nextFile(); // unknown archive type + } + + self.archUtil.extractTo( + bundleFile.path, + tempDir, + bundleFile.archName, + err => { + nextFile(); + } + ); + }, err => { if(err) { return callback(err); } - const fileGroups = _.partition(identifiedFiles, entry => 'packet' === entry.type); - callback(null, fileGroups[0], fileGroups[1]); + // + // All extracted - import .pkt's + // + self.importPacketFilesFromDirectory(tempDir, err => { + callback(err); + }); }); - }, - function importPacketFiles(packetFiles, bundleFiles, callback) { - async.each(packetFiles, (packetFile, nextFile) => { - self.importMessagesFromPacketFile(importDir, packetFile.file, err => { - // :TODO: check err! - nextFile(); - }); - }, err => { - // :TODO: Handle err! we should try to keep going though... - callback(null, bundleFiles); - }); - }, - function importBundles(bundleFiles, callback) { - // :TODO: for each bundle, extract to temp location -> process each packet } - ], + ], err => { cb(err); - } + } ); }; - } require('util').inherits(FTNMessageScanTossModule, MessageScanTossModule); diff --git a/package.json b/package.json index 794100a7..0d1389ff 100644 --- a/package.json +++ b/package.json @@ -16,10 +16,11 @@ "async": "^1.5.1", "binary": "0.3.x", "buffers": "0.1.x", - "bunyan": "1.5.x", + "bunyan": "^1.7.1", "gaze": "^0.5.2", "hjson": "1.7.x", "iconv-lite": "^0.4.13", + "later": "1.2.0", "lodash": "^3.10.1", "minimist": "1.2.x", "mkdirp": "0.5.x", @@ -29,7 +30,7 @@ "sqlite3": "^3.1.1", "ssh2": "^0.4.13", "string-format": "davidchambers/string-format#mini-language", - "later" : "1.2.0" + "temp": "^0.8.3" }, "engines": { "node": ">=0.12.2"