This setup includes:
1️⃣ On-Demand Worker → Starts only when needed & exits after job completion.
2️⃣ Continuous Worker → Runs 24/7, processing jobs continuously.
3️⃣ Queue File (queue.php) → Adds jobs to Beanstalkd.
4️⃣ Supervisor Configuration → Manages continuous workers.
✅ Step 1: Install Required Packages
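If they are not already installed, install Beanstalkd, Supervisor (used in Step 5), and the Pheanstalk client library. The code below is written against the Pheanstalk 4 API (for example, Pheanstalk::create()), so pin that major version:
sudo apt install beanstalkd supervisor -y
composer require pda/pheanstalk:^4.0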
Start Beanstalkd:
sudo systemctl enable --now beanstalkd
✅ Step 2: Queue File (queue.php)
This file adds jobs to Beanstalkd.
📄 queue.php
<?php
require 'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$pheanstalk = Pheanstalk::create('127.0.0.1');
$tube = 'testtube';
// Example job data
$data = [
    'task' => 'send_email',
    'email' => 'user@example.com', // placeholder address
    'message' => 'Hello, your order is confirmed!',
    'timestamp' => time(),
];
// Add job to queue
$pheanstalk->useTube($tube)->put(json_encode($data));
echo "Job added to queue.\n";
🔹 Run this file every time you need to add a job:
php queue.php
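By default json_encode() returns false on failure instead of raising an error, so a bad payload can slip into the queue unnoticed. The following is a minimal sketch, assuming a hypothetical helper named buildJobPayload() (it is not part of Pheanstalk), that checks the fields used above before encoding them:

```php
<?php
/**
 * Hypothetical helper (not part of Pheanstalk): validate and encode a job.
 */
function buildJobPayload(string $task, string $email, string $message): string
{
    if ($task === '') {
        throw new InvalidArgumentException('Task name must not be empty.');
    }
    if (filter_var($email, FILTER_VALIDATE_EMAIL) === false) {
        throw new InvalidArgumentException("Invalid email address: $email");
    }

    // JSON_THROW_ON_ERROR (PHP 7.3+) raises JsonException instead of returning false.
    return json_encode([
        'task' => $task,
        'email' => $email,
        'message' => $message,
        'timestamp' => time(),
    ], JSON_THROW_ON_ERROR);
}

$payload = buildJobPayload('send_email', 'user@example.com', 'Hello, your order is confirmed!');
echo $payload, "\n";
```

The returned string could be passed to put() in place of json_encode($data).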
✅ Step 3: On-Demand Worker (worker-once.php)
This worker runs only when needed and exits after finishing the job.
📄 worker-once.php
<?php
require 'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$pheanstalk = Pheanstalk::create('127.0.0.1');
$tube = 'testtube';
// Try to reserve a job (wait up to 5 seconds).
// In Pheanstalk 4, reserve() takes no timeout and blocks until a job arrives;
// reserveWithTimeout() returns null when the timeout expires.
$job = $pheanstalk->watch($tube)->reserveWithTimeout(5);
if ($job === null) {
    echo "No job found. Exiting.\n";
    exit(0);
}
$data = json_decode($job->getData(), true);
echo "Processing job: " . print_r($data, true) . "\n";
// Simulate job execution
sleep(2);
// Remove job from queue
$pheanstalk->delete($job);
echo "Job completed.\n";
🔹 Run this manually or with exec() when needed:
php worker-once.php
🔹 Or trigger it via PHP when a job is added:
exec("php worker-once.php > /dev/null 2>&1 &");
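The worker above only prints the job data. A real worker would usually route each job on its 'task' field. Here is a minimal sketch of that idea, assuming a hypothetical dispatchJob() function and an illustrative handler map (neither comes from Pheanstalk):

```php
<?php
/**
 * Hypothetical dispatcher: route a decoded job to a handler keyed by its 'task' field.
 * Returns the handler's result, or throws if no handler is registered for the task.
 */
function dispatchJob(array $data, array $handlers): string
{
    $task = $data['task'] ?? null;
    if (!is_string($task) || !isset($handlers[$task])) {
        throw new RuntimeException('No handler for task: ' . var_export($task, true));
    }
    return $handlers[$task]($data);
}

// Illustrative handler map; send_email here only formats a message.
$handlers = [
    'send_email' => function (array $data): string {
        return "Would send '{$data['message']}' to {$data['email']}";
    },
];

echo dispatchJob(
    ['task' => 'send_email', 'email' => 'user@example.com', 'message' => 'Hello'],
    $handlers
), "\n";
```

In worker-once.php, a call like dispatchJob($data, $handlers) could stand in for the sleep(2) placeholder.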
✅ Step 4: Continuous Worker (worker.php)
This worker runs indefinitely and processes jobs as they arrive.
📄 worker.php
<?php
require 'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$pheanstalk = Pheanstalk::create('127.0.0.1');
$tube = 'testtube';
echo "Continuous worker started...\n";
while (true) {
    $job = $pheanstalk->watch($tube)->reserve();
    if ($job) {
        $data = json_decode($job->getData(), true);
        echo "Processing job: " . print_r($data, true) . "\n";
        // Simulate job execution
        sleep(2);
        // Remove job from queue
        $pheanstalk->delete($job);
        echo "Job completed.\n";
    }
}
🔹 This will keep running & never exit unless stopped manually.
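A long-running worker also has to decide what to do when a job fails. If an exception escapes the loop, the process dies while the job is still reserved. One common approach is to delete jobs that succeed and bury the ones that fail so they can be inspected later. The sketch below shows only that decision logic, assuming a hypothetical processRawJob() helper. The worker would still need to call Pheanstalk's delete() or bury() based on the returned value.

```php
<?php
/**
 * Hypothetical helper: run a handler on a raw job body and report what the
 * worker should do with the job afterwards ('delete' or 'bury').
 */
function processRawJob(string $raw, callable $handler): string
{
    try {
        // Throws JsonException on malformed JSON (PHP 7.3+).
        $data = json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
        $handler($data);
        return 'delete';
    } catch (Throwable $e) {
        fwrite(STDERR, 'Job failed: ' . $e->getMessage() . "\n");
        return 'bury';
    }
}

$ok = processRawJob('{"task":"send_email"}', function (array $data): void {
    // Real work would happen here.
});
$bad = processRawJob('not json', function (array $data): void {
});

echo "$ok $bad\n"; // prints "delete bury"
```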
✅ Step 5: Supervisor Configuration
Now, let's configure Supervisor to manage the continuous worker.
1️⃣ Open Supervisor config file
sudo nano /etc/supervisor/conf.d/worker.conf
2️⃣ Add this configuration:
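[program:worker]
command=php /path/to/worker.php
; process_name must include %(process_num)s when numprocs is greater than 1
process_name=%(program_name)s_%(process_num)02d
numprocs=5
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/worker.log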
3️⃣ Apply changes:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start worker:*
🔹 This will run 5 continuous worker instances and restart them if they crash.
✅ Final Workflow
- Add jobs using php queue.php
- On-demand worker (worker-once.php) runs when needed & exits after completing a job.
- Continuous worker (worker.php) keeps running to process jobs in real-time.
- Supervisor ensures continuous workers stay running.
🎯 Bonus: Automate On-Demand Workers
If you want to start several on-demand workers at once (for example, right after adding a batch of jobs), use this PHP script:
📄 start-workers.php
<?php
// Launch 5 on-demand workers in the background
for ($i = 0; $i < 5; $i++) {
    exec("php worker-once.php > /dev/null 2>&1 &");
}
echo "Started 5 on-demand workers.\n";
🔹 Run this to start 5 workers only when needed:
php start-workers.php
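The script above always launches a fixed number of workers. To scale with the queue instead, the count can be derived from the number of ready jobs. Below is a minimal sketch, assuming two hypothetical helpers, workersToStart() and workerCommand(). The ready-job count is hard-coded here. In practice it would come from Beanstalkd's tube statistics (the 'current-jobs-ready' field).

```php
<?php
/**
 * Hypothetical helper: decide how many on-demand workers to launch.
 * One worker per ready job, capped at $maxWorkers, never negative.
 */
function workersToStart(int $readyJobs, int $maxWorkers = 5): int
{
    return max(0, min($readyJobs, $maxWorkers));
}

/** Build the background command for one worker; the script path is shell-escaped. */
function workerCommand(string $script): string
{
    return 'php ' . escapeshellarg($script) . ' > /dev/null 2>&1 &';
}

// Assumption: $readyJobs would normally be read from the tube's
// 'current-jobs-ready' statistic; it is hard-coded for illustration.
$readyJobs = 3;
$count = workersToStart($readyJobs);
for ($i = 0; $i < $count; $i++) {
    // Swap echo for exec() to actually launch the workers.
    echo workerCommand('worker-once.php'), "\n";
}
echo "Would start $count on-demand workers.\n";
```

Capping the count keeps a sudden burst of jobs from spawning an unbounded number of PHP processes.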
🚀 Final Takeaways
✅ On-demand worker (worker-once.php) → Runs & exits after finishing one job.
✅ Continuous worker (worker.php) → Runs indefinitely for real-time job processing.
✅ Queue file (queue.php) → Adds jobs to Beanstalkd.
✅ Supervisor manages continuous workers → Ensures worker.php always runs.
✅ On-demand workers can be triggered dynamically when jobs are added.
💡 This is a scalable, efficient PHP job queue system! 🚀