Improved Message Broker stability

This commit is contained in:
Pinga 2025-02-06 14:40:28 +02:00
parent f6752b0d26
commit 82c8b27f69
7 changed files with 502 additions and 186 deletions

View file

@ -1,47 +1,148 @@
<?php
/**
* msg_producer.php
*
* A Swoole HTTP server that accepts API calls and pushes messages into a Redis queue.
* Uses Swooles Coroutine Redis client with a simple connection pool.
*/
require __DIR__ . '/vendor/autoload.php';
// Enable strict types if you wish
declare(strict_types=1);
use Predis\Client as RedisClient;
use Swoole\Http\Server;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\Coroutine\Redis;
// Autoload Composer dependencies
require __DIR__ . '/vendor/autoload.php';
// Load configuration and helper functions (assumed to be provided)
$c = require_once 'config.php';
require_once 'helpers.php';
$redis = new RedisClient([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
]);
$server = new Server("127.0.0.1", 8250);
$server->set([
'daemonize' => false,
'log_file' => '/var/log/namingo/msg_producer_app.log',
'log_level' => SWOOLE_LOG_INFO,
'worker_num' => swoole_cpu_num() * 2,
'pid_file' => '/var/run/msg_producer.pid'
]);
// Set up logger for the producer (adjust log file paths as needed)
$logFilePath = '/var/log/namingo/msg_producer.log';
$log = setupLogger($logFilePath, 'Msg_Producer');
$log->info('job started.');
$logger = setupLogger($logFilePath, 'Msg_Producer');
$server->on("request", function (Request $request, Response $response) use ($redis) {
$data = json_decode($request->rawContent(), true);
/**
* A simple Redis connection pool using Swoole's Coroutine Channel.
*/
class RedisPool {
private $pool;
private $host;
private $port;
if (!$data || !isset($data['type'])) {
$response->end(json_encode(['status' => 'error', 'message' => 'Invalid request']));
/**
* Constructor.
*
* @param string $host
* @param int $port
* @param int $size Number of connections to create
*/
public function __construct(string $host, int $port, int $size = 10) {
$this->pool = new Swoole\Coroutine\Channel($size);
$this->host = $host;
$this->port = $port;
}
/**
* Initialize pool inside a coroutine context.
*/
public function initialize(int $size = 10): void {
for ($i = 0; $i < $size; $i++) {
Swoole\Coroutine::create(function () {
$redis = new Redis();
if (!$redis->connect($this->host, $this->port)) {
throw new Exception("Failed to connect to Redis at {$this->host}:{$this->port}");
}
$this->pool->push($redis);
});
}
}
/**
* Get a Redis connection from the pool.
*/
public function get(): Redis {
return $this->pool->pop();
}
/**
* Return a Redis connection back to the pool.
*/
public function put(Redis $redis): void {
$this->pool->push($redis);
}
}
// Create the Swoole HTTP server
$server = new Server("127.0.0.1", 8250);
// Swoole server settings (adjust daemonize to true when running in production)
$server->set([
'daemonize' => true, // set to true for daemon mode
'log_file' => '/var/log/namingo/msg_producer.log',
'log_level' => SWOOLE_LOG_INFO,
'worker_num' => swoole_cpu_num() * 2,
'pid_file' => '/var/run/msg_producer.pid'
]);
// Initialize the Redis pool inside a coroutine-friendly context
$server->on("start", function () use (&$redisPool) {
$redisPool = new RedisPool('127.0.0.1', 6379, 10);
$redisPool->initialize(10);
});
$server->on("request", function (Request $request, Response $response) use (&$redisPool, $logger) {
// Handle HTTP request and push messages to Redis
if (strtoupper($request->server['request_method'] ?? '') !== 'POST') {
$response->status(405);
$response->header('Content-Type', 'application/json');
$response->end(json_encode([
'status' => 'error',
'message' => 'Method Not Allowed'
]));
return;
}
// Enqueue the message
$redis->lpush('message_queue', json_encode($data));
$response->end(json_encode(['status' => 'success', 'message' => 'Message queued for delivery']));
// Decode the incoming JSON data
$data = json_decode($request->rawContent(), true);
if (!$data || empty($data['type'])) {
$response->status(400);
$response->header('Content-Type', 'application/json');
$response->end(json_encode([
'status' => 'error',
'message' => 'Invalid request: missing JSON data or "type" field'
]));
return;
}
// Push the message onto the Redis queue
try {
$redis = $redisPool->get();
$redis->lPush('message_queue', json_encode($data));
$redisPool->put($redis);
$logger->info("Message queued", ['data' => $data]);
$response->header('Content-Type', 'application/json');
$response->end(json_encode([
'status' => 'success',
'message' => 'Message queued for delivery'
]));
} catch (Exception $e) {
$logger->error("Failed to queue message", ['error' => $e->getMessage()]);
$response->status(500);
$response->header('Content-Type', 'application/json');
$response->end(json_encode([
'status' => 'error',
'message' => 'Internal Server Error'
]));
}
});
// Start the server
$logger->info("Starting msg_producer server on 127.0.0.1:8250");
$server->start();
/* USAGE

View file

@ -1,176 +1,208 @@
<?php
/**
* msg_worker.php
*
* A worker script that continuously pulls messages from a Redis queue and processes them.
* Uses Swoole's coroutine runtime and Coroutine Redis client.
*/
// Enable strict types if desired
declare(strict_types=1);
use Swoole\Coroutine;
use Swoole\Coroutine\Redis;
use PHPMailer\PHPMailer\PHPMailer;
use PHPMailer\PHPMailer\Exception as PHPMailerException;
use Utopia\Messaging\Messages\Email;
use Utopia\Messaging\Adapters\Email\SendGrid;
use Utopia\Messaging\Adapters\Email\Mailgun;
use Utopia\Messaging\Messages\SMS;
use Utopia\Messaging\Adapters\SMS\Twilio;
use Utopia\Messaging\Adapters\SMS\Telesign;
use Utopia\Messaging\Adapters\SMS\Plivo;
use Utopia\Messaging\Adapters\SMS\Vonage;
use Utopia\Messaging\Adapters\SMS\Clickatell;
// Autoload Composer dependencies
require __DIR__ . '/vendor/autoload.php';
use Predis\Client as RedisClient;
use Predis\Connection\ConnectionException;
use PHPMailer\PHPMailer\PHPMailer;
use PHPMailer\PHPMailer\Exception;
use \Utopia\Messaging\Messages\Email;
use \Utopia\Messaging\Adapters\Email\SendGrid;
use \Utopia\Messaging\Adapters\Email\Mailgun;
use \Utopia\Messaging\Messages\SMS;
use \Utopia\Messaging\Adapters\SMS\Twilio;
use \Utopia\Messaging\Adapters\SMS\Telesign;
use \Utopia\Messaging\Adapters\SMS\Plivo;
use \Utopia\Messaging\Adapters\SMS\Vonage;
use \Utopia\Messaging\Adapters\SMS\Clickatell;
// Load configuration and helper functions (assumed to be provided)
$c = require_once 'config.php';
require_once 'helpers.php';
// Setup initial Redis client
function setupRedisClient() {
return new RedisClient([
'scheme' => 'tcp',
'host' => '127.0.0.1',
'port' => 6379,
'persistent' => true,
]);
}
$redis = setupRedisClient();
// Set up logger for the worker
$logFilePath = '/var/log/namingo/msg_worker.log';
$log = setupLogger($logFilePath, 'Msg_Worker');
$log->info('job started.');
$logger = setupLogger($logFilePath, 'Msg_Worker');
// Maximum number of retries for a message
$maxRetries = 3;
// Key for retry messages to avoid infinite loops
$maxRetries = 3;
$retryQueueKey = 'message_queue_retry';
while (true) {
try {
$rawData = $redis->brpop(['message_queue', $retryQueueKey], 0);
} catch (ConnectionException $e) {
//$log->error("Redis connection lost. Attempting to reconnect.", ['error' => $e->getMessage()]);
sleep(5); // Pause before retrying to avoid flooding logs and giving time to recover
$redis = setupRedisClient(); // Attempt to reconnect
continue; // Skip the current iteration and try again
/**
* Creates and returns a new Coroutine Redis connection.
*
* @return Redis
* @throws Exception if connection fails.
*/
function connectRedis(): Redis {
$redis = new Redis();
$ret = $redis->connect('127.0.0.1', 6379);
if (!$ret) {
throw new Exception("Failed to connect to Redis");
}
return $redis;
}
if (!$rawData) {
continue; // In case of an empty or failed read, continue to the next iteration
}
// Run the worker inside Swoole's coroutine runtime.
Swoole\Coroutine\run(function() use ($c, $logger, $maxRetries, $retryQueueKey) {
$redis = connectRedis();
$logger->info("Worker started, waiting for messages...");
$queueKey = $rawData[0];
$data = json_decode($rawData[1], true);
while (true) {
try {
// brPop blocks until a message is available.
// It returns an array: [queueKey, messageData]
$result = $redis->brPop(['message_queue', $retryQueueKey], 0);
} catch (Exception $e) {
$logger->error("Redis error", ['error' => $e->getMessage()]);
// Wait before trying to reconnect
Coroutine::sleep(5);
try {
$redis = connectRedis();
} catch (Exception $ex) {
$logger->error("Redis reconnection failed", ['error' => $ex->getMessage()]);
continue;
}
continue;
}
try {
switch ($data['type']) {
case 'sendmail':
if ($c['mailer'] == 'phpmailer') {
$mail = new PHPMailer(true);
try {
$mail->isSMTP();
$mail->Host = $c['mailer_smtp_host'];
$mail->SMTPAuth = true;
$mail->Username = $c['mailer_smtp_username'];
$mail->Password = $c['mailer_smtp_password'];
$mail->SMTPSecure = 'tls';
if (substr($data['body'], 0, 2) === "<!") {
$mail->isHTML(true);
if (!$result) {
continue;
}
// Decode the message data
$queueKey = $result[0];
$data = json_decode($result[1], true);
if (!$data) {
$logger->warning("Received invalid message from Redis", ['raw' => $result[1]]);
continue;
}
try {
switch ($data['type']) {
case 'sendmail':
if ($c['mailer'] === 'phpmailer') {
$mail = new PHPMailer(true);
try {
$mail->isSMTP();
$mail->Host = $c['mailer_smtp_host'];
$mail->SMTPAuth = true;
$mail->Username = $c['mailer_smtp_username'];
$mail->Password = $c['mailer_smtp_password'];
$mail->SMTPSecure = 'tls';
$mail->Port = $c['mailer_smtp_port'];
$mail->setFrom($c['mailer_from']);
$mail->addAddress($data['toEmail']);
$mail->Subject = $data['subject'];
$mail->Body = $data['body'];
if (substr($data['body'], 0, 2) === "<!") {
$mail->isHTML(true);
$mail->AltBody = strip_tags($data['body']);
}
$mail->send();
} catch (PHPMailerException $e) {
throw new Exception("PHPMailer error: " . $e->getMessage());
}
$mail->Port = $c['mailer_smtp_port'];
$mail->setFrom($c['mailer_from']);
$mail->addAddress($data['toEmail']);
$mail->Subject = $data['subject'];
$mail->Body = $data['body'];
if (substr($data['body'], 0, 2) === "<!") {
$mail->AltBody = $data['body'];
}
$mail->send();
} catch (Exception $e) {
throw new Exception("Failed to send email");
} elseif ($c['mailer'] === 'sendgrid') {
$message = new Email(
from: [$c['mailer_from']],
to: [$data['toEmail']],
subject: $data['subject'],
content: $data['body']
);
$messaging = new SendGrid($c['mailer_api_key']);
$messaging->send($message);
} elseif ($c['mailer'] === 'mailgun') {
$message = new Email(
from: [$c['mailer_from']],
to: [$data['toEmail']],
subject: $data['subject'],
content: $data['body']
);
$messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']);
$messaging->send($message);
} else {
throw new Exception("Invalid mailer specified");
}
} elseif ($c['mailer'] == 'sendgrid') {
$message = new Email(
from: [$c['mailer_from']],
to: [$data['toEmail']],
subject: $data['subject'],
content: $data['body']
);
$messaging = new Sendgrid($c['mailer_api_key']);
$messaging->send($message);
} elseif ($c['mailer'] == 'mailgun') {
$message = new Email(
from: [$c['mailer_from']],
to: [$data['toEmail']],
subject: $data['subject'],
content: $data['body']
);
$messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']);
$messaging->send($message);
} else {
throw new Exception("Invalid mailer specified");
}
break;
case 'sendsms':
if ($c['mailer_sms'] == 'twilio') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Twilio($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] == 'telesign') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Telesign($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] == 'plivo') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Plivo($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] == 'vonage') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Vonage($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] == 'clickatell') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Clickatell($c['mailer_sms_account']);
$messaging->send($message);
} else {
throw new Exception("Invalid SMS provider specified");
}
break;
break;
default:
throw new Exception("Unknown action");
}
$log->info("Processed message successfully", ['type' => $data['type']]);
} catch (Exception $e) {
// Check if this message has been retried too many times
if (!isset($data['retries'])) {
$data['retries'] = 0;
}
$data['retries']++;
if ($data['retries'] > $maxRetries) {
// Log failure after exceeding retries
$log->error("Message processing failed after retries", ['type' => $data['type'], 'error' => $e->getMessage()]);
} else {
// Re-queue the message for retry
$redis->lpush($retryQueueKey, json_encode($data));
$log->warning("Message processing failed, retrying", ['type' => $data['type'], 'retry' => $data['retries'], 'error' => $e->getMessage()]);
case 'sendsms':
if ($c['mailer_sms'] === 'twilio') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Twilio($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] === 'telesign') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Telesign($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] === 'plivo') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Plivo($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] === 'vonage') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Vonage($c['mailer_sms_account'], $c['mailer_sms_auth']);
$messaging->send($message);
} elseif ($c['mailer_sms'] === 'clickatell') {
$message = new SMS(
to: [$data['toSMS']],
content: $data['contentSMS']
);
$messaging = new Clickatell($c['mailer_sms_account']);
$messaging->send($message);
} else {
throw new Exception("Invalid SMS provider specified");
}
break;
default:
throw new Exception("Unknown message type: " . $data['type']);
}
$logger->info("Processed message successfully", ['type' => $data['type']]);
} catch (Exception $e) {
// Increment the retry counter
if (!isset($data['retries'])) {
$data['retries'] = 0;
}
$data['retries']++;
if ($data['retries'] > $maxRetries) {
$logger->error("Message processing failed after maximum retries", [
'type' => $data['type'],
'error' => $e->getMessage()
]);
} else {
// Requeue the message for a retry
$redis->lPush($retryQueueKey, json_encode($data));
$logger->warning("Message processing failed; requeued for retry", [
'type' => $data['type'],
'retry' => $data['retries'],
'error' => $e->getMessage()
]);
}
}
}
}
});