Reference
Triggering Async Tasks
Application resources requiring lengthy processing are not uncommon. In order to prevent these processes from impacting user experience, particularly when the user does not need to wait for the process to complete, we often delegate these to a message queue.
While message queues are powerful, they also require additional infrastructure for your application, and can be hard to justify when you have a small number of heavy processes, or a small number of users.
In order to facilitate async processing, Swoole servers provides task worker processes, allowing your application to trigger tasks without the need for an external message queue, and without impacting the server worker processes — allowing your application to continue responding to requests while the server processes your task.
Configuring the Server Process
In order to take advantage of this feature, you will first need to configure the
server to start up task workers. In your local configuration for the server,
you'll need to add task_worker_num
. The number of workers you configure define
the number of concurrent tasks that can be executed at once. Tasks are queued in
the order that they triggered, meaning that a task_worker_num
of 1 will offer
no concurrency and tasks will execute one after the other.
'zend-expressive-swoole' => [
'swoole-http-server' => [
'host' => '127.0.0.1',
'port' => 8080,
'options' => [
'worker_num' => 4, // The number of HTTP Server Workers
'task_worker_num' => 4, // The number of Task Workers
],
],
];
No CLI option for task_worker_num
Unlike
worker_num
, there is no CLI option fortask_worker_num
. This is because enabling the task worker also requires registering a task worker with the server. To prevent accidental startup failures due to passing an option to specify the number of task workers without having registered a task worker, we omitted the CLI option.
Task Event Handlers
When task workers are enabled, the Swoole server will now require that you register two event callbacks with the server; without them, the server will refuse to start.
The two events are:
task
, which will define the code for handling tasks.finish
, which will execute when a task has completed.
Registering the Handlers
The signature for the task
event handler is:
function (
\Swoole\Http\Server $server,
int $taskId,
int $sourceWorkerId,
$dataForWorker
) : void
where:
$server
is the main HTTP server process$taskId
is a number that increments each time the server triggers a new task.$sourceWorkerId
is an integer that defines the worker process that is executing the workload.$dataForWorker
contains the value passed to the$server->task()
method when initially triggering the task. This value can be any PHP value, with the exception of aresource
.
To register the handler with the server, you must call it's on()
method,
before the server has been started:
$server->on('task', $callable);
As previously mentioned, you must also register an event handler for the
finish
event. This callback for this event should have the following
signature:
function (
\Swoole\Http\Server $server,
int $taskId,
$userData
) : void
The first two parameters are identical to the task
event handler. The
$userData
parameter will contain the return value of the task
event
handler.
Registering your callable for the finish
event is accomplished like this:
$server->on('finish', $callable);
There can be only one
There can be only one event handler per event type. Subsequent calls to
on('<EventName>')
replace the previously registered callable.Finishing a task
If you do not return anything from your
task
event handler, thefinish
handler will not be called. The Swoole documentation recommends that the task worker callback manually finish the task in these situations:$server->finish('');
Even if you do not call the above method, the handler must be defined, or the server will refuse to start.
An example task worker
The following example code illustrates a task worker with logging capabilities that uses a message notifier to process data:
// In src/App/TaskWorker.php:
namespace App;
use Psr\EventDispatcher\MessageInterface;
use Psr\EventDispatcher\MessageNotifierInterface;
use Psr\Log\LoggerInterface;
use Throwable;
class TaskWorker
{
private $notifier;
private $logger;
public function __construct(LoggerInterface $logger, MessageNotifierInterface $notifier)
{
$this->logger = $logger;
$this->notifier = $notifier;
}
public function __invoke($server, $taskId, $fromId, $data)
{
if (! $data instanceof MessageInterface) {
$this->logger->error('Invalid data type provided to task worker: {type}', [
'type' => is_object($data) ? get_class($data) : gettype($data)
]);
return;
}
$this->logger->notice('Starting work on task {taskId} using data: {data}', [
'taskId' => $taskId,
'data' => json_encode($data),
]);
try {
$this->notifier->notify($data);
} catch (Throwable $e) {
$this->logger->error('Error processing task {taskId}: {error}', [
'taskId' => $taskId,
'error' => $e->getTraceAsString(),
]);
}
// Notify the server that processing of the task has finished:
$server->finish('');
}
}
This invokable class needs to be attached to the $server->on('task')
event
before the server has started. The easiest place to accomplish this is in a
delegator factory
targeting the Swoole HTTP server. First, we'll create the delegator factory:
// In src/App/TaskWorkerDelegator.php:
namespace App;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Swoole\Http\Server as HttpServer;
class TaskWorkerDelegator
{
public function __invoke(ContainerInterface $container, $serviceName, callable $callback) : HttpServer
{
$server = $callback();
$logger = $container->get(LoggerInterface::class);
$server->on('task', $container->get(TaskWorker::class));
$server->on('finish', function ($server, $taskId, $data) use ($logger) {
$logger->notice('Task #{taskId} has finished processing', ['taskId' => $taskId]);
});
return $server;
}
}
Next, we'll register it with our container:
// In config/autoload/dependencies.php:
return [
'dependencies' => [
'delegators' => [
\Swoole\Http\Server::class => [
\App\TaskWorkerDelegator::class,
],
],
],
];
With this in place, we can now trigger tasks within our application. In the scenario outlined above, the task worker expects messages; it then notifies listeners of that message so they may respond to it.
Triggering Tasks in Middleware
Considering that this library provides an application runner for middleware
applications, you will likely trigger tasks from within your middleware or
request handlers. In each case, you will need to compose the Swoole HTTP server
instance as a class dependency, as tasks are triggered via the server via its
task()
method. The method can accept any value except a resource as an
argument.
In the example below, ContactMessage
will implement the MessageInterface
from the above example. The request handler uses values from the request to
create the ContactMessage
instance, and then create a task from it. It then
immediately returns a response.
// in src/App/Handler/TaskTriggeringHandler.php:
namespace App\Handler;
use Psr\Http\Message\ResponseFactoryInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\RequestHandlerInterface;
use Swoole\Http\Server as HttpServer;
use Zend\Expressive\Template\TemplateRendererInterface;
class TaskTriggeringHandler implements RequestHandlerInterface
{
private $responseFactory;
private $server;
private $template;
public function __construct(
HttpServer $server,
TemplateRendererInterface $template,
ResponseFactoryInterface $responseFactory
) {
$this->server = $server;
$this->template = $template;
$this->responseFactory = $responseFactory;
}
public function handle(ServerRequestInterface $request) : ResponseInterface
{
// Gather data from request
$data = $request->getParsedBody();
// A fictonal event describing a contact request:
$event = new ContactEvent([
'to' => $data['email'],
'subject' => $data['subject'],
'message' => $data['message'],
]);
// task() returns a task identifier, if you want to use it; otherwise,
// you can ignore the return value.
$taskIdentifier = $this->server->task($event);
// The task() method is asynchronous, so execution continues immediately.
$response = ($this->responseFactory()->createResponse())
->withHeader('Content-Type', 'text/html');
$response->getBody()->write($this->template->render('contact::thank-you', []);
return $response;
}
}
Found a mistake or want to contribute to the documentation? Edit this page on GitHub!