Much faster email sending with the new Message Broker

This commit is contained in:
Pinga 2024-02-23 02:14:41 +02:00
parent 23d50a0b0c
commit 43e7b3250d
7 changed files with 286 additions and 174 deletions

View file

@ -7,6 +7,7 @@
"monolog/monolog": "^3.5",
"league/flysystem": "^3.23",
"league/flysystem-sftp-v3": "^3.22",
"peppeocchi/php-cron-scheduler": "^4.0"
"peppeocchi/php-cron-scheduler": "^4.0",
"predis/predis": "^2.2",
}
}

View file

@ -1,171 +0,0 @@
<?php
use Swoole\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;
use PHPMailer\PHPMailer\PHPMailer;
use PHPMailer\PHPMailer\Exception;
use \Utopia\Messaging\Messages\Email;
use \Utopia\Messaging\Adapters\Email\SendGrid;
use \Utopia\Messaging\Adapters\Email\Mailgun;
use \Utopia\Messaging\Messages\SMS;
use \Utopia\Messaging\Adapters\SMS\Twilio;
use \Utopia\Messaging\Adapters\SMS\Telesign;
use \Utopia\Messaging\Adapters\SMS\Plivo;
use \Utopia\Messaging\Adapters\SMS\Vonage;
use \Utopia\Messaging\Adapters\SMS\Clickatell;
require __DIR__ . '/vendor/autoload.php';
$c = require_once 'config.php';
require_once 'helpers.php';
$server = new Server("127.0.0.1", 8250);
$server->set([
'daemonize' => false,
'log_file' => '/var/log/namingo/messagebroker_application.log',
'log_level' => SWOOLE_LOG_INFO,
'worker_num' => swoole_cpu_num() * 2,
'pid_file' => '/var/run/messagebroker.pid'
]);
$logFilePath = '/var/log/namingo/messagebroker.log';
$log = setupLogger($logFilePath, 'Message_Broker');
$log->info('job started.');
$server->on("request", function (Request $request, Response $response) use ($c,$log) {
// Parse the received data
$data = json_decode($request->rawContent(), true);
if (!$data || !isset($data['type'])) {
$response->end(json_encode(['status' => 'error', 'message' => 'Invalid request']));
return;
}
switch ($data['type']) {
case 'sendmail':
if ($c['mailer'] == 'phpmailer') {
$mail = new PHPMailer(true);
try {
$mail->isSMTP();
$mail->Host = $c['mailer_smtp_host'];
$mail->SMTPAuth = true;
$mail->Username = $c['mailer_smtp_username'];
$mail->Password = $c['mailer_smtp_password'];
$mail->SMTPSecure = 'tls';
$mail->Port = $c['mailer_smtp_port'];
$mail->setFrom($c['mailer_from']);
$mail->addAddress($data['toEmail']);
$mail->Subject = $data['subject'];
$mail->Body = $data['body'];
$mail->send();
} catch (Exception $e) {
$response->end(json_encode(['status' => 'error', 'message' => 'Mail could not be sent. PHPMailer Error: ' . $mail->ErrorInfo]));
return;
}
} elseif ($c['mailer'] == 'sendgrid') {
$message = new Email(
from: [$c['mailer_from']],
to: [$data['toEmail']],
subject: $data['subject'],
content: $data['body']
);
$messaging = new Sendgrid($c['mailer_api_key']);
$messaging->send($message);
} elseif ($c['mailer'] == 'mailgun') {
$message = new Email(
from: [$c['mailer_from']],
to: [$data['toEmail']],
subject: $data['subject'],
content: $data['body']
);
$messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']);
$messaging->send($message);
} else {
$response->end(json_encode(['status' => 'error', 'message' => 'Invalid mailer specified']));
return;
}
break;
case 'sendsms':
if ($c['mailer_sms'] == 'twilio') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Twilio($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] == 'telesign') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Telesign($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] == 'plivo') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Plivo($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] == 'vonage') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Vonage($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] == 'clickatell') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Clickatell($c['mailer_sms_account']);
$messaging->send($message);
} else {
$response->end(json_encode(['status' => 'error', 'message' => 'Invalid SMS provider specified']));
return;
}
break;
default:
$response->end(json_encode(['status' => 'error', 'message' => 'Unknown action']));
return;
}
$log->info('job finished successfully.');
$response->end(json_encode(['status' => 'success']));
});
$server->start();
/* USAGE
$url = 'http://127.0.0.1:8250';
$data = ['type' => 'sendmail', 'other_params' => '...'];
$options = [
CURLOPT_RETURNTRANSFER => true,
CURLOPT_CUSTOMREQUEST => 'POST',
CURLOPT_POSTFIELDS => json_encode($data),
CURLOPT_HTTPHEADER => [
'Content-Type: application/json',
'Content-Length: ' . strlen(json_encode($data))
],
];
$curl = curl_init($url);
curl_setopt_array($curl, $options);
$response = curl_exec($curl);
if ($response === false) {
throw new Exception(curl_error($curl), curl_errno($curl));
}
curl_close($curl);
print_r($response);*/

