Laravel队列源代码解析 这边文章中,将简单梳理一下Laravel队列的源代码,主要弄清楚代码的执行流.
我们将从消费队列的两个终端命令开始并以queue:work为重点
一、./artisan queue:listen –timeout=30 –tries=3 首先打开项目根下的artisan文件,可以看到调用了Illuminate\Console\Kernel对象的handle()方法
1 2 3 4 5 6 7 $kernel = $app ->make (Illuminate\Contracts\Console\Kernel ::class );$status = $kernel ->handle ( $input = new Symfony\Component\Console\Input\ArgvInput , new Symfony\Component\Console\Output\ConsoleOutput );
定位到Illuminate\Console\Kernel::handle()函数, try-catch中的主代码只有两行.其中$this->bootstrap()启动/绑定/延迟启动 一些系统服务.
1 2 $this ->bootstrap ();return $this ->getArtisan ()->run ($input , $output );
继续定位到getArtisan()函数, 这里实例化了一个Artisan对象并返回.结合上面的代码,知道这里依次执行了Artisan::resolveCommands()和Artisan::run()
1 2 3 4 5 6 7 use Illuminate \Console \Application as Artisan ; ...if (is_null ($this ->artisan)) { return $this ->artisan = (new Artisan ($this ->app, $this ->events, $this ->app->version ())) ->resolveCommands ($this ->commands); }return $this ->artisan;
继续dig, 在Artisan的父类Symfony\Component\Console\Application as SymfonyApplication中找到run方法
run方法中$exitCode = $this->doRun($input, $output);
doRun方法中$exitCode = $this->doRunCommand($command, $input, $output);
doRunCommand方法中$exitCode = $command->run($input, $output);
到这里,看到调用了$command对象的run方法.$command来自doRun方法中$command = $this->find($name);.这里不再继续调试,直接给出$command对象的一些信息
1 2 3 4 5 6 7 8 9 10 11 Illuminate\Queue\Console\ListenCommand { } ... ...
追踪Illuminate\Queue\Console\ListenCommand::fire()
1 2 3 $this ->listener->listen ( $connection , $queue , $delay , $memory , $timeout );
其中的$this->listener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 Illuminate\Queue\Listener { class : "Illuminate \Queue \Console \ListenCommand " this : Illuminate \Queue \Console \ListenCommand { parameters: array :2 [ "$type " => [] "$line " => [] ] line: "107 to 109" } }
追踪到listen函数
1 2 3 4 5 6 7 public function listen ($connection , $queue , $delay , $memory , $timeout = 60 ) { $process = $this ->makeProcess ($connection , $queue , $delay , $memory , $timeout ); while (true ) { $this ->runProcess ($process , $memory ); } }
追踪runProcess函数,找到Process类的
1 2 3 4 5 6 7 public function run ($callback = null ) { $this ->start ($callback ); return $this ->wait (); }'/usr/bin/php5' 'artisan' queue:work '' --queue='default' --delay=0 --memory=128 --sleep=3 --tries=3 --env='local'
到这里,已经很清楚了../artisan queue:listen启动了一个死循环, 不停的尝试执行./artisan queue:work
二、./artisan queue:work –daemon –tries=3 同上, 不同之处在于, doRunCommand方法中$exitCode = $command->run($input, $output);中$command
1 2 3 4 5 Illuminate\Queue\Console\WorkCommand { ... ...
追踪Illuminate\Queue\Console\WorkCommand::fire()
1 2 3 $response = $this ->runWorker ( $connection , $queue , $delay , $memory , $this ->option ('daemon' ) );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected function runWorker ($connection , $queue , $delay , $memory , $daemon = false ) { if ($daemon ) { $this ->worker->setCache ($this ->laravel['cache' ]->driver ()); $this ->worker->setDaemonExceptionHandler ( $this ->laravel['Illuminate\Contracts\Debug\ExceptionHandler' ] ); return $this ->worker->daemon ( $connection , $queue , $delay , $memory , $this ->option ('sleep' ), $this ->option ('tries' ) ); } return $this ->worker->pop ( $connection , $queue , $delay , $this ->option ('sleep' ), $this ->option ('tries' ) ); }
runWorker中的$this->worker
1 2 3 Illuminate\Queue\Worker {
首先对于不带--daemon的命令,那么上文中最后提到./artisan queue:work执行的就是$this->worker->pop.从队列列表中pop一个任务出来执行.执行完成之后sleep若干秒,进入while(true)死循环,无限重试pop.或者在内存使用超过--memory限制时,调用die终止进程.
然后, 继续dig --daemon模式, 找到Illuminate\Queue\Worker::daemon()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public function daemon ($connectionName , $queue = null , $delay = 0 , $memory = 128 , $sleep = 3 , $maxTries = 0 ) { $lastRestart = $this ->getTimestampOfLastQueueRestart (); while (true ) { if ($this ->daemonShouldRun ()) { $this ->runNextJobForDaemon ( $connectionName , $queue , $delay , $sleep , $maxTries ); } else { $this ->sleep ($sleep ); } if ($this ->memoryExceeded ($memory ) || $this ->queueShouldRestart ($lastRestart )) { $this ->stop (); } } }
这里还是一个死循环.依次执行
尝试daemonShouldRun(), 若网站启用了maintaince模式则返回FALSE,否则广播'illuminate.queue.looping'事件
runNextJobForDaemon()函数中,仍然是尝试从队列列表中pop任务出来执行
若daemonShouldRun()检测到FALSE, sleep若干秒
内存检测,在内存使用超过--memory限制时,调用die终止进程.