From d9344518a3bab9c2756f4ed9fa0f2513a40c3f11 Mon Sep 17 00:00:00 2001 From: Pinga <121483313+getpinga@users.noreply.github.com> Date: Thu, 6 Feb 2025 16:46:16 +0200 Subject: [PATCH] Fixes in the message broker --- automation/composer.json | 3 +- automation/msg_producer.php | 64 ++++++++++++++++++++++--------------- automation/msg_worker.php | 10 +++--- 3 files changed, 45 insertions(+), 32 deletions(-) diff --git a/automation/composer.json b/automation/composer.json index 06fb630..7ba4450 100644 --- a/automation/composer.json +++ b/automation/composer.json @@ -10,6 +10,7 @@ "peppeocchi/php-cron-scheduler": "^4.0", "predis/predis": "^2.2", "guzzlehttp/guzzle": "^7.9", - "league/flysystem-ftp": "^3.29" + "league/flysystem-ftp": "^3.29", + "phpmailer/phpmailer": "^6.9" } } diff --git a/automation/msg_producer.php b/automation/msg_producer.php index 7a88d69..c682097 100644 --- a/automation/msg_producer.php +++ b/automation/msg_producer.php @@ -1,3 +1,4 @@ +#!/usr/bin/env php pool = new Swoole\Coroutine\Channel($size); $this->host = $host; @@ -51,6 +38,7 @@ class RedisPool { */ public function initialize(int $size = 10): void { for ($i = 0; $i < $size; $i++) { + // Create a coroutine for each connection. Swoole\Coroutine::create(function () { $redis = new Redis(); if (!$redis->connect($this->host, $this->port)) { @@ -63,9 +51,14 @@ class RedisPool { /** * Get a Redis connection from the pool. + * Optionally, you can add a timeout to avoid indefinite blocking. */ - public function get(): Redis { - return $this->pool->pop(); + public function get(float $timeout = 1.0): Redis { + $conn = $this->pool->pop($timeout); + if (!$conn) { + throw new Exception("No available Redis connection in pool"); + } + return $conn; } /** @@ -76,26 +69,47 @@ class RedisPool { } } +// Global RedisPool instance +$redisPool = new RedisPool('127.0.0.1', 6379, 10); + // Create the Swoole HTTP server $server = new Server("127.0.0.1", 8250); -// Swoole server settings (adjust daemonize to true when running in production) +// Swoole server settings $server->set([ - 'daemonize' => true, // set to true for daemon mode + 'daemonize' => true, 'log_file' => '/var/log/namingo/msg_producer.log', 'log_level' => SWOOLE_LOG_INFO, 'worker_num' => swoole_cpu_num() * 2, 'pid_file' => '/var/run/msg_producer.pid' ]); -// Initialize the Redis pool inside a coroutine-friendly context -$server->on("start", function () use (&$redisPool) { - $redisPool = new RedisPool('127.0.0.1', 6379, 10); - $redisPool->initialize(10); +/** + * Instead of initializing the Redis pool in the "start" event (which runs in the master process), + * we initialize it in the "workerStart" event so that it runs in a coroutine-enabled worker process. + */ +$server->on("workerStart", function () use ($redisPool, $logger) { + try { + $redisPool->initialize(10); + $logger->info("Redis pool initialized in worker process"); + } catch (Exception $e) { + $logger->error("Failed to initialize Redis pool: " . $e->getMessage()); + } }); -$server->on("request", function (Request $request, Response $response) use (&$redisPool, $logger) { - // Handle HTTP request and push messages to Redis +// Handle incoming requests +$server->on("request", function (Request $request, Response $response) use ($redisPool, $logger) { + if (!$redisPool) { + $logger->error("Redis pool not initialized"); + $response->status(500); + $response->header('Content-Type', 'application/json'); + $response->end(json_encode([ + 'status' => 'error', + 'message' => 'Redis pool not initialized' + ])); + return; + } + if (strtoupper($request->server['request_method'] ?? '') !== 'POST') { $response->status(405); $response->header('Content-Type', 'application/json'); @@ -106,7 +120,6 @@ $server->on("request", function (Request $request, Response $response) use (&$re return; } - // Decode the incoming JSON data $data = json_decode($request->rawContent(), true); if (!$data || empty($data['type'])) { $response->status(400); @@ -118,7 +131,6 @@ $server->on("request", function (Request $request, Response $response) use (&$re return; } - // Push the message onto the Redis queue try { $redis = $redisPool->get(); $redis->lPush('message_queue', json_encode($data)); diff --git a/automation/msg_worker.php b/automation/msg_worker.php index cca34b1..d0af089 100644 --- a/automation/msg_worker.php +++ b/automation/msg_worker.php @@ -26,7 +26,7 @@ use Utopia\Messaging\Adapters\SMS\Clickatell; // Autoload Composer dependencies require __DIR__ . '/vendor/autoload.php'; -// Load configuration and helper functions (assumed to be provided) +// Load configuration and helper functions $c = require_once 'config.php'; require_once 'helpers.php'; @@ -62,7 +62,7 @@ Swoole\Coroutine\run(function() use ($c, $logger, $maxRetries, $retryQueueKey) { try { // brPop blocks until a message is available. // It returns an array: [queueKey, messageData] - $result = $redis->brPop(['message_queue', $retryQueueKey], 0); + $result = $redis->brPop(['message_queue', $retryQueueKey], 1); } catch (Exception $e) { $logger->error("Redis error", ['error' => $e->getMessage()]); // Wait before trying to reconnect @@ -111,7 +111,7 @@ Swoole\Coroutine\run(function() use ($c, $logger, $maxRetries, $retryQueueKey) { } $mail->send(); } catch (PHPMailerException $e) { - throw new Exception("PHPMailer error: " . $e->getMessage()); + $logger->error("PHPMailer error: ", ['error' => $e->getMessage()]); } } elseif ($c['mailer'] === 'sendgrid') { $message = new Email( @@ -132,7 +132,7 @@ Swoole\Coroutine\run(function() use ($c, $logger, $maxRetries, $retryQueueKey) { $messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']); $messaging->send($message); } else { - throw new Exception("Invalid mailer specified"); + $logger->error("Invalid mailer specified"); } break; @@ -173,7 +173,7 @@ Swoole\Coroutine\run(function() use ($c, $logger, $maxRetries, $retryQueueKey) { $messaging = new Clickatell($c['mailer_sms_account']); $messaging->send($message); } else { - throw new Exception("Invalid SMS provider specified"); + $logger->error("Invalid SMS provider specified"); } break;