View file

@ -0,0 +1,73 @@
<?php
require __DIR__ . '/vendor/autoload.php';
use Predis\Client as RedisClient;
use Swoole\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;
$c = require_once 'config.php';
require_once 'helpers.php';
$redis = new RedisClient([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
]);
$server = new Server("127.0.0.1", 8250);
$server->set([
'daemonize' => false,
'log_file' => '/var/log/namingo/msg_producer_app.log',
'log_level' => SWOOLE_LOG_INFO,
'worker_num' => swoole_cpu_num() * 2,
'pid_file' => '/var/run/msg_producer.pid'
]);
$logFilePath = '/var/log/namingo/msg_producer.log';
$log = setupLogger($logFilePath, 'Msg_Producer');
$log->info('job started.');
$server->on("request", function (Request $request, Response $response) use ($redis) {
$data = json_decode($request->rawContent(), true);
if (!$data || !isset($data['type'])) {
$response->end(json_encode(['status' => 'error', 'message' => 'Invalid request']));
return;
}
// Enqueue the message
$redis->lpush('message_queue', json_encode($data));
$response->end(json_encode(['status' => 'success', 'message' => 'Message queued for delivery']));
});
$server->start();
/* USAGE
$url = 'http://127.0.0.1:8250';
$data = ['type' => 'sendmail', 'other_params' => '...'];
$options = [
CURLOPT_RETURNTRANSFER => true,
CURLOPT_CUSTOMREQUEST => 'POST',
CURLOPT_POSTFIELDS => json_encode($data),
CURLOPT_HTTPHEADER => [
'Content-Type: application/json',
'Content-Length: ' . strlen(json_encode($data))
],
];
$curl = curl_init($url);
curl_setopt_array($curl, $options);
$response = curl_exec($curl);
if ($response === false) {
throw new Exception(curl_error($curl), curl_errno($curl));
}
curl_close($curl);
print_r($response);*/

176
automation/msg_worker.php Normal file
View file

