Laravel 集成消息队列完整指南
Laravel 提供了优雅的消息队列系统,支持多种队列驱动,让异步任务处理变得简单。以下是 Laravel 集成消息队列的详细方案。
一、队列驱动选择
Laravel 原生支持以下队列驱动:
- Database - 使用数据库作为队列存储
- Redis - 高性能内存队列
- Beanstalkd - 轻量级专用队列服务
- Amazon SQS - AWS 托管队列服务
- RabbitMQ - 通过第三方包支持
二、配置消息队列
1. 安装配置
# 安装依赖(以Redis为例)
composer require predis/predis
配置 .env
文件:
QUEUE_CONNECTION=redis
REDIS_HOST=127.0.0.1
REDIS_PASSWORD=null
REDIS_PORT=6379
2. 创建任务
php artisan make:job ProcessPodcast
生成的任务类位于 app/Jobs/ProcessPodcast.php
:
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建新任务实例
*/
public function __construct($podcast)
{
$this->podcast = $podcast;
}
/**
* 执行任务
*/
public function handle()
{
// 处理播客...
logger()->info('Processing podcast: '.$this->podcast->title);
}
}
三、使用队列
1. 分发任务
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
// 基本分发
ProcessPodcast::dispatch($podcast);
// 延迟分发(60秒后执行)
ProcessPodcast::dispatch($podcast)->delay(now()->addSeconds(60));
// 指定队列
ProcessPodcast::dispatch($podcast)->onQueue('processing');
2. 运行队列 worker
# 启动基础worker
php artisan queue:work
# 指定队列和连接
php artisan queue:work --queue=high,default --tries=3
# 守护进程模式(生产环境推荐)
php artisan queue:work --daemon
四、高级功能
1. 队列优先级
// 在任务类中设置
public $queue = 'high';
或分发时指定:
ProcessPodcast::dispatch($podcast)->onQueue('high');
2. 失败处理
创建失败任务表:
php artisan queue:failed-table
php artisan migrate
在任务类中定义失败处理:
public function failed(Throwable $exception)
{
// 发送失败通知...
}
查看失败任务:
php artisan queue:failed
重试失败任务:
php artisan queue:retry all
3. 队列监控
使用 Horizon(Redis专属):
composer require laravel/horizon
php artisan horizon
五、生产环境部署
1. Supervisor 配置
安装 Supervisor:
sudo apt-get install supervisor
创建配置文件 /etc/supervisor/conf.d/laravel-worker.conf
:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/your/project/artisan queue:work --sleep=3 --tries=3
autostart=true
autorestart=true
user=www-data
numprocs=8
redirect_stderr=true
stdout_logfile=/path/to/your/project/storage/logs/worker.log
启动 Supervisor:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
2. 性能优化建议
- 根据服务器核心数设置
numprocs
(通常为CPU核心数的2-3倍) - 对于CPU密集型任务,减少并发进程数
- 使用
--memory
参数限制内存使用 - 定期重启 worker 防止内存泄漏:
; 在Supervisor配置中添加 stopwaitsecs=3600
六、不同驱动对比
驱动 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
Database | 无需额外服务,简单 | 性能最低 | 小流量,开发环境 |
Redis | 高性能,Laravel最佳支持 | 需要Redis服务器 | 大多数生产环境 |
Beanstalk | 轻量级,专用队列服务 | 需要单独安装 | 需要简单队列系统 |
SQS | 完全托管,无需运维 | AWS依赖,有额外成本 | AWS环境应用 |
RabbitMQ | 功能全面,企业级 | 配置复杂 | 复杂消息路由需求 |
七、最佳实践
-
任务设计原则:
- 保持任务小巧专注
- 避免在任务构造函数中处理业务逻辑
- 任务应具有幂等性(可重复执行不产生副作用)
-
错误处理:
// 在任务类中 public $tries = 3; public $backoff = [60, 120, 300];
-
批量处理:
// 使用批次 $batch = Bus::batch([ new ProcessPodcast($podcast1), new ProcessPodcast($podcast2), ])->then(function (Batch $batch) { // 所有任务成功完成... })->catch(function (Batch $batch, Throwable $e) { // 首个任务失败... })->dispatch();
-
队列隔离:
- 将耗时任务与即时任务分开队列
- 为失败率高的任务设置专门队列
通过以上配置,您可以在 Laravel 应用中高效地集成消息队列系统,实现异步任务处理,提升应用性能和用户体验。
No Comments