* Change FTN packet read() to use async iterator

* createMessageUuidAlternate(): Mmethod for FTN message v5 UUID generation when no MSGID to work with
* parseAbbreviatedNetNodeList() now works properly
* Add core/uuid_util.js for various UUID utilities such as v5 named UUID generation
* Fix message meta load/retrieval
* Add lookup for REPLY kludge -> MSGID -> local reply IDs
* Fix SEEN-BY additions @ export
* Don't override MSGIDs if they already exist
* Store MSGID @ export so it can be inspected later
* Add import functionality (working, but WIP!)
* Clean up bundles and packets after import
This commit is contained in:
Bryan Ashby 2016-03-08 22:30:04 -07:00
parent 6094bed07f
commit ad0296addf
7 changed files with 628 additions and 289 deletions

View file

@ -20,6 +20,7 @@ let async = require('async');
let fs = require('fs');
let later = require('later');
let temp = require('temp').track(); // track() cleans up temp dir/files for us
let assert = require('assert');
exports.moduleInfo = {
name : 'FTN BSO',
@ -72,13 +73,44 @@ function FTNMessageScanTossModule() {
return(networkName === defaultNetworkName && address.zone === this.moduleConfig.defaultZone);
};
this.getNetworkNameByAddress = function(address) {
this.getNetworkNameByAddress = function(remoteAddress) {
return _.findKey(Config.messageNetworks.ftn.networks, network => {
const networkAddress = Address.fromString(network.localAddress);
return !_.isUndefined(networkAddress) && address.isEqual(networkAddress);
const localAddress = Address.fromString(network.localAddress);
return !_.isUndefined(localAddress) && localAddress.isEqual(remoteAddress);
});
};
this.getNetworkNameByAddressPattern = function(remoteAddressPattern) {
return _.findKey(Config.messageNetworks.ftn.networks, network => {
const localAddress = Address.fromString(network.localAddress);
return !_.isUndefined(localAddress) && localAddress.isPatternMatch(remoteAddressPattern);
});
};
this.getLocalAreaTagByFtnAreaTag = function(ftnAreaTag) {
return _.findKey(Config.messageNetworks.ftn.areas, areaConf => {
return areaConf.tag === ftnAreaTag;
});
};
/*
this.getSeenByAddresses = function(messageSeenBy) {
if(!_.isArray(messageSeenBy)) {
messageSeenBy = [ messageSeenBy ];
}
let seenByAddrs = [];
messageSeenBy.forEach(sb => {
seenByAddrs = seenByAddrs.concat(ftnUtil.parseAbbreviatedNetNodeList(sb));
});
return seenByAddrs;
};
*/
this.messageHasValidMSGID = function(msg) {
return _.isString(msg.meta.FtnKludge.MSGID) && msg.meta.FtnKludge.MSGID.length > 0;
};
this.getOutgoingPacketDir = function(networkName, destAddress) {
let dir = this.moduleConfig.paths.outbound;
if(!this.isDefaultDomainZone(networkName, destAddress)) {
@ -164,41 +196,6 @@ function FTNMessageScanTossModule() {
});
};
this.exportMessage = function(message, options, cb) {
this.prepareMessage(message, options);
let packet = new ftnMailPacket.Packet();
let packetHeader = new ftnMailPacket.PacketHeader(
options.network.localAddress,
options.destAddress,
options.nodeConfig.packetType);
packetHeader.password = options.nodeConfig.packetPassword || '';
if(message.isPrivate()) {
// :TODO: this should actually be checking for isNetMail()!!
} else {
const outgoingDir = this.getOutgoingPacketDir(options.networkName, options.destAddress);
mkdirp(outgoingDir, err => {
if(err) {
return cb(err);
}
this.getOutgoingBundleFileName(outgoingDir, options.network.localAddress, options.destAddress, (err, path) => {
console.log(path);
});
packet.write(
this.getOutgoingPacketFileName(outgoingDir, message),
packetHeader,
[ message ],
{ encoding : options.encoding }
);
});
}
};
this.prepareMessage = function(message, options) {
//
// Set various FTN kludges/etc.
@ -241,7 +238,8 @@ function FTNMessageScanTossModule() {
// When exporting messages, we should create/update SEEN-BY
// with remote address(s) we are exporting to.
//
const seenByAdditions = [ options.network.localAddress ].concat(Config.messageNetworks.ftn.areas[message.areaTag].uplinks);
const seenByAdditions =
[ `${options.network.localAddress.net}/${options.network.localAddress.node}` ].concat(Config.messageNetworks.ftn.areas[message.areaTag].uplinks);
message.meta.FtnProperty.ftn_seen_by =
ftnUtil.getUpdatedSeenByEntries(message.meta.FtnProperty.ftn_seen_by, seenByAdditions);
@ -256,8 +254,14 @@ function FTNMessageScanTossModule() {
//
// Additional kludges
//
// Check for existence of MSGID as we may already have stored it from a previous
// export that failed to finish
//
message.meta.FtnKludge.MSGID = ftnUtil.getMessageIdentifier(message, options.network.localAddress);
if(!message.meta.FtnKludge.MSGID) {
message.meta.FtnKludge.MSGID = ftnUtil.getMessageIdentifier(message, options.network.localAddress);
}
message.meta.FtnKludge.TZUTC = ftnUtil.getUTCTimeZoneOffset();
if(!message.meta.FtnKludge.PID) {
@ -369,7 +373,7 @@ function FTNMessageScanTossModule() {
this.getNodeConfigKeyForUplink = function(uplink) {
// :TODO: sort by least # of '*' & take top?
const nodeKey = _.filter(Object.keys(this.moduleConfig.nodes), addr => {
return Address.fromString(addr).isMatch(uplink);
return Address.fromString(addr).isPatternMatch(uplink);
})[0];
return nodeKey;
@ -451,6 +455,19 @@ function FTNMessageScanTossModule() {
ws.write(msgBuf);
}
callback(null);
},
function updateStoredMeta(callback) {
//
// We want to store some meta as if we had imported
// this message for later reference
//
if(message.meta.FtnKludge.MSGID) {
message.persistMetaValue('FtnKludge', 'MSGID', message.meta.FtnKludge.MSGID, err => {
callback(err);
});
} else {
callback(null);
}
}
],
err => {
@ -607,37 +624,144 @@ function FTNMessageScanTossModule() {
}, cb); // complete
};
this.importMessagesFromPacketFile = function(packetPath, cb) {
const packet = new ftnMailPacket.Packet();
this.setReplyToMsgIdFtnReplyKludge = function(message, cb) {
//
// Given a FTN REPLY kludge, set |message.replyToMsgId|, if possible,
// by looking up an associated MSGID kludge meta.
//
// See also: http://ftsc.org/docs/fts-0009.001
//
if(!_.isString(message.meta.FtnKludge.REPLY)) {
// nothing to do
return cb();
}
// :TODO: packet.read() should have a way to cancel iteration...
let localNetworkName;
packet.read(packetPath, (entryType, entryData) => {
Message.getMessageIdsByMetaValue('FtnKludge', 'MSGID', message.meta.FtnKludge.REPLY, (err, msgIds) => {
if(!err) {
assert(1 === msgIds.length);
message.replyToMsgId = msgIds[0];
}
cb();
});
};
this.importNetMailToArea = function(localAreaTag, header, message, cb) {
async.series(
[
function validateDestinationAddress(callback) {
/*
const messageDestAddress = new Address({
node : message.meta.FtnProperty.ftn_dest_node,
net : message.meta.FtnProperty.ftn_dest_network,
});
*/
const localNetworkPattern = `${message.meta.FtnProperty.ftn_dest_network}/${message.meta.FtnProperty.ftn_dest_node}`;
const localNetworkName = self.getNetworkNameByAddressPattern(localNetworkPattern);
callback(_.isString(localNetworkName) ? null : new Error('Packet destination is not us'));
},
function basicSetup(callback) {
message.areaTag = localAreaTag;
//
// If duplicates are NOT allowed in the area (the default), we need to update
// the message UUID using data available to us. Duplicate UUIDs are internally
// not allowed in our local database.
//
if(!Config.messageNetworks.ftn.areas[localAreaTag].allowDupes) {
if(self.messageHasValidMSGID(message)) {
// Update UUID with our preferred generation method
message.uuid = ftnUtil.createMessageUuid(
message.meta.FtnKludge.MSGID,
message.meta.FtnProperty.ftn_area);
} else {
// Update UUID with alternate/backup generation method
message.uuid = ftnUtil.createMessageUuidAlternate(
message.meta.FtnProperty.ftn_area,
message.modTimestamp,
message.subject,
message.message);
}
}
callback(null);
},
function setReplyToMessageId(callback) {
self.setReplyToMsgIdFtnReplyKludge(message, () => {
callback(null);
});
},
function persistImport(callback) {
message.persist(err => {
callback(err);
});
}
], err => {
cb(err);
}
);
};
//
// Ref. implementations on import:
// * https://github.com/larsks/crashmail/blob/26e5374710c7868dab3d834be14bf4041041aae5/crashmail/pkt.c
// https://github.com/larsks/crashmail/blob/26e5374710c7868dab3d834be14bf4041041aae5/crashmail/handle.c
//
this.importMessagesFromPacketFile = function(packetPath, cb) {
let packetHeader;
new ftnMailPacket.Packet().read(packetPath, (entryType, entryData, next) => {
if('header' === entryType) {
//
// Discover if this packet is for one of our network(s)
//
localNetworkName = self.getNetworkNameByAddress(entryData.destAddress);
} else if(localNetworkName && 'message' === entryType) {
const message = entryData; // so we ref something reasonable :)
packetHeader = entryData;
const localNetworkName = self.getNetworkNameByAddress(packetHeader.destAddress);
if(!_.isString(localNetworkName)) {
next(new Error('No configuration for this packet'));
} else {
next(null);
}
} else if('message' === entryType) {
const message = entryData;
const areaTag = message.meta.FtnProperty.ftn_area;
// :TODO: we need to know if this message is a dupe - UUID will be the same if MSGID, but if not present... what to do?
// :TODO: lookup and set message.areaTag if match
// :TODO: check SEEN-BY for echo
// :TODO: Handle area vs Netmail - Via, etc.
// :TODO: Handle PATH
// :TODO: handle REPLY kludges... set local ID when possible
if(areaTag) {
//
// Find local area tag
// EchoMail
//
const localAreaTag = self.getLocalAreaTagByFtnAreaTag(areaTag);
if(localAreaTag) {
self.importNetMailToArea(localAreaTag, packetHeader, message, err => {
if(err) {
if('SQLITE_CONSTRAINT' === err.code) {
Log.info(
{ subject : message.subject, uuid : message.uuid },
'Not importing non-unique message');
return next(null);
}
}
next(err);
});
} else {
//
// No local area configured for this import
//
// :TODO: Handle the "catch all" case, if configured
}
} else {
//
// NetMail
//
}
}
}, err => {
cb(err);
}); };
});
};
this.importPacketFilesFromDirectory = function(importDir, cb) {
async.waterfall(
@ -672,9 +796,9 @@ function FTNMessageScanTossModule() {
// :TODO: rename to .bad, perhaps move to a rejects dir + log
nextFile();
} else {
//fs.unlink(fullPath, err => {
fs.unlink(fullPath, err => {
nextFile();
//});
});
}
}, err => {
callback(err);
@ -687,7 +811,9 @@ function FTNMessageScanTossModule() {
);
};
this.importMessagesFromDirectory = function(importDir, cb) {
this.importMessagesFromDirectory = function(inboundType, importDir, cb) {
let tempDirectory;
async.waterfall(
[
// start with .pkt files
@ -712,21 +838,37 @@ function FTNMessageScanTossModule() {
},
function createTempDir(bundleFiles, callback) {
temp.mkdir('enigftnimport-', (err, tempDir) => {
callback(err, bundleFiles, tempDir);
tempDirectory = tempDir;
callback(err, bundleFiles);
});
},
function importBundles(bundleFiles, tempDir, callback) {
function importBundles(bundleFiles, callback) {
let rejects = [];
async.each(bundleFiles, (bundleFile, nextFile) => {
if(_.isUndefined(bundleFile.archName)) {
// :TODO: log?
Log.info(
{ fileName : bundleFile.path },
'Unknown bundle archive type');
rejects.push(bundleFile.path);
return nextFile(); // unknown archive type
}
self.archUtil.extractTo(
bundleFile.path,
tempDir,
tempDirectory,
bundleFile.archName,
err => {
if(err) {
Log.info(
{ fileName : bundleFile.path, error : err.toString() },
'Failed to extract bundle');
rejects.push(bundleFile.path);
}
nextFile();
}
);
@ -738,14 +880,39 @@ function FTNMessageScanTossModule() {
//
// All extracted - import .pkt's
//
self.importPacketFilesFromDirectory(tempDir, err => {
callback(err);
self.importPacketFilesFromDirectory(tempDirectory, err => {
callback(null, bundleFiles, rejects);
});
});
},
function handleProcessedBundleFiles(bundleFiles, rejects, callback) {
async.each(bundleFiles, (bundleFile, nextFile) => {
if(rejects.indexOf(bundleFile.path) > -1) {
// :TODO: rename to .bad, perhaps move to a rejects dir + log
nextFile();
} else {
fs.unlink(bundleFile.path, err => {
nextFile();
});
}
}, err => {
callback(err);
});
}
],
err => {
cb(err);
if(tempDirectory) {
temp.cleanup( (errIgnored, stats) => {
Log.trace(
Object.assign(stats, { tempDir : tempDirectory } ),
'Temporary directory cleaned up'
);
cb(err); // orig err
});
} else {
cb(err);
}
}
);
};
@ -818,8 +985,8 @@ FTNMessageScanTossModule.prototype.performImport = function(cb) {
var self = this;
async.each( [ 'inbound', 'secInbound' ], (importDir, nextDir) => {
self.importMessagesFromDirectory(self.moduleConfig.paths[importDir], err => {
async.each( [ 'inbound', 'secInbound' ], (inboundType, nextDir) => {
self.importMessagesFromDirectory(inboundType, self.moduleConfig.paths[inboundType], err => {
nextDir();
});
@ -879,7 +1046,7 @@ FTNMessageScanTossModule.prototype.performExport = function(cb) {
const newLastScanId = msgRows[msgRows.length - 1].message_id;
Log.info(
{ messagesExported : msgRows.length, newLastScanId : newLastScanId },
{ areaTag : areaTag, messagesExported : msgRows.length, newLastScanId : newLastScanId },
'Export complete');
callback(err, newLastScanId);