@ -0,0 +1,176 @@
<?php
require __DIR__ . '/vendor/autoload.php';
use Predis\Client as RedisClient;
use Predis\Connection\ConnectionException;
use PHPMailer\PHPMailer\PHPMailer;
use PHPMailer\PHPMailer\Exception;
use \Utopia\Messaging\Messages\Email;
use \Utopia\Messaging\Adapters\Email\SendGrid;
use \Utopia\Messaging\Adapters\Email\Mailgun;
use \Utopia\Messaging\Messages\SMS;
use \Utopia\Messaging\Adapters\SMS\Twilio;
use \Utopia\Messaging\Adapters\SMS\Telesign;
use \Utopia\Messaging\Adapters\SMS\Plivo;
use \Utopia\Messaging\Adapters\SMS\Vonage;
use \Utopia\Messaging\Adapters\SMS\Clickatell;
$c = require_once 'config.php';
require_once 'helpers.php';
// Setup initial Redis client
function setupRedisClient() {
return new RedisClient([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
'persistent' => true,
]);
}
$redis = setupRedisClient();
$logFilePath = '/var/log/namingo/msg_worker.log';
$log = setupLogger($logFilePath, 'Msg_Worker');
$log->info('job started.');
// Maximum number of retries for a message
$maxRetries = 3;
// Key for retry messages to avoid infinite loops
$retryQueueKey = 'message_queue_retry';
while (true) {
try {
$rawData = $redis->brpop(['message_queue', $retryQueueKey], 0);
} catch (ConnectionException $e) {
//$log->error("Redis connection lost. Attempting to reconnect.", ['error' => $e->getMessage()]);
sleep(5); // Pause before retrying to avoid flooding logs and giving time to recover
$redis = setupRedisClient(); // Attempt to reconnect
continue; // Skip the current iteration and try again
}
if (!$rawData) {
continue; // In case of an empty or failed read, continue to the next iteration
}
$queueKey = $rawData[0];
$data = json_decode($rawData[1], true);
try {
switch ($data['type']) {
case 'sendmail':
if ($c['mailer'] == 'phpmailer') {
$mail = new PHPMailer(true);
try {
$mail->isSMTP();
$mail->Host = $c['mailer_smtp_host'];
$mail->SMTPAuth = true;
$mail->Username = $c['mailer_smtp_username'];
$mail->Password = $c['mailer_smtp_password'];
$mail->SMTPSecure = 'tls';
if (substr($data['body'], 0, 2) === "<!") {
$mail->isHTML(true);
}
$mail->Port = $c['mailer_smtp_port'];
$mail->setFrom($c['mailer_from']);
$mail->addAddress($data['toEmail']);
$mail->Subject = $data['subject'];
$mail->Body = $data['body'];
if (substr($data['body'], 0, 2) === "<!") {
$mail->AltBody = $data['body'];
}
$mail->send();
} catch (Exception $e) {
throw new Exception("Failed to send email");
}
} elseif ($c['mailer'] == 'sendgrid') {
$message = new Email(
from: [$c['mailer_from']],
to: [$data['toEmail']],
subject: $data['subject'],
content: $data['body']
);
$messaging = new Sendgrid($c['mailer_api_key']);
$messaging->send($message);
} elseif ($c['mailer'] == 'mailgun') {
$message = new Email(
from: [$c['mailer_from']],
to: [$data['toEmail']],
subject: $data['subject'],
content: $data['body']
);
$messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']);
$messaging->send($message);
} else {
throw new Exception("Invalid mailer specified");
}
break;
case 'sendsms':
if ($c['mailer_sms'] == 'twilio') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Twilio($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] == 'telesign') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Telesign($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] == 'plivo') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Plivo($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] == 'vonage') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Vonage($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] == 'clickatell') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Clickatell($c['mailer_sms_account']);
$messaging->send($message);
} else {
throw new Exception("Invalid SMS provider specified");
}
break;
default:
throw new Exception("Unknown action");
}
$log->info("Processed message successfully", ['type' => $data['type']]);
} catch (Exception $e) {
// Check if this message has been retried too many times
if (!isset($data['retries'])) {
$data['retries'] = 0;
}
$data['retries']++;
if ($data['retries'] > $maxRetries) {
// Log failure after exceeding retries
$log->error("Message processing failed after retries", ['type' => $data['type'], 'error' => $e->getMessage()]);
} else {
// Re-queue the message for retry
$redis->lpush($retryQueueKey, json_encode($data));
$log->warning("Message processing failed, retrying", ['type' => $data['type'], 'retry' => $data['retries'], 'error' => $e->getMessage()]);
}
}
}

View file

@ -60,6 +60,32 @@ class Mail
echo "Message could not be sent. Mailer Error: {$mail->ErrorInfo}";
return false;
}
} else if (envi('MAIL_DRIVER') == 'msg') {
$url = 'http://127.0.0.1:8250';
$data = ['type' => 'sendmail', 'mailer' => 'phpmailer', 'toEmail' => $to['email'], 'subject' => $subject, 'body' => $body];
$options = [
CURLOPT_RETURNTRANSFER => true,
CURLOPT_CUSTOMREQUEST => 'POST',
CURLOPT_POSTFIELDS => json_encode($data),
CURLOPT_HTTPHEADER => [
'Content-Type: application/json',
'Content-Length: ' . strlen(json_encode($data))
],
];
$curl = curl_init($url);
curl_setopt_array($curl, $options);
$response = curl_exec($curl);
if ($response === false) {
throw new Exception(curl_error($curl), curl_errno($curl));
}
curl_close($curl);
return true;
} else {
return true;
}

View file

@ -11,7 +11,7 @@ DB_USERNAME=root
DB_PASSWORD=
DB_PORT=3306
#mailer settings (Driver = smtp or utopia, Api Provder = sendgrid or mailgun)
#mailer settings (Driver = smtp, utopia or msg [for local message broker]; Api Provder = sendgrid or mailgun)
MAIL_DRIVER=none
MAIL_HOST=smtp.mailtrap.io

View file

@ -566,7 +566,14 @@ You can easily configure the message broker for email delivery in ```config.php`
For establishing your own mail server, Mox, available at [GitHub](https://github.com/mjl-/mox), provides a comprehensive solution. Install Mox following its GitHub instructions, then enter the required details in the ```config.php``` file.
To run the messagebroker.php script, execute the following command: ```/usr/bin/php /opt/registry/automation/messagebroker.php &```. This will start the script and place it in the background, allowing it to run independently of your current terminal session.
To run the Message Broker, execute the following commands:
```bash
/usr/bin/php /opt/registry/automation/msg_producer.php &
/usr/bin/php /opt/registry/automation/msg_worker.php &
```
This will start the system and place it in the background, allowing it to run independently of your current terminal session.
### Setting Up an Audit Trail Database for Namingo