Skip to content

Commit

Permalink
[php] Use non-blocking I/O for DB communication
Browse files Browse the repository at this point in the history
Just as AMPHP, ReactPHP requires asynchronous implementations for it to
work as expected. When using PDO we will block the process when
establishing the DB connection and sending queries.

This replaces the implementation with a fully async MySQL client, also
cleaning removing unnecessary extensions from the image.

Signed-off-by: Luís Cobucci <[email protected]>
  • Loading branch information
lcobucci committed Nov 17, 2023
1 parent f592585 commit 2a52437
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 103 deletions.
211 changes: 113 additions & 98 deletions frameworks/PHP/reactphp/app.php
Original file line number Diff line number Diff line change
@@ -1,134 +1,149 @@
<?php

use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface as Request;
use React\EventLoop\Loop;
use React\MySQL\ConnectionInterface as DbConnection;
use React\MySQL\Factory as DbFactory;
use React\Http\Message\Response;
use React\MySQL\QueryResult;
use React\Promise\PromiseInterface;

function init()
{
global $world, $fortune, $update;
$pdo = new PDO(
'mysql:host=tfb-database;dbname=hello_world',
'benchmarkdbuser',
'benchmarkdbpass',
[
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
PDO::ATTR_EMULATE_PREPARES => false
]
);
$world = $pdo->prepare('SELECT id,randomNumber FROM World WHERE id=?');
$update = $pdo->prepare('UPDATE World SET randomNumber=? WHERE id=?');
$fortune = $pdo->prepare('SELECT id,message FROM Fortune');
$fortune->setFetchMode(PDO::FETCH_KEY_PAIR);
}
use function React\Promise\all;

function router(Request $request)
/** @return Closure(Request):ResponseInterface */
function requestHandler(): Closure
{
return match($request->getUri()->getPath()) {
'/plaintext' => text(),
'/json' => json(),
'/db' => db(),
'/fortunes' => fortune(),
'/query' => query($request),
'/update' => updateraw($request),
// '/info' => info(),
default => new Response(404, [], 'Error 404'),
$connection = establishDbConnection('benchmarkdbuser:benchmarkdbpass@tfb-database/hello_world?idle=0.5');

$world = static function (int $id) use ($connection): PromiseInterface {
return $connection->query('SELECT id,randomNumber FROM World WHERE id=?', [$id]);
};
}

function text()
{
return new Response(200, [
'Content-Type' => 'text/plain'
], 'Hello, World!');
}
$fortune = static function () use ($connection): PromiseInterface {
return $connection->query('SELECT id,message FROM Fortune');
};

function json()
{
return new Response(200, [
'Content-Type' => 'application/json'
], json_encode(['message' => 'Hello, World!']));
$update = static function (int $id, int $randomNumber) use ($connection): PromiseInterface {
return $connection->query('UPDATE World SET randomNumber=? WHERE id=?', [$randomNumber, $id]);
};

return static function (Request $request) use ($world, $fortune, $update): ResponseInterface | PromiseInterface {
return match($request->getUri()->getPath()) {
'/plaintext' => Response::plaintext('Hello, World!'),
'/json' => Response::json(['message' => 'Hello, World!']),
'/db' => db($world),
'/fortunes' => fortune($fortune),
'/query' => query(queryCount($request), $world),
'/update' => updateraw(queryCount($request), $world, $update),
// '/info' => info(),
default => new Response(404, [], 'Error 404'),
};
};
}

function db()
{
global $world;
function establishDbConnection(
#[SensitiveParameter]
string $uri,
): DbConnection {
$connection = (new DbFactory())->createLazyConnection($uri);

$interrupt = static function () use ($connection, &$interrupt) {
$connection->quit();
};

$connection->on('close', static function () use (&$interrupt) {
Loop::removeSignal(SIGINT, $interrupt);
Loop::removeSignal(SIGTERM, $interrupt);
});

$world->execute([mt_rand(1, 10000)]);
Loop::addSignal(SIGINT, $interrupt);
Loop::addSignal(SIGTERM, $interrupt);

return new Response(200, [
'Content-Type' => 'application/json'
], json_encode($world->fetch()));
return $connection;
}

function query($request)
/** @param Closure(int):PromiseInterface $world */
function db(Closure $world): PromiseInterface
{
global $world;
$id = random_int(1, 10000);

return $world($id)->then(
static fn (QueryResult $result): ResponseInterface => Response::json($result->resultRows[0]),
);
}

$query_count = 1;
$q = (int) $request->getQueryParams()['q'];
if ($q > 1) {
$query_count = min($q, 500);
}
function queryCount(Request $request): int
{
$count = (int) ($request->getQueryParams()['q'] ?? 1);

while ($query_count--) {
$world->execute([mt_rand(1, 10000)]);
$arr[] = $world->fetch();
if ($count > 1) {
return min($count, 500);
}

return new Response(200, [
'Content-Type' => 'application/json'
], json_encode($arr));
return 1;
}

function updateraw($request)
/** @param Closure(int):PromiseInterface $world */
function query(int $queryCount, Closure $world): PromiseInterface
{
global $world, $update;
$processQueries = static function (int $count) use ($world): iterable {
while ($count--) {
$id = random_int(1, 10000);

$query_count = 1;
$q = (int) $request->getQueryParams()['q'];
if ($q > 1) {
$query_count = min($q, 500);
}
yield $world($id)->then(static fn (QueryResult $result): array => $result->resultRows[0]);
}
};

while ($query_count--) {
$id = mt_rand(1, 10000);
$world->execute([$id]);
$item = $world->fetch();
$update->execute(
[$item['randomNumber'] = mt_rand(1, 10000), $id]
);
return all($processQueries($queryCount))
->then(static fn (array $result): ResponseInterface => Response::json($result));
}

$arr[] = $item;
}
/**
* @param Closure(int):PromiseInterface $world
* @param Closure(int, int):PromiseInterface $update
*/
function updateraw(int $queryCount, Closure $world, Closure $update): PromiseInterface
{
$processQueries = static function (int $count) use ($world, $update): iterable {
while ($count--) {
$id = random_int(1, 10000);

yield $world($id)->then(
static function (QueryResult $result) use ($update): PromiseInterface {
$updated = $result->resultRows[0];
$updated['randomNumber'] = random_int(1, 10000);

return $update($updated['id'], $updated['randomNumber'])
->then(static fn (): array => $updated);
}
);
}
};

// $pdo->beginTransaction();
// foreach($arr as $world) {
// $update->execute([$world['randomNumber'], $world['id']]);
// }
// $pdo->commit();
return new Response(200, [
'Content-Type' => 'application/json'
], json_encode($arr));
return all($processQueries($queryCount))
->then(static fn (array $result): ResponseInterface => Response::json($result));
}

function fortune()
function fortune(Closure $fortune): PromiseInterface
{
global $fortune;
$formatResult = static function (array $rows): string {
$rows[] = ['id' => 0, 'message' => 'Additional fortune added at request time.'];
usort($rows, static fn (array $one, array $other) => $one['message'] <=> $other['message']);

$fortune->execute();
$html = '';

$arr = $fortune->fetchAll();
$arr[0] = 'Additional fortune added at request time.';
asort($arr);
foreach ($rows as $row) {
$message = htmlspecialchars($row['message'], ENT_QUOTES, 'UTF-8');

$html = '';
foreach ($arr as $id => $message) {
$message = htmlspecialchars($message, ENT_QUOTES, 'UTF-8');
$html .= "<tr><td>$id</td><td>$message</td></tr>";
}
$html .= "<tr><td>${row['id']}</td><td>${message}</td></tr>";
}

return "<!DOCTYPE html><html><head><title>Fortunes</title></head><body><table><tr><th>id</th><th>message</th></tr>$html</table></body></html>";
};

return new Response(200, [
'Content-Type' => 'text/html; charset=UTF-8',
], "<!DOCTYPE html><html><head><title>Fortunes</title></head><body><table><tr><th>id</th><th>message</th></tr>$html</table></body></html>"
return $fortune()->then(
static fn (QueryResult $result): ResponseInterface => Response::html($formatResult($result->resultRows)),
);
}

Expand All @@ -138,4 +153,4 @@ function fortune()
phpinfo();
return new Response(200, ['Content-Type' => 'text/plain'], ob_get_clean());
}
*/
*/
3 changes: 2 additions & 1 deletion frameworks/PHP/reactphp/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"psr/http-message": "^1.0",
"react/event-loop": "^1.5",
"react/http": "^1.9",
"react/socket": "^1.14"
"react/socket": "^1.14",
"react/mysql": "^0.6"
}
}
2 changes: 1 addition & 1 deletion frameworks/PHP/reactphp/reactphp.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ RUN apt-get update -yqq && apt-get install -yqq software-properties-common > /de
RUN LC_ALL=C.UTF-8 add-apt-repository ppa:ondrej/php
RUN apt-get update -yqq > /dev/null && \
apt-get install -yqq git unzip wget curl build-essential \
php8.2-cli php8.2-mbstring php8.2-dev php8.2-xml php8.2-curl php8.2-mysql > /dev/null
php8.2-cli php8.2-mbstring php8.2-dev > /dev/null

# An extension is required!
# We deal with concurrencies over 1k, which stream_select doesn't support.
Expand Down
4 changes: 1 addition & 3 deletions frameworks/PHP/reactphp/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
require __DIR__ . '/vendor/autoload.php';
require_once __DIR__.'/app.php';

init();

$server = new HttpServer(router(...));
$server = new HttpServer(requestHandler());
$socket = new SocketServer('0.0.0.0:8080');
$server->listen($socket);

Expand Down

0 comments on commit 2a52437

Please sign in to comment.