diff --git a/automation/msg_worker.php b/automation/msg_worker.php index 139b76e..4fe8b62 100644 --- a/automation/msg_worker.php +++ b/automation/msg_worker.php @@ -3,14 +3,11 @@ * msg_worker.php * * A worker script that continuously pulls messages from a Redis queue and processes them. - * Uses Swoole's coroutine runtime and Coroutine Redis client. */ // Enable strict types if desired declare(strict_types=1); -use Swoole\Coroutine; -use Swoole\Coroutine\Redis; use PHPMailer\PHPMailer\PHPMailer; use PHPMailer\PHPMailer\Exception as PHPMailerException; use Utopia\Messaging\Messages\Email; @@ -39,7 +36,7 @@ $maxRetries = 3; $retryQueueKey = 'message_queue_retry'; /** - * Creates and returns a new Coroutine Redis connection. + * Creates and returns a new Redis connection. * * @return Redis * @throws Exception if connection fails. @@ -53,165 +50,156 @@ function connectRedis(): Redis { return $redis; } -// Run the worker inside Swoole's coroutine runtime. -Swoole\Coroutine\run(function() use ($c, $logger, $maxRetries, $retryQueueKey) { - $redis = connectRedis(); - $logger->info("Worker started, waiting for messages..."); +// Run the worker +$redis = connectRedis(); +$logger->info("Worker started, waiting for messages..."); - while (true) { +while (true) { + try { + // brPop blocks until a message is available. + // It returns an array: [queueKey, messageData] + $result = $redis->brPop(['message_queue', $retryQueueKey], 1); + } catch (Exception $e) { + $logger->error("Redis error", ['error' => $e->getMessage()]); + // Wait before trying to reconnect + sleep(5); try { - // brPop blocks until a message is available. - // It returns an array: [queueKey, messageData] - $result = $redis->brPop(['message_queue', $retryQueueKey], 1); - } catch (Exception $e) { - $logger->error("Redis error", ['error' => $e->getMessage()]); - // Wait before trying to reconnect - Coroutine::sleep(5); - try { - $redis = connectRedis(); - } catch (Exception $ex) { - $logger->error("Redis reconnection failed", ['error' => $ex->getMessage()]); - continue; - } + $redis = connectRedis(); + } catch (Exception $ex) { + $logger->error("Redis reconnection failed", ['error' => $ex->getMessage()]); continue; } + continue; + } - if (!$result) { - continue; - } + if (!$result) { + continue; + } - // Decode the message data - $queueKey = $result[0]; - $data = json_decode($result[1], true); - if (!$data) { - $logger->warning("Received invalid message from Redis", ['raw' => $result[1]]); - continue; - } + // Decode the message data + $queueKey = $result[0]; + $data = json_decode($result[1], true); + if (!$data) { + $logger->warning("Received invalid message from Redis", ['raw' => $result[1]]); + continue; + } - try { - switch ($data['type']) { - case 'sendmail': - if ($c['mailer'] === 'phpmailer') { - $mail = new PHPMailer(true); - try { - $mail->SMTPDebug = 0; - $mail->isSMTP(); - $mail->Host = $c['mailer_smtp_host']; - $mail->SMTPAuth = true; - $mail->Username = $c['mailer_smtp_username']; - $mail->Password = $c['mailer_smtp_password']; - $mail->SMTPSecure = 'tls'; - $mail->Port = $c['mailer_smtp_port']; + try { + switch ($data['type']) { + case 'sendmail': + if ($c['mailer'] === 'phpmailer') { + $mail = new PHPMailer(true); + try { + $mail->SMTPDebug = 1; + $mail->isSMTP(); + $mail->Host = $c['mailer_smtp_host']; + $mail->SMTPAuth = true; + $mail->Username = $c['mailer_smtp_username']; + $mail->Password = $c['mailer_smtp_password']; + $mail->SMTPSecure = 'tls'; + $mail->Port = $c['mailer_smtp_port']; - $mail->setFrom($c['mailer_from']); - $mail->addAddress($data['toEmail']); - $mail->Subject = $data['subject']; - $mail->Body = $data['body']; - if (substr($data['body'], 0, 2) === "isHTML(true); - $mail->AltBody = strip_tags($data['body']); - } - - Swoole\Coroutine::create(function() use ($mail) { - try { - $mail->send(); - } catch (Exception $e) { - echo "Mail Error: " . $e->getMessage(); - } - }); - } catch (PHPMailerException $e) { - $logger->error("PHPMailer error: ", ['error' => $e->getMessage()]); + $mail->setFrom($c['mailer_from']); + $mail->addAddress($data['toEmail']); + $mail->Subject = $data['subject']; + $mail->Body = $data['body']; + if (substr($data['body'], 0, 2) === "isHTML(true); + $mail->AltBody = strip_tags($data['body']); } - } elseif ($c['mailer'] === 'sendgrid') { - $message = new Email( - from: [$c['mailer_from']], - to: [$data['toEmail']], - subject: $data['subject'], - content: $data['body'] - ); - $messaging = new SendGrid($c['mailer_api_key']); - $messaging->send($message); - } elseif ($c['mailer'] === 'mailgun') { - $message = new Email( - from: [$c['mailer_from']], - to: [$data['toEmail']], - subject: $data['subject'], - content: $data['body'] - ); - $messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']); - $messaging->send($message); - } else { - $logger->error("Invalid mailer specified"); + $mail->send(); + } catch (PHPMailerException $e) { + $logger->error("PHPMailer error: ", ['error' => $e->getMessage()]); } - break; + } elseif ($c['mailer'] === 'sendgrid') { + $message = new Email( + from: [$c['mailer_from']], + to: [$data['toEmail']], + subject: $data['subject'], + content: $data['body'] + ); + $messaging = new SendGrid($c['mailer_api_key']); + $messaging->send($message); + } elseif ($c['mailer'] === 'mailgun') { + $message = new Email( + from: [$c['mailer_from']], + to: [$data['toEmail']], + subject: $data['subject'], + content: $data['body'] + ); + $messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']); + $messaging->send($message); + } else { + $logger->error("Invalid mailer specified"); + } + break; - case 'sendsms': - if ($c['mailer_sms'] === 'twilio') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Twilio($c['mailer_sms_account'], $c['mailer_sms_auth']); - $messaging->send($message); - } elseif ($c['mailer_sms'] === 'telesign') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Telesign($c['mailer_sms_account'], $c['mailer_sms_auth']); - $messaging->send($message); - } elseif ($c['mailer_sms'] === 'plivo') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Plivo($c['mailer_sms_account'], $c['mailer_sms_auth']); - $messaging->send($message); - } elseif ($c['mailer_sms'] === 'vonage') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Vonage($c['mailer_sms_account'], $c['mailer_sms_auth']); - $messaging->send($message); - } elseif ($c['mailer_sms'] === 'clickatell') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Clickatell($c['mailer_sms_account']); - $messaging->send($message); - } else { - $logger->error("Invalid SMS provider specified"); - } - break; + case 'sendsms': + if ($c['mailer_sms'] === 'twilio') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Twilio($c['mailer_sms_account'], $c['mailer_sms_auth']); + $messaging->send($message); + } elseif ($c['mailer_sms'] === 'telesign') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Telesign($c['mailer_sms_account'], $c['mailer_sms_auth']); + $messaging->send($message); + } elseif ($c['mailer_sms'] === 'plivo') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Plivo($c['mailer_sms_account'], $c['mailer_sms_auth']); + $messaging->send($message); + } elseif ($c['mailer_sms'] === 'vonage') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Vonage($c['mailer_sms_account'], $c['mailer_sms_auth']); + $messaging->send($message); + } elseif ($c['mailer_sms'] === 'clickatell') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Clickatell($c['mailer_sms_account']); + $messaging->send($message); + } else { + $logger->error("Invalid SMS provider specified"); + } + break; - default: - throw new Exception("Unknown message type: " . $data['type']); - } + default: + throw new Exception("Unknown message type: " . $data['type']); + } - $logger->info("Processed message successfully", ['type' => $data['type']]); - } catch (Exception $e) { - // Increment the retry counter - if (!isset($data['retries'])) { - $data['retries'] = 0; - } - $data['retries']++; + $logger->info("Processed message successfully", ['type' => $data['type']]); + } catch (Exception $e) { + // Increment the retry counter + if (!isset($data['retries'])) { + $data['retries'] = 0; + } + $data['retries']++; - if ($data['retries'] > $maxRetries) { - $logger->error("Message processing failed after maximum retries", [ - 'type' => $data['type'], - 'error' => $e->getMessage() - ]); - } else { - // Requeue the message for a retry - $redis->lPush($retryQueueKey, json_encode($data)); - $logger->warning("Message processing failed; requeued for retry", [ - 'type' => $data['type'], - 'retry' => $data['retries'], - 'error' => $e->getMessage() - ]); - } + if ($data['retries'] > $maxRetries) { + $logger->error("Message processing failed after maximum retries", [ + 'type' => $data['type'], + 'error' => $e->getMessage() + ]); + } else { + // Requeue the message for a retry + $redis->lPush($retryQueueKey, json_encode($data)); + $logger->warning("Message processing failed; requeued for retry", [ + 'type' => $data['type'], + 'retry' => $data['retries'], + 'error' => $e->getMessage() + ]); } } -}); +} \ No newline at end of file