From 82c8b27f697d1b7739c870faffb94e6fed901eca Mon Sep 17 00:00:00 2001 From: Pinga <121483313+getpinga@users.noreply.github.com> Date: Thu, 6 Feb 2025 14:40:28 +0200 Subject: [PATCH] Improved Message Broker stability --- automation/msg_producer.php | 153 +++++++++++++--- automation/msg_worker.php | 342 ++++++++++++++++++++---------------- docs/configuration.md | 6 +- docs/install.sh | 9 +- docs/msg_producer.service | 22 +++ docs/msg_worker.service | 19 ++ docs/update1013.sh | 137 +++++++++++++++ 7 files changed, 502 insertions(+), 186 deletions(-) create mode 100644 docs/msg_producer.service create mode 100644 docs/msg_worker.service create mode 100644 docs/update1013.sh diff --git a/automation/msg_producer.php b/automation/msg_producer.php index 6b4a0e7..7a88d69 100644 --- a/automation/msg_producer.php +++ b/automation/msg_producer.php @@ -1,47 +1,148 @@ 'tcp', - 'host' => '127.0.0.1', - 'port' => 6379, -]); - -$server = new Server("127.0.0.1", 8250); -$server->set([ - 'daemonize' => false, - 'log_file' => '/var/log/namingo/msg_producer_app.log', - 'log_level' => SWOOLE_LOG_INFO, - 'worker_num' => swoole_cpu_num() * 2, - 'pid_file' => '/var/run/msg_producer.pid' -]); +// Set up logger for the producer (adjust log file paths as needed) $logFilePath = '/var/log/namingo/msg_producer.log'; -$log = setupLogger($logFilePath, 'Msg_Producer'); -$log->info('job started.'); +$logger = setupLogger($logFilePath, 'Msg_Producer'); -$server->on("request", function (Request $request, Response $response) use ($redis) { - $data = json_decode($request->rawContent(), true); +/** + * A simple Redis connection pool using Swoole's Coroutine Channel. + */ +class RedisPool { + private $pool; + private $host; + private $port; - if (!$data || !isset($data['type'])) { - $response->end(json_encode(['status' => 'error', 'message' => 'Invalid request'])); + /** + * Constructor. + * + * @param string $host + * @param int $port + * @param int $size Number of connections to create + */ + public function __construct(string $host, int $port, int $size = 10) { + $this->pool = new Swoole\Coroutine\Channel($size); + $this->host = $host; + $this->port = $port; + } + + /** + * Initialize pool inside a coroutine context. + */ + public function initialize(int $size = 10): void { + for ($i = 0; $i < $size; $i++) { + Swoole\Coroutine::create(function () { + $redis = new Redis(); + if (!$redis->connect($this->host, $this->port)) { + throw new Exception("Failed to connect to Redis at {$this->host}:{$this->port}"); + } + $this->pool->push($redis); + }); + } + } + + /** + * Get a Redis connection from the pool. + */ + public function get(): Redis { + return $this->pool->pop(); + } + + /** + * Return a Redis connection back to the pool. + */ + public function put(Redis $redis): void { + $this->pool->push($redis); + } +} + +// Create the Swoole HTTP server +$server = new Server("127.0.0.1", 8250); + +// Swoole server settings (adjust daemonize to true when running in production) +$server->set([ + 'daemonize' => true, // set to true for daemon mode + 'log_file' => '/var/log/namingo/msg_producer.log', + 'log_level' => SWOOLE_LOG_INFO, + 'worker_num' => swoole_cpu_num() * 2, + 'pid_file' => '/var/run/msg_producer.pid' +]); + +// Initialize the Redis pool inside a coroutine-friendly context +$server->on("start", function () use (&$redisPool) { + $redisPool = new RedisPool('127.0.0.1', 6379, 10); + $redisPool->initialize(10); +}); + +$server->on("request", function (Request $request, Response $response) use (&$redisPool, $logger) { + // Handle HTTP request and push messages to Redis + if (strtoupper($request->server['request_method'] ?? '') !== 'POST') { + $response->status(405); + $response->header('Content-Type', 'application/json'); + $response->end(json_encode([ + 'status' => 'error', + 'message' => 'Method Not Allowed' + ])); return; } - // Enqueue the message - $redis->lpush('message_queue', json_encode($data)); - - $response->end(json_encode(['status' => 'success', 'message' => 'Message queued for delivery'])); + // Decode the incoming JSON data + $data = json_decode($request->rawContent(), true); + if (!$data || empty($data['type'])) { + $response->status(400); + $response->header('Content-Type', 'application/json'); + $response->end(json_encode([ + 'status' => 'error', + 'message' => 'Invalid request: missing JSON data or "type" field' + ])); + return; + } + + // Push the message onto the Redis queue + try { + $redis = $redisPool->get(); + $redis->lPush('message_queue', json_encode($data)); + $redisPool->put($redis); + + $logger->info("Message queued", ['data' => $data]); + $response->header('Content-Type', 'application/json'); + $response->end(json_encode([ + 'status' => 'success', + 'message' => 'Message queued for delivery' + ])); + } catch (Exception $e) { + $logger->error("Failed to queue message", ['error' => $e->getMessage()]); + $response->status(500); + $response->header('Content-Type', 'application/json'); + $response->end(json_encode([ + 'status' => 'error', + 'message' => 'Internal Server Error' + ])); + } }); +// Start the server +$logger->info("Starting msg_producer server on 127.0.0.1:8250"); $server->start(); /* USAGE diff --git a/automation/msg_worker.php b/automation/msg_worker.php index d214bf6..cca34b1 100644 --- a/automation/msg_worker.php +++ b/automation/msg_worker.php @@ -1,176 +1,208 @@ 'tcp', - 'host' => '127.0.0.1', - 'port' => 6379, - 'persistent' => true, - ]); -} - -$redis = setupRedisClient(); - +// Set up logger for the worker $logFilePath = '/var/log/namingo/msg_worker.log'; -$log = setupLogger($logFilePath, 'Msg_Worker'); -$log->info('job started.'); +$logger = setupLogger($logFilePath, 'Msg_Worker'); // Maximum number of retries for a message -$maxRetries = 3; -// Key for retry messages to avoid infinite loops +$maxRetries = 3; $retryQueueKey = 'message_queue_retry'; -while (true) { - try { - $rawData = $redis->brpop(['message_queue', $retryQueueKey], 0); - } catch (ConnectionException $e) { - //$log->error("Redis connection lost. Attempting to reconnect.", ['error' => $e->getMessage()]); - sleep(5); // Pause before retrying to avoid flooding logs and giving time to recover - $redis = setupRedisClient(); // Attempt to reconnect - continue; // Skip the current iteration and try again +/** + * Creates and returns a new Coroutine Redis connection. + * + * @return Redis + * @throws Exception if connection fails. + */ +function connectRedis(): Redis { + $redis = new Redis(); + $ret = $redis->connect('127.0.0.1', 6379); + if (!$ret) { + throw new Exception("Failed to connect to Redis"); } + return $redis; +} - if (!$rawData) { - continue; // In case of an empty or failed read, continue to the next iteration - } +// Run the worker inside Swoole's coroutine runtime. +Swoole\Coroutine\run(function() use ($c, $logger, $maxRetries, $retryQueueKey) { + $redis = connectRedis(); + $logger->info("Worker started, waiting for messages..."); - $queueKey = $rawData[0]; - $data = json_decode($rawData[1], true); + while (true) { + try { + // brPop blocks until a message is available. + // It returns an array: [queueKey, messageData] + $result = $redis->brPop(['message_queue', $retryQueueKey], 0); + } catch (Exception $e) { + $logger->error("Redis error", ['error' => $e->getMessage()]); + // Wait before trying to reconnect + Coroutine::sleep(5); + try { + $redis = connectRedis(); + } catch (Exception $ex) { + $logger->error("Redis reconnection failed", ['error' => $ex->getMessage()]); + continue; + } + continue; + } - try { - switch ($data['type']) { - case 'sendmail': - if ($c['mailer'] == 'phpmailer') { - $mail = new PHPMailer(true); - try { - $mail->isSMTP(); - $mail->Host = $c['mailer_smtp_host']; - $mail->SMTPAuth = true; - $mail->Username = $c['mailer_smtp_username']; - $mail->Password = $c['mailer_smtp_password']; - $mail->SMTPSecure = 'tls'; - if (substr($data['body'], 0, 2) === "isHTML(true); + if (!$result) { + continue; + } + + // Decode the message data + $queueKey = $result[0]; + $data = json_decode($result[1], true); + if (!$data) { + $logger->warning("Received invalid message from Redis", ['raw' => $result[1]]); + continue; + } + + try { + switch ($data['type']) { + case 'sendmail': + if ($c['mailer'] === 'phpmailer') { + $mail = new PHPMailer(true); + try { + $mail->isSMTP(); + $mail->Host = $c['mailer_smtp_host']; + $mail->SMTPAuth = true; + $mail->Username = $c['mailer_smtp_username']; + $mail->Password = $c['mailer_smtp_password']; + $mail->SMTPSecure = 'tls'; + $mail->Port = $c['mailer_smtp_port']; + $mail->setFrom($c['mailer_from']); + $mail->addAddress($data['toEmail']); + $mail->Subject = $data['subject']; + $mail->Body = $data['body']; + if (substr($data['body'], 0, 2) === "isHTML(true); + $mail->AltBody = strip_tags($data['body']); + } + $mail->send(); + } catch (PHPMailerException $e) { + throw new Exception("PHPMailer error: " . $e->getMessage()); } - $mail->Port = $c['mailer_smtp_port']; - $mail->setFrom($c['mailer_from']); - $mail->addAddress($data['toEmail']); - $mail->Subject = $data['subject']; - $mail->Body = $data['body']; - if (substr($data['body'], 0, 2) === "AltBody = $data['body']; - } - $mail->send(); - } catch (Exception $e) { - throw new Exception("Failed to send email"); + } elseif ($c['mailer'] === 'sendgrid') { + $message = new Email( + from: [$c['mailer_from']], + to: [$data['toEmail']], + subject: $data['subject'], + content: $data['body'] + ); + $messaging = new SendGrid($c['mailer_api_key']); + $messaging->send($message); + } elseif ($c['mailer'] === 'mailgun') { + $message = new Email( + from: [$c['mailer_from']], + to: [$data['toEmail']], + subject: $data['subject'], + content: $data['body'] + ); + $messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']); + $messaging->send($message); + } else { + throw new Exception("Invalid mailer specified"); } - } elseif ($c['mailer'] == 'sendgrid') { - $message = new Email( - from: [$c['mailer_from']], - to: [$data['toEmail']], - subject: $data['subject'], - content: $data['body'] - ); - $messaging = new Sendgrid($c['mailer_api_key']); - $messaging->send($message); - } elseif ($c['mailer'] == 'mailgun') { - $message = new Email( - from: [$c['mailer_from']], - to: [$data['toEmail']], - subject: $data['subject'], - content: $data['body'] - ); - $messaging = new Mailgun($c['mailer_api_key'], $c['mailer_domain']); - $messaging->send($message); - } else { - throw new Exception("Invalid mailer specified"); - } - break; - - case 'sendsms': - if ($c['mailer_sms'] == 'twilio') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Twilio($c['mailer_sms_account'], $c['mailer_sms_auth']); - $messaging->send($message); - } elseif ($c['mailer_sms'] == 'telesign') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Telesign($c['mailer_sms_account'], $c['mailer_sms_auth']); - $messaging->send($message); - } elseif ($c['mailer_sms'] == 'plivo') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Plivo($c['mailer_sms_account'], $c['mailer_sms_auth']); - $messaging->send($message); - } elseif ($c['mailer_sms'] == 'vonage') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Vonage($c['mailer_sms_account'], $c['mailer_sms_auth']); - $messaging->send($message); - } elseif ($c['mailer_sms'] == 'clickatell') { - $message = new SMS( - to: [$data['toSMS']], - content: $data['contentSMS'] - ); - $messaging = new Clickatell($c['mailer_sms_account']); - $messaging->send($message); - } else { - throw new Exception("Invalid SMS provider specified"); - } - break; + break; - default: - throw new Exception("Unknown action"); - } - - $log->info("Processed message successfully", ['type' => $data['type']]); - } catch (Exception $e) { - // Check if this message has been retried too many times - if (!isset($data['retries'])) { - $data['retries'] = 0; - } - - $data['retries']++; - - if ($data['retries'] > $maxRetries) { - // Log failure after exceeding retries - $log->error("Message processing failed after retries", ['type' => $data['type'], 'error' => $e->getMessage()]); - } else { - // Re-queue the message for retry - $redis->lpush($retryQueueKey, json_encode($data)); - $log->warning("Message processing failed, retrying", ['type' => $data['type'], 'retry' => $data['retries'], 'error' => $e->getMessage()]); + case 'sendsms': + if ($c['mailer_sms'] === 'twilio') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Twilio($c['mailer_sms_account'], $c['mailer_sms_auth']); + $messaging->send($message); + } elseif ($c['mailer_sms'] === 'telesign') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Telesign($c['mailer_sms_account'], $c['mailer_sms_auth']); + $messaging->send($message); + } elseif ($c['mailer_sms'] === 'plivo') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Plivo($c['mailer_sms_account'], $c['mailer_sms_auth']); + $messaging->send($message); + } elseif ($c['mailer_sms'] === 'vonage') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Vonage($c['mailer_sms_account'], $c['mailer_sms_auth']); + $messaging->send($message); + } elseif ($c['mailer_sms'] === 'clickatell') { + $message = new SMS( + to: [$data['toSMS']], + content: $data['contentSMS'] + ); + $messaging = new Clickatell($c['mailer_sms_account']); + $messaging->send($message); + } else { + throw new Exception("Invalid SMS provider specified"); + } + break; + + default: + throw new Exception("Unknown message type: " . $data['type']); + } + + $logger->info("Processed message successfully", ['type' => $data['type']]); + } catch (Exception $e) { + // Increment the retry counter + if (!isset($data['retries'])) { + $data['retries'] = 0; + } + $data['retries']++; + + if ($data['retries'] > $maxRetries) { + $logger->error("Message processing failed after maximum retries", [ + 'type' => $data['type'], + 'error' => $e->getMessage() + ]); + } else { + // Requeue the message for a retry + $redis->lPush($retryQueueKey, json_encode($data)); + $logger->warning("Message processing failed; requeued for retry", [ + 'type' => $data['type'], + 'retry' => $data['retries'], + 'error' => $e->getMessage() + ]); + } } } -} \ No newline at end of file +}); diff --git a/docs/configuration.md b/docs/configuration.md index 9a1bc70..64117d0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -113,12 +113,10 @@ For establishing your own mail server, both [Mox](https://github.com/mjl-/mox) a To run the Message Broker, execute the following commands: ```bash -/usr/bin/php /opt/registry/automation/msg_producer.php & -/usr/bin/php /opt/registry/automation/msg_worker.php & +systemctl start msg_producer +systemctl start msg_worker ``` -This will start the system and place it in the background, allowing it to run independently of your current terminal session. - #### 1.4.4. Setting Up an Audit Trail Database for Namingo To create an audit trail database for Namingo, start by editing the configuration file located at `/opt/registry/automation/audit.json` with the correct database details. This includes specifying the database connection parameters such as host, username, and password. Once your configuration is set up, run the command: diff --git a/docs/install.sh b/docs/install.sh index bd16fba..28ae945 100644 --- a/docs/install.sh +++ b/docs/install.sh @@ -455,7 +455,14 @@ EOF cp /opt/registry/docs/das.service /etc/systemd/system/ systemctl daemon-reload systemctl enable das.service - + + echo "Installing Message Broker." + cp /opt/registry/docs/msg_producer.service /etc/systemd/system/ + cp /opt/registry/docs/msg_worker.service /etc/systemd/system/ + systemctl daemon-reload + systemctl enable msg_producer + systemctl enable msg_worker + echo "Configuring control panel admin." sed -i "s|\$email = 'admin@example.com';|\$email = '$PANEL_EMAIL';|g" /var/www/cp/bin/create_admin_user.php sed -i "s|\$newPW = 'admin_password';|\$newPW = '$PANEL_PASSWORD';|g" /var/www/cp/bin/create_admin_user.php diff --git a/docs/msg_producer.service b/docs/msg_producer.service new file mode 100644 index 0000000..e204cd3 --- /dev/null +++ b/docs/msg_producer.service @@ -0,0 +1,22 @@ +[Unit] +Description=Message Producer Service +After=network.target redis.service +Requires=redis.service + +[Service] +Type=forking +User=root +Group=root +Restart=on-failure +ExecStart=/usr/bin/php /opt/registry/automation/msg_producer.php +ExecReload=/bin/kill -HUP $MAINPID +ExecStop=/bin/kill -INT $MAINPID +WorkingDirectory=/opt/registry/automation +StandardOutput=syslog +StandardError=syslog +SyslogIdentifier=msg_producer +PIDFile=/var/run/msg_producer.pid +LimitNOFILE=100000 + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/docs/msg_worker.service b/docs/msg_worker.service new file mode 100644 index 0000000..1c62b25 --- /dev/null +++ b/docs/msg_worker.service @@ -0,0 +1,19 @@ +[Unit] +Description=Message Worker Service +After=network.target + +[Service] +Type=simple +User=root +Group=root +Restart=on-failure +ExecStart=/usr/bin/php /opt/registry/automation/msg_worker.php +ExecReload=/bin/kill -HUP $MAINPID +ExecStop=/bin/kill -INT $MAINPID +WorkingDirectory=/opt/registry/automation +StandardOutput=syslog +StandardError=syslog +SyslogIdentifier=msg_worker + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/docs/update1013.sh b/docs/update1013.sh new file mode 100644 index 0000000..0f999eb --- /dev/null +++ b/docs/update1013.sh @@ -0,0 +1,137 @@ +#!/bin/bash + +# Prompt the user for confirmation +echo "This will update Namingo Registry from v1.0.12 to v1.0.13." +echo "Make sure you have a backup of the database, /var/www/cp, and /opt/registry." +read -p "Are you sure you want to proceed? (y/n): " confirm + +# Check user input +if [[ "$confirm" != "y" ]]; then + echo "Upgrade aborted." + exit 0 +fi + +# Create backup directory +backup_dir="/opt/backup" +mkdir -p "$backup_dir" + +# Backup directories +echo "Creating backups..." +tar -czf "$backup_dir/cp_backup_$(date +%F).tar.gz" -C / var/www/cp +tar -czf "$backup_dir/whois_backup_$(date +%F).tar.gz" -C / var/www/whois +tar -czf "$backup_dir/registry_backup_$(date +%F).tar.gz" -C / opt/registry + +# Database credentials +config_file="/opt/registry/whois/port43/config.php" +db_user=$(grep "'db_username'" "$config_file" | awk -F "=> '" '{print $2}' | sed "s/',//") +db_pass=$(grep "'db_password'" "$config_file" | awk -F "=> '" '{print $2}' | sed "s/',//") +db_host=$(grep "'db_host'" "$config_file" | awk -F "=> '" '{print $2}' | sed "s/',//") + +# List of databases to back up +databases=("registry" "registryAudit" "registryTransaction") + +# Backup specific databases +for db_name in "${databases[@]}"; do + echo "Backing up database $db_name..." + sql_backup_file="$backup_dir/db_${db_name}_backup_$(date +%F).sql" + mysqldump -u"$db_user" -p"$db_pass" -h"$db_host" "$db_name" > "$sql_backup_file" + + # Compress the SQL backup file + echo "Compressing database backup $db_name..." + tar -czf "${sql_backup_file}.tar.gz" -C "$backup_dir" "$(basename "$sql_backup_file")" + + # Remove the uncompressed SQL file + rm "$sql_backup_file" +done + +# Stop services +echo "Stopping services..." +systemctl stop caddy +systemctl stop epp +systemctl stop whois +systemctl stop rdap +systemctl stop das + +# Clear cache +echo "Clearing cache..." +php /var/www/cp/bin/clear_cache.php + +# Clone the new version of the repository +echo "Cloning v1.0.13 from the repository..." +git clone --branch v1.0.13 --single-branch https://github.com/getnamingo/registry /opt/registry1013 + +# Copy files from the new version to the appropriate directories +echo "Copying files..." + +# Function to copy files and maintain directory structure +copy_files() { + src_dir=$1 + dest_dir=$2 + + if [[ -d "$src_dir" ]]; then + echo "Copying from $src_dir to $dest_dir..." + cp -R "$src_dir/." "$dest_dir/" + else + echo "Source directory $src_dir does not exist. Skipping..." + fi +} + +# Copy specific directories +copy_files "/opt/registry1013/automation" "/opt/registry/automation" +copy_files "/opt/registry1013/cp" "/var/www/cp" +copy_files "/opt/registry1013/whois/web" "/var/www/whois" +copy_files "/opt/registry1013/das" "/opt/registry/das" +copy_files "/opt/registry1013/whois/port43" "/opt/registry/whois/port43" +copy_files "/opt/registry1013/rdap" "/opt/registry/rdap" +copy_files "/opt/registry1013/epp" "/opt/registry/epp" +copy_files "/opt/registry1013/docs" "/opt/registry/docs" + +# Run composer update in copied directories (excluding docs) +echo "Running composer update..." + +composer_update() { + dir=$1 + if [[ -d "$dir" ]]; then + echo "Updating composer in $dir..." + cd "$dir" && composer update + else + echo "Directory $dir does not exist. Skipping composer update..." + fi +} + +# Update composer in relevant directories +composer_update "/opt/registry/automation" +composer_update "/var/www/cp" +composer_update "/opt/registry/das" +composer_update "/opt/registry/whois/port43" +composer_update "/opt/registry/rdap" +composer_update "/opt/registry/epp" + +cp /opt/registry/docs/msg_producer.service /etc/systemd/system/ +cp /opt/registry/docs/msg_worker.service /etc/systemd/system/ + +systemctl daemon-reload + +# Start services +echo "Starting services..." +systemctl start epp +systemctl start whois +systemctl start rdap +systemctl start das +systemctl start caddy + +systemctl enable msg_producer +systemctl enable msg_worker + +# Check if services started successfully +if [[ $? -eq 0 ]]; then + echo "Services started successfully. Deleting /opt/registry1013..." + rm -rf /opt/registry1013 +else + echo "There was an issue starting the services. /opt/registry1013 will not be deleted." +fi + +echo "Upgrade to v1.0.13 completed successfully." +echo "Please configure and run your Message Broker with:" +echo "systemctl start msg_producer" +echo "systemctl start msg_worker" \ No newline at end of file