Fixes in the message broker

This commit is contained in:
Pinga 2025-02-06 16:46:16 +02:00
parent 82c8b27f69
commit d9344518a3
3 changed files with 45 additions and 32 deletions

View file

@ -10,6 +10,7 @@
"peppeocchi/php-cron-scheduler": "^4.0", "peppeocchi/php-cron-scheduler": "^4.0",
"predis/predis": "^2.2", "predis/predis": "^2.2",
"guzzlehttp/guzzle": "^7.9", "guzzlehttp/guzzle": "^7.9",
"league/flysystem-ftp": "^3.29" "league/flysystem-ftp": "^3.29",
"phpmailer/phpmailer": "^6.9"
} }
} }

View file

@ -1,3 +1,4 @@
#!/usr/bin/env php
<?php <?php
/** /**
* msg_producer.php * msg_producer.php
@ -6,7 +7,6 @@
* Uses Swooles Coroutine Redis client with a simple connection pool. * Uses Swooles Coroutine Redis client with a simple connection pool.
*/ */
// Enable strict types if you wish
declare(strict_types=1); declare(strict_types=1);
use Swoole\Http\Server; use Swoole\Http\Server;
@ -14,32 +14,19 @@ use Swoole\Http\Request;
use Swoole\Http\Response; use Swoole\Http\Response;
use Swoole\Coroutine\Redis; use Swoole\Coroutine\Redis;
// Autoload Composer dependencies
require __DIR__ . '/vendor/autoload.php'; require __DIR__ . '/vendor/autoload.php';
// Load configuration and helper functions (assumed to be provided)
$c = require_once 'config.php'; $c = require_once 'config.php';
require_once 'helpers.php'; require_once 'helpers.php';
// Set up logger for the producer (adjust log file paths as needed)
$logFilePath = '/var/log/namingo/msg_producer.log'; $logFilePath = '/var/log/namingo/msg_producer.log';
$logger = setupLogger($logFilePath, 'Msg_Producer'); $logger = setupLogger($logFilePath, 'Msg_Producer');
/**
* A simple Redis connection pool using Swoole's Coroutine Channel.
*/
class RedisPool { class RedisPool {
private $pool; private $pool;
private $host; private $host;
private $port; private $port;
/**
* Constructor.
*
* @param string $host
* @param int $port
* @param int $size Number of connections to create
*/
public function __construct(string $host, int $port, int $size = 10) { public function __construct(string $host, int $port, int $size = 10) {
$this->pool = new Swoole\Coroutine\Channel($size); $this->pool = new Swoole\Coroutine\Channel($size);
$this->host = $host; $this->host = $host;
@ -51,6 +38,7 @@ class RedisPool {
*/ */
public function initialize(int $size = 10): void { public function initialize(int $size = 10): void {
for ($i = 0; $i < $size; $i++) { for ($i = 0; $i < $size; $i++) {
// Create a coroutine for each connection.
Swoole\Coroutine::create(function () { Swoole\Coroutine::create(function () {
$redis = new Redis(); $redis = new Redis();
if (!$redis->connect($this->host, $this->port)) { if (!$redis->connect($this->host, $this->port)) {
@ -63,9 +51,14 @@ class RedisPool {
/** /**
* Get a Redis connection from the pool. * Get a Redis connection from the pool.
* Optionally, you can add a timeout to avoid indefinite blocking.
*/ */
public function get(): Redis { public function get(float $timeout = 1.0): Redis {
return $this->pool->pop(); $conn = $this->pool->pop($timeout);
if (!$conn) {
throw new Exception("No available Redis connection in pool");
}
return $conn;
} }
/** /**
@ -76,26 +69,47 @@ class RedisPool {
} }
} }
// Global RedisPool instance
$redisPool = new RedisPool('127.0.0.1', 6379, 10);
// Create the Swoole HTTP server // Create the Swoole HTTP server
$server = new Server("127.0.0.1", 8250); $server = new Server("127.0.0.1", 8250);
// Swoole server settings (adjust daemonize to true when running in production) // Swoole server settings
$server->set([ $server->set([
'daemonize' => true, // set to true for daemon mode 'daemonize' => true,
'log_file' => '/var/log/namingo/msg_producer.log', 'log_file' => '/var/log/namingo/msg_producer.log',
'log_level' => SWOOLE_LOG_INFO, 'log_level' => SWOOLE_LOG_INFO,
'worker_num' => swoole_cpu_num() * 2, 'worker_num' => swoole_cpu_num() * 2,
'pid_file' => '/var/run/msg_producer.pid' 'pid_file' => '/var/run/msg_producer.pid'
]); ]);
// Initialize the Redis pool inside a coroutine-friendly context /**
$server->on("start", function () use (&$redisPool) { * Instead of initializing the Redis pool in the "start" event (which runs in the master process),
$redisPool = new RedisPool('127.0.0.1', 6379, 10); * we initialize it in the "workerStart" event so that it runs in a coroutine-enabled worker process.
*/
$server->on("workerStart", function () use ($redisPool, $logger) {
try {
$redisPool->initialize(10); $redisPool->initialize(10);
$logger->info("Redis pool initialized in worker process");
} catch (Exception $e) {
$logger->error("Failed to initialize Redis pool: " . $e->getMessage());
}
}); });
$server->on("request", function (Request $request, Response $response) use (&$redisPool, $logger) { // Handle incoming requests
// Handle HTTP request and push messages to Redis $server->on("request", function (Request $request, Response $response) use ($redisPool, $logger) {
if (!$redisPool) {
$logger->error("Redis pool not initialized");
$response->status(500);
$response->header('Content-Type', 'application/json');
$response->end(json_encode([
'status' => 'error',
'message' => 'Redis pool not initialized'
]));
return;
}
if (strtoupper($request->server['request_method'] ?? '') !== 'POST') { if (strtoupper($request->server['request_method'] ?? '') !== 'POST') {
$response->status(405); $response->status(405);
$response->header('Content-Type', 'application/json'); $response->header('Content-Type', 'application/json');
@ -106,7 +120,6 @@ $server->on("request", function (Request $request, Response $response) use (&$re
return; return;
} }
// Decode the incoming JSON data
$data = json_decode($request->rawContent(), true); $data = json_decode($request->rawContent(), true);
if (!$data || empty($data['type'])) { if (!$data || empty($data['type'])) {
$response->status(400); $response->status(400);
@ -118,7 +131,6 @@ $server->on("request", function (Request $request, Response $response) use (&$re
return; return;
} }
// Push the message onto the Redis queue
try { try {
$redis = $redisPool->get(); $redis = $redisPool->get();
$redis->lPush('message_queue', json_encode($data)); $redis->lPush('message_queue', json_encode($data));

View file

@ -26,7 +26,7 @@ use Utopia\Messaging\Adapters\SMS\Clickatell;
// Autoload Composer dependencies // Autoload Composer dependencies
require __DIR__ . '/vendor/autoload.php'; require __DIR__ . '/vendor/autoload.php';
// Load configuration and helper functions (assumed to be provided) // Load configuration and helper functions
$c = require_once 'config.php'; $c = require_once 'config.php';
require_once 'helpers.php'; require_once 'helpers.php';
@ -62,7 +62,7 @@ Swoole\Coroutine\run(function() use ($c, $logger, $maxRetries, $retryQueueKey) {
try { try {
// brPop blocks until a message is available. // brPop blocks until a message is available.
// It returns an array: [queueKey, messageData] // It returns an array: [queueKey, messageData]
$result = $redis->brPop(['message_queue', $retryQueueKey], 0); $result = $redis->brPop(['message_queue', $retryQueueKey], 1);
} catch (Exception $e) { } catch (Exception $e) {
$logger->error("Redis error", ['error' => $e->getMessage()]); $logger->error("Redis error", ['error' => $e->getMessage()]);
// Wait before trying to reconnect // Wait before trying to reconnect
@ -111,7 +111,7 @@ Swoole\Coroutine\run(function() use ($c, $logger, $maxRetries, $retryQueueKey) {
} }
$mail->send(); $mail->send();
} catch (PHPMailerException $e) { } catch (PHPMailerException $e) {
throw new Exception("PHPMailer error: " . $e->getMessage()); $logger->error("PHPMailer error: ", ['error' => $e->getMessage()]);
} }
} elseif ($c['mailer'] === 'sendgrid') { } elseif ($c['mailer'] === 'sendgrid') {
$message = new Email( $message = new Email(
@ -132,7 +132,7 @@ Swoole\Coroutine\run(function() use ($c, $logger, $maxRetries, $retryQueueKey) {
$messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']); $messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']);
$messaging->send($message); $messaging->send($message);
} else { } else {
throw new Exception("Invalid mailer specified"); $logger->error("Invalid mailer specified");
} }
break; break;
@ -173,7 +173,7 @@ Swoole\Coroutine\run(function() use ($c, $logger, $maxRetries, $retryQueueKey) {
$messaging = new Clickatell($c['mailer_sms_account']); $messaging = new Clickatell($c['mailer_sms_account']);
$messaging->send($message); $messaging->send($message);
} else { } else {
throw new Exception("Invalid SMS provider specified"); $logger->error("Invalid SMS provider specified");
} }
break; break;