Final fix on the msg worker

This commit is contained in:
Pinga 2025-02-06 17:38:43 +02:00
parent 6651e3d613
commit e143b1ad39

View file

@ -3,14 +3,11 @@
* msg_worker.php * msg_worker.php
* *
* A worker script that continuously pulls messages from a Redis queue and processes them. * A worker script that continuously pulls messages from a Redis queue and processes them.
* Uses Swoole's coroutine runtime and Coroutine Redis client.
*/ */
// Enable strict types if desired // Enable strict types if desired
declare(strict_types=1); declare(strict_types=1);
use Swoole\Coroutine;
use Swoole\Coroutine\Redis;
use PHPMailer\PHPMailer\PHPMailer; use PHPMailer\PHPMailer\PHPMailer;
use PHPMailer\PHPMailer\Exception as PHPMailerException; use PHPMailer\PHPMailer\Exception as PHPMailerException;
use Utopia\Messaging\Messages\Email; use Utopia\Messaging\Messages\Email;
@ -39,7 +36,7 @@ $maxRetries = 3;
$retryQueueKey = 'message_queue_retry'; $retryQueueKey = 'message_queue_retry';
/** /**
* Creates and returns a new Coroutine Redis connection. * Creates and returns a new Redis connection.
* *
* @return Redis * @return Redis
* @throws Exception if connection fails. * @throws Exception if connection fails.
@ -53,165 +50,156 @@ function connectRedis(): Redis {
return $redis; return $redis;
} }
// Run the worker inside Swoole's coroutine runtime. // Run the worker
Swoole\Coroutine\run(function() use ($c, $logger, $maxRetries, $retryQueueKey) { $redis = connectRedis();
$redis = connectRedis(); $logger->info("Worker started, waiting for messages...");
$logger->info("Worker started, waiting for messages...");
while (true) { while (true) {
try {
// brPop blocks until a message is available.
// It returns an array: [queueKey, messageData]
$result = $redis->brPop(['message_queue', $retryQueueKey], 1);
} catch (Exception $e) {
$logger->error("Redis error", ['error' => $e->getMessage()]);
// Wait before trying to reconnect
sleep(5);
try { try {
// brPop blocks until a message is available. $redis = connectRedis();
// It returns an array: [queueKey, messageData] } catch (Exception $ex) {
$result = $redis->brPop(['message_queue', $retryQueueKey], 1); $logger->error("Redis reconnection failed", ['error' => $ex->getMessage()]);
} catch (Exception $e) {
$logger->error("Redis error", ['error' => $e->getMessage()]);
// Wait before trying to reconnect
Coroutine::sleep(5);
try {
$redis = connectRedis();
} catch (Exception $ex) {
$logger->error("Redis reconnection failed", ['error' => $ex->getMessage()]);
continue;
}
continue; continue;
} }
continue;
}
if (!$result) { if (!$result) {
continue; continue;
} }
// Decode the message data // Decode the message data
$queueKey = $result[0]; $queueKey = $result[0];
$data = json_decode($result[1], true); $data = json_decode($result[1], true);
if (!$data) { if (!$data) {
$logger->warning("Received invalid message from Redis", ['raw' => $result[1]]); $logger->warning("Received invalid message from Redis", ['raw' => $result[1]]);
continue; continue;
} }
try { try {
switch ($data['type']) { switch ($data['type']) {
case 'sendmail': case 'sendmail':
if ($c['mailer'] === 'phpmailer') { if ($c['mailer'] === 'phpmailer') {
$mail = new PHPMailer(true); $mail = new PHPMailer(true);
try { try {
$mail->SMTPDebug = 0; $mail->SMTPDebug = 1;
$mail->isSMTP(); $mail->isSMTP();
$mail->Host = $c['mailer_smtp_host']; $mail->Host = $c['mailer_smtp_host'];
$mail->SMTPAuth = true; $mail->SMTPAuth = true;
$mail->Username = $c['mailer_smtp_username']; $mail->Username = $c['mailer_smtp_username'];
$mail->Password = $c['mailer_smtp_password']; $mail->Password = $c['mailer_smtp_password'];
$mail->SMTPSecure = 'tls'; $mail->SMTPSecure = 'tls';
$mail->Port = $c['mailer_smtp_port']; $mail->Port = $c['mailer_smtp_port'];
$mail->setFrom($c['mailer_from']); $mail->setFrom($c['mailer_from']);
$mail->addAddress($data['toEmail']); $mail->addAddress($data['toEmail']);
$mail->Subject = $data['subject']; $mail->Subject = $data['subject'];
$mail->Body = $data['body']; $mail->Body = $data['body'];
if (substr($data['body'], 0, 2) === "<!") { if (substr($data['body'], 0, 2) === "<!") {
$mail->isHTML(true); $mail->isHTML(true);
$mail->AltBody = strip_tags($data['body']); $mail->AltBody = strip_tags($data['body']);
}
Swoole\Coroutine::create(function() use ($mail) {
try {
$mail->send();
} catch (Exception $e) {
echo "Mail Error: " . $e->getMessage();
}
});
} catch (PHPMailerException $e) {
$logger->error("PHPMailer error: ", ['error' => $e->getMessage()]);
} }
} elseif ($c['mailer'] === 'sendgrid') { $mail->send();
$message = new Email( } catch (PHPMailerException $e) {
from: [$c['mailer_from']], $logger->error("PHPMailer error: ", ['error' => $e->getMessage()]);
to: [$data['toEmail']],
subject: $data['subject'],
content: $data['body']
);
$messaging = new SendGrid($c['mailer_api_key']);
$messaging->send($message);
} elseif ($c['mailer'] === 'mailgun') {
$message = new Email(
from: [$c['mailer_from']],
to: [$data['toEmail']],
subject: $data['subject'],
content: $data['body']
);
$messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']);
$messaging->send($message);
} else {
$logger->error("Invalid mailer specified");
} }
break; } elseif ($c['mailer'] === 'sendgrid') {
$message = new Email(
from: [$c['mailer_from']],
to: [$data['toEmail']],
subject: $data['subject'],
content: $data['body']
);
$messaging = new SendGrid($c['mailer_api_key']);
$messaging->send($message);
} elseif ($c['mailer'] === 'mailgun') {
$message = new Email(
from: [$c['mailer_from']],
to: [$data['toEmail']],
subject: $data['subject'],
content: $data['body']
);
$messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']);
$messaging->send($message);
} else {
$logger->error("Invalid mailer specified");
}
break;
case 'sendsms': case 'sendsms':
if ($c['mailer_sms'] === 'twilio') { if ($c['mailer_sms'] === 'twilio') {
$message = new SMS( $message = new SMS(
to: [$data['toSMS']], to: [$data['toSMS']],
content: $data['contentSMS'] content: $data['contentSMS']
); );
$messaging = new Twilio($c['mailer_sms_account'], $c['mailer_sms_auth']); $messaging = new Twilio($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message); $messaging->send($message);
} elseif ($c['mailer_sms'] === 'telesign') { } elseif ($c['mailer_sms'] === 'telesign') {
$message = new SMS( $message = new SMS(
to: [$data['toSMS']], to: [$data['toSMS']],
content: $data['contentSMS'] content: $data['contentSMS']
); );
$messaging = new Telesign($c['mailer_sms_account'], $c['mailer_sms_auth']); $messaging = new Telesign($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message); $messaging->send($message);
} elseif ($c['mailer_sms'] === 'plivo') { } elseif ($c['mailer_sms'] === 'plivo') {
$message = new SMS( $message = new SMS(
to: [$data['toSMS']], to: [$data['toSMS']],
content: $data['contentSMS'] content: $data['contentSMS']
); );
$messaging = new Plivo($c['mailer_sms_account'], $c['mailer_sms_auth']); $messaging = new Plivo($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message); $messaging->send($message);
} elseif ($c['mailer_sms'] === 'vonage') { } elseif ($c['mailer_sms'] === 'vonage') {
$message = new SMS( $message = new SMS(
to: [$data['toSMS']], to: [$data['toSMS']],
content: $data['contentSMS'] content: $data['contentSMS']
); );
$messaging = new Vonage($c['mailer_sms_account'], $c['mailer_sms_auth']); $messaging = new Vonage($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message); $messaging->send($message);
} elseif ($c['mailer_sms'] === 'clickatell') { } elseif ($c['mailer_sms'] === 'clickatell') {
$message = new SMS( $message = new SMS(
to: [$data['toSMS']], to: [$data['toSMS']],
content: $data['contentSMS'] content: $data['contentSMS']
); );
$messaging = new Clickatell($c['mailer_sms_account']); $messaging = new Clickatell($c['mailer_sms_account']);
$messaging->send($message); $messaging->send($message);
} else { } else {
$logger->error("Invalid SMS provider specified"); $logger->error("Invalid SMS provider specified");
} }
break; break;
default: default:
throw new Exception("Unknown message type: " . $data['type']); throw new Exception("Unknown message type: " . $data['type']);
} }
$logger->info("Processed message successfully", ['type' => $data['type']]); $logger->info("Processed message successfully", ['type' => $data['type']]);
} catch (Exception $e) { } catch (Exception $e) {
// Increment the retry counter // Increment the retry counter
if (!isset($data['retries'])) { if (!isset($data['retries'])) {
$data['retries'] = 0; $data['retries'] = 0;
} }
$data['retries']++; $data['retries']++;
if ($data['retries'] > $maxRetries) { if ($data['retries'] > $maxRetries) {
$logger->error("Message processing failed after maximum retries", [ $logger->error("Message processing failed after maximum retries", [
'type' => $data['type'], 'type' => $data['type'],
'error' => $e->getMessage() 'error' => $e->getMessage()
]); ]);
} else { } else {
// Requeue the message for a retry // Requeue the message for a retry
$redis->lPush($retryQueueKey, json_encode($data)); $redis->lPush($retryQueueKey, json_encode($data));
$logger->warning("Message processing failed; requeued for retry", [ $logger->warning("Message processing failed; requeued for retry", [
'type' => $data['type'], 'type' => $data['type'],
'retry' => $data['retries'], 'retry' => $data['retries'],
'error' => $e->getMessage() 'error' => $e->getMessage()
]); ]);
}
} }
} }
}); }