From 43e7b3250d9d919e8f90f57d2ebf95c0b930e016 Mon Sep 17 00:00:00 2001 From: Pinga <121483313+getpinga@users.noreply.github.com> Date: Fri, 23 Feb 2024 02:14:41 +0200 Subject: [PATCH] Much faster email sending with the new Message Broker --- automation/composer.json | 3 +- automation/messagebroker.php | 171 ---------------------------------- automation/msg_producer.php | 73 +++++++++++++++ automation/msg_worker.php | 176 +++++++++++++++++++++++++++++++++++ cp/app/Lib/Mail.php | 26 ++++++ cp/env-sample | 2 +- docs/install.md | 9 +- 7 files changed, 286 insertions(+), 174 deletions(-) delete mode 100644 automation/messagebroker.php create mode 100644 automation/msg_producer.php create mode 100644 automation/msg_worker.php diff --git a/automation/composer.json b/automation/composer.json index 8f51b88..6c93cc5 100644 --- a/automation/composer.json +++ b/automation/composer.json @@ -7,6 +7,7 @@ "monolog/monolog": "^3.5", "league/flysystem": "^3.23", "league/flysystem-sftp-v3": "^3.22", - "peppeocchi/php-cron-scheduler": "^4.0" + "peppeocchi/php-cron-scheduler": "^4.0", + "predis/predis": "^2.2", } } diff --git a/automation/messagebroker.php b/automation/messagebroker.php deleted file mode 100644 index ef7182f..0000000 --- a/automation/messagebroker.php +++ /dev/null @@ -1,171 +0,0 @@ -set([ - 'daemonize' => false, - 'log_file' => '/var/log/namingo/messagebroker_application.log', - 'log_level' => SWOOLE_LOG_INFO, - 'worker_num' => swoole_cpu_num() * 2, - 'pid_file' => '/var/run/messagebroker.pid' -]); -$logFilePath = '/var/log/namingo/messagebroker.log'; -$log = setupLogger($logFilePath, 'Message_Broker'); -$log->info('job started.'); - -$server->on("request", function (Request $request, Response $response) use ($c,$log) { - // Parse the received data - $data = json_decode($request->rawContent(), true); - - if (!$data || !isset($data['type'])) { - $response->end(json_encode(['status' => 'error', 'message' => 'Invalid request'])); - return; - } - - switch ($data['type']) { - case 'sendmail': - if ($c['mailer'] == 'phpmailer') { - $mail = new PHPMailer(true); - try { - $mail->isSMTP(); - $mail->Host = $c['mailer_smtp_host']; - $mail->SMTPAuth = true; - $mail->Username = $c['mailer_smtp_username']; - $mail->Password = $c['mailer_smtp_password']; - $mail->SMTPSecure = 'tls'; - $mail->Port = $c['mailer_smtp_port']; - $mail->setFrom($c['mailer_from']); - $mail->addAddress($data['toEmail']); - $mail->Subject = $data['subject']; - $mail->Body = $data['body']; - $mail->send(); - } catch (Exception $e) { - $response->end(json_encode(['status' => 'error', 'message' => 'Mail could not be sent. PHPMailer Error: ' . $mail->ErrorInfo])); - return; - } - } elseif ($c['mailer'] == 'sendgrid') { - $message = new Email( - from: [$c['mailer_from']], - to: [$data['toEmail']], - subject: $data['subject'], - content: $data['body'] - ); - $messaging = new Sendgrid($c['mailer_api_key']); - $messaging->send($message); - } elseif ($c['mailer'] == 'mailgun') { - $message = new Email( - from: [$c['mailer_from']], - to: [$data['toEmail']], - subject: $data['subject'], - content: $data['body'] - ); - $messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']); - $messaging->send($message); - } else { - $response->end(json_encode(['status' => 'error', 'message' => 'Invalid mailer specified'])); - return; - } - break; - - case 'sendsms': - if ($c['mailer_sms'] == 'twilio') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Twilio($c['mailer_sms_account'], $c['mailer_sms_auth']); - $messaging->send($message); - } elseif ($c['mailer_sms'] == 'telesign') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Telesign($c['mailer_sms_account'], $c['mailer_sms_auth']); - $messaging->send($message); - } elseif ($c['mailer_sms'] == 'plivo') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Plivo($c['mailer_sms_account'], $c['mailer_sms_auth']); - $messaging->send($message); - } elseif ($c['mailer_sms'] == 'vonage') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Vonage($c['mailer_sms_account'], $c['mailer_sms_auth']); - $messaging->send($message); - } elseif ($c['mailer_sms'] == 'clickatell') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Clickatell($c['mailer_sms_account']); - $messaging->send($message); - } else { - $response->end(json_encode(['status' => 'error', 'message' => 'Invalid SMS provider specified'])); - return; - } - break; - - default: - $response->end(json_encode(['status' => 'error', 'message' => 'Unknown action'])); - return; - } - - $log->info('job finished successfully.'); - $response->end(json_encode(['status' => 'success'])); -}); - -$server->start(); - -/* USAGE - -$url = 'http://127.0.0.1:8250'; -$data = ['type' => 'sendmail', 'other_params' => '...']; - -$options = [ - CURLOPT_RETURNTRANSFER => true, - CURLOPT_CUSTOMREQUEST => 'POST', - CURLOPT_POSTFIELDS => json_encode($data), - CURLOPT_HTTPHEADER => [ - 'Content-Type: application/json', - 'Content-Length: ' . strlen(json_encode($data)) - ], -]; - -$curl = curl_init($url); -curl_setopt_array($curl, $options); - -$response = curl_exec($curl); - -if ($response === false) { - throw new Exception(curl_error($curl), curl_errno($curl)); -} - -curl_close($curl); - -print_r($response);*/ \ No newline at end of file diff --git a/automation/msg_producer.php b/automation/msg_producer.php new file mode 100644 index 0000000..6b4a0e7 --- /dev/null +++ b/automation/msg_producer.php @@ -0,0 +1,73 @@ + 'tcp', + 'host' => '127.0.0.1', + 'port' => 6379, +]); + +$server = new Server("127.0.0.1", 8250); +$server->set([ + 'daemonize' => false, + 'log_file' => '/var/log/namingo/msg_producer_app.log', + 'log_level' => SWOOLE_LOG_INFO, + 'worker_num' => swoole_cpu_num() * 2, + 'pid_file' => '/var/run/msg_producer.pid' +]); +$logFilePath = '/var/log/namingo/msg_producer.log'; +$log = setupLogger($logFilePath, 'Msg_Producer'); +$log->info('job started.'); + +$server->on("request", function (Request $request, Response $response) use ($redis) { + $data = json_decode($request->rawContent(), true); + + if (!$data || !isset($data['type'])) { + $response->end(json_encode(['status' => 'error', 'message' => 'Invalid request'])); + return; + } + + // Enqueue the message + $redis->lpush('message_queue', json_encode($data)); + + $response->end(json_encode(['status' => 'success', 'message' => 'Message queued for delivery'])); +}); + +$server->start(); + +/* USAGE + +$url = 'http://127.0.0.1:8250'; +$data = ['type' => 'sendmail', 'other_params' => '...']; + +$options = [ + CURLOPT_RETURNTRANSFER => true, + CURLOPT_CUSTOMREQUEST => 'POST', + CURLOPT_POSTFIELDS => json_encode($data), + CURLOPT_HTTPHEADER => [ + 'Content-Type: application/json', + 'Content-Length: ' . strlen(json_encode($data)) + ], +]; + +$curl = curl_init($url); +curl_setopt_array($curl, $options); + +$response = curl_exec($curl); + +if ($response === false) { + throw new Exception(curl_error($curl), curl_errno($curl)); +} + +curl_close($curl); + +print_r($response);*/ \ No newline at end of file diff --git a/automation/msg_worker.php b/automation/msg_worker.php new file mode 100644 index 0000000..d214bf6 --- /dev/null +++ b/automation/msg_worker.php @@ -0,0 +1,176 @@ + 'tcp', + 'host' => '127.0.0.1', + 'port' => 6379, + 'persistent' => true, + ]); +} + +$redis = setupRedisClient(); + +$logFilePath = '/var/log/namingo/msg_worker.log'; +$log = setupLogger($logFilePath, 'Msg_Worker'); +$log->info('job started.'); + +// Maximum number of retries for a message +$maxRetries = 3; +// Key for retry messages to avoid infinite loops +$retryQueueKey = 'message_queue_retry'; + +while (true) { + try { + $rawData = $redis->brpop(['message_queue', $retryQueueKey], 0); + } catch (ConnectionException $e) { + //$log->error("Redis connection lost. Attempting to reconnect.", ['error' => $e->getMessage()]); + sleep(5); // Pause before retrying to avoid flooding logs and giving time to recover + $redis = setupRedisClient(); // Attempt to reconnect + continue; // Skip the current iteration and try again + } + + if (!$rawData) { + continue; // In case of an empty or failed read, continue to the next iteration + } + + $queueKey = $rawData[0]; + $data = json_decode($rawData[1], true); + + try { + switch ($data['type']) { + case 'sendmail': + if ($c['mailer'] == 'phpmailer') { + $mail = new PHPMailer(true); + try { + $mail->isSMTP(); + $mail->Host = $c['mailer_smtp_host']; + $mail->SMTPAuth = true; + $mail->Username = $c['mailer_smtp_username']; + $mail->Password = $c['mailer_smtp_password']; + $mail->SMTPSecure = 'tls'; + if (substr($data['body'], 0, 2) === "isHTML(true); + } + $mail->Port = $c['mailer_smtp_port']; + $mail->setFrom($c['mailer_from']); + $mail->addAddress($data['toEmail']); + $mail->Subject = $data['subject']; + $mail->Body = $data['body']; + if (substr($data['body'], 0, 2) === "AltBody = $data['body']; + } + $mail->send(); + } catch (Exception $e) { + throw new Exception("Failed to send email"); + } + } elseif ($c['mailer'] == 'sendgrid') { + $message = new Email( + from: [$c['mailer_from']], + to: [$data['toEmail']], + subject: $data['subject'], + content: $data['body'] + ); + $messaging = new Sendgrid($c['mailer_api_key']); + $messaging->send($message); + } elseif ($c['mailer'] == 'mailgun') { + $message = new Email( + from: [$c['mailer_from']], + to: [$data['toEmail']], + subject: $data['subject'], + content: $data['body'] + ); + $messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']); + $messaging->send($message); + } else { + throw new Exception("Invalid mailer specified"); + } + break; + + case 'sendsms': + if ($c['mailer_sms'] == 'twilio') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Twilio($c['mailer_sms_account'], $c['mailer_sms_auth']); + $messaging->send($message); + } elseif ($c['mailer_sms'] == 'telesign') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Telesign($c['mailer_sms_account'], $c['mailer_sms_auth']); + $messaging->send($message); + } elseif ($c['mailer_sms'] == 'plivo') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Plivo($c['mailer_sms_account'], $c['mailer_sms_auth']); + $messaging->send($message); + } elseif ($c['mailer_sms'] == 'vonage') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Vonage($c['mailer_sms_account'], $c['mailer_sms_auth']); + $messaging->send($message); + } elseif ($c['mailer_sms'] == 'clickatell') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Clickatell($c['mailer_sms_account']); + $messaging->send($message); + } else { + throw new Exception("Invalid SMS provider specified"); + } + break; + + default: + throw new Exception("Unknown action"); + } + + $log->info("Processed message successfully", ['type' => $data['type']]); + } catch (Exception $e) { + // Check if this message has been retried too many times + if (!isset($data['retries'])) { + $data['retries'] = 0; + } + + $data['retries']++; + + if ($data['retries'] > $maxRetries) { + // Log failure after exceeding retries + $log->error("Message processing failed after retries", ['type' => $data['type'], 'error' => $e->getMessage()]); + } else { + // Re-queue the message for retry + $redis->lpush($retryQueueKey, json_encode($data)); + $log->warning("Message processing failed, retrying", ['type' => $data['type'], 'retry' => $data['retries'], 'error' => $e->getMessage()]); + } + } +} \ No newline at end of file diff --git a/cp/app/Lib/Mail.php b/cp/app/Lib/Mail.php index f1d6cb4..224a62c 100644 --- a/cp/app/Lib/Mail.php +++ b/cp/app/Lib/Mail.php @@ -60,6 +60,32 @@ class Mail echo "Message could not be sent. Mailer Error: {$mail->ErrorInfo}"; return false; } + } else if (envi('MAIL_DRIVER') == 'msg') { + $url = 'http://127.0.0.1:8250'; + $data = ['type' => 'sendmail', 'mailer' => 'phpmailer', 'toEmail' => $to['email'], 'subject' => $subject, 'body' => $body]; + + $options = [ + CURLOPT_RETURNTRANSFER => true, + CURLOPT_CUSTOMREQUEST => 'POST', + CURLOPT_POSTFIELDS => json_encode($data), + CURLOPT_HTTPHEADER => [ + 'Content-Type: application/json', + 'Content-Length: ' . strlen(json_encode($data)) + ], + ]; + + $curl = curl_init($url); + curl_setopt_array($curl, $options); + + $response = curl_exec($curl); + + if ($response === false) { + throw new Exception(curl_error($curl), curl_errno($curl)); + } + + curl_close($curl); + + return true; } else { return true; } diff --git a/cp/env-sample b/cp/env-sample index 142c645..1ea136a 100644 --- a/cp/env-sample +++ b/cp/env-sample @@ -11,7 +11,7 @@ DB_USERNAME=root DB_PASSWORD= DB_PORT=3306 -#mailer settings (Driver = smtp or utopia, Api Provder = sendgrid or mailgun) +#mailer settings (Driver = smtp, utopia or msg [for local message broker]; Api Provder = sendgrid or mailgun) MAIL_DRIVER=none MAIL_HOST=smtp.mailtrap.io diff --git a/docs/install.md b/docs/install.md index 9717666..b459b70 100644 --- a/docs/install.md +++ b/docs/install.md @@ -566,7 +566,14 @@ You can easily configure the message broker for email delivery in ```config.php` For establishing your own mail server, Mox, available at [GitHub](https://github.com/mjl-/mox), provides a comprehensive solution. Install Mox following its GitHub instructions, then enter the required details in the ```config.php``` file. -To run the messagebroker.php script, execute the following command: ```/usr/bin/php /opt/registry/automation/messagebroker.php &```. This will start the script and place it in the background, allowing it to run independently of your current terminal session. +To run the Message Broker, execute the following commands: + +```bash +/usr/bin/php /opt/registry/automation/msg_producer.php & +/usr/bin/php /opt/registry/automation/msg_worker.php & +``` + +This will start the system and place it in the background, allowing it to run independently of your current terminal session. ### Setting Up an Audit Trail Database for Namingo