這幾天我在想如何在Webman框架中使用LaravelORM并支持協(xié)程。將兩者結(jié)合起來(lái),理論上可以兼顧高并發(fā)與開發(fā)效率。
在Webman中集成LaravelORM協(xié)程版,并驗(yàn)證其性能和兼容性。
環(huán)境配置
類
Illuminate\Database\Connection
public function select($query, $bindings = [], $useReadPdo = true)
{
return $this->run($query, $bindings, function ($query, $bindings) use ($useReadPdo) {
if ($this->pretending()) {
return [];
}
// For select statements, we'll simply execute the query and return an array
// of the database result set. Each element in the array will be a single
// row from the database table, and will either be an array or objects.
$statement = $this->prepared(
$this->getPdoForSelect($useReadPdo)->prepare($query)
);
$this->bindValues($statement, $this->prepareBindings($bindings));
$statement->execute();
return $statement->fetchAll();
});
}
以select方法為例可以看到上述類中,Laravel將所有對(duì)\PDO的操作都封裝在了Connection中
并提供了ConnectionInterface
的抽象接口,這意味著如果實(shí)現(xiàn)了這個(gè)接口,就可以無(wú)縫的替換掉PDO邏輯
我選用了AMPHP的MySQL客戶端庫(kù)amphp/mysql
來(lái)實(shí)現(xiàn)這個(gè)接口
<?php declare(strict_types=1);
use Amp\Mysql\MysqlConfig;
use Amp\Mysql\MysqlConnectionPool;
use Amp\Mysql\MysqlTransaction;
use Closure;
use Exception;
use Fiber;
use Generator;
use Illuminate\Database\MySqlConnection;
use Throwable;
use function boolval;
use function in_array;
use function spl_object_hash;
use function trim;
class PConnection extends MySqlConnection
{
private const ALLOW_OPTIONS = [
'host',
'port',
'user',
'password',
'db',
'charset',
'collate',
'compression',
'local-infile',
'username',
'database'
];
/*** @var MysqlConnectionPool */
private MysqlConnectionPool $pool;
/**
* @param $pdo
* @param string $database
* @param string $tablePrefix
* @param array $config
*/
public function __construct($pdo, string $database = '', string $tablePrefix = '', array $config = [])
{
parent::__construct($pdo, $database, $tablePrefix, $config);
$dsn = '';
foreach ($config as $key => $value) {
if (in_array($key, static::ALLOW_OPTIONS, true)) {
if (!$value) {
continue;
}
$key = match ($key) {
'username' => 'user',
'database' => 'db',
default => $key
};
$dsn .= "{$key}={$value} ";
}
}
$config = MysqlConfig::fromString(trim($dsn));
$this->pool = new MysqlConnectionPool($config);
// if (isset($this->pdo)) {
// unset($this->pdo);
// }
}
/**
* @return void
*/
public function beginTransaction(): void
{
$transaction = $this->pool->beginTransaction();
;
if ($fiber = Fiber::getCurrent()) {
$this->fiber2transaction[spl_object_hash($fiber)] = $transaction;
} else {
$this->fiber2transaction['main'] = $transaction;
}
}
/**
* @return void
* @throws Exception
*/
public function commit(): void
{
if ($fiber = Fiber::getCurrent()) {
$key = spl_object_hash($fiber);
} else {
$key = 'main';
}
if (!$transaction = $this->fiber2transaction[$key] ?? null) {
throw new Exception('Transaction not found');
}
$transaction->commit();
unset($this->fiber2transaction[$key]);
}
/**
* @param $toLevel
* @return void
* @throws Exception
*/
public function rollBack($toLevel = null): void
{
if ($fiber = Fiber::getCurrent()) {
$key = spl_object_hash($fiber);
} else {
$key = 'main';
}
if (!$transaction = $this->fiber2transaction[$key] ?? null) {
throw new Exception('Transaction not found');
}
$transaction->rollback();
unset($this->fiber2transaction[$key]);
}
/**
* @var MysqlTransaction[]
*/
private array $fiber2transaction = [];
/**
* @param Closure $callback
* @param int $attempts
* @return mixed
* @throws Throwable
*/
public function transaction(Closure $callback, $attempts = 1): mixed
{
$this->beginTransaction();
try {
$result = $callback($this->getTransaction());
$this->commit();
return $result;
} catch (Throwable $e) {
$this->rollBack();
throw $e;
}
}
/**
* @return MysqlTransaction|null
*/
private function getTransaction(): MysqlTransaction|null
{
if ($fiber = Fiber::getCurrent()) {
$key = spl_object_hash($fiber);
} else {
$key = 'main';
}
if (!$transaction = $this->fiber2transaction[$key] ?? null) {
return null;
}
return $transaction;
}
/**
* @param string $query
* @param array $bindings
* @param bool $useReadPdo
* @return array
*/
public function select($query, $bindings = [], $useReadPdo = true): mixed
{
return $this->run($query, $bindings, function ($query, $bindings) use ($useReadPdo) {
if ($this->pretending()) {
return [];
}
$statement = $this->pool->prepare($query);
return $statement->execute($this->prepareBindings($bindings));
});
}
/**
* @param string $query
* @param array $bindings
* @return bool
*/
public function statement($query, $bindings = []): bool
{
return $this->run($query, $bindings, function ($query, $bindings) {
if ($this->pretending()) {
return [];
}
$statement = $this->getTransaction()?->prepare($query) ?? $this->pool->prepare($query);
return boolval($statement->execute($this->prepareBindings($bindings)));
});
}
/**
* 針對(duì)數(shù)據(jù)庫(kù)運(yùn)行 select 語(yǔ)句并返回所有結(jié)果集。
*
* @param string $query
* @param array $bindings
* @param bool $useReadPdo
* @return array
*/
public function selectResultSets($query, $bindings = [], $useReadPdo = true): array
{
return $this->run($query, $bindings, function ($query, $bindings) use ($useReadPdo) {
if ($this->pretending()) {
return [];
}
$statement = $this->pool->prepare($query);
$result = $statement->execute($this->prepareBindings($bindings));
$sets = [];
while ($result = $result->getNextResult()) {
$sets[] = $result;
}
return $sets;
});
}
/**
* 針對(duì)數(shù)據(jù)庫(kù)運(yùn)行 select 語(yǔ)句并返回一個(gè)生成器。
*
* @param string $query
* @param array $bindings
* @param bool $useReadPdo
* @return Generator
*/
public function cursor($query, $bindings = [], $useReadPdo = true): Generator
{
while ($record = $this->select($query, $bindings, $useReadPdo)) {
yield $record;
}
}
/**
* 運(yùn)行 SQL 語(yǔ)句并獲取受影響的行數(shù)。
*
* @param string $query
* @param array $bindings
* @return int
*/
public function affectingStatement($query, $bindings = []): int
{
return $this->run($query, $bindings, function ($query, $bindings) {
if ($this->pretending()) {
return 0;
}
// 對(duì)于更新或刪除語(yǔ)句,我們想要獲取受影響的行數(shù)
// 通過(guò)該語(yǔ)句并將其返回給開發(fā)人員。我們首先需要
// 執(zhí)行該語(yǔ)句,然后我們將使用 PDO 來(lái)獲取受影響的內(nèi)容。
$statement = $this->pool->prepare($query);
$result = $statement->execute($this->prepareBindings($bindings));
$this->recordsHaveBeenModified(
($count = $result->getRowCount()) > 0
);
return $count;
});
}
/**
* @return void
*/
public function reconnect()
{
//TODO: 無(wú)事可做
}
/**
* @return void
*/
public function reconnectIfMissingConnection()
{
//TODO: 無(wú)事可做
}
}
實(shí)現(xiàn)了Connection之后我們還有Hook住DatabaseManager的工廠方法`
return new class ($app) extends ConnectionFactory {
/**
* Create a new connection instance.
*
* @param string $driver
* @param PDO|Closure $connection
* @param string $database
* @param string $prefix
* @param array $config
* @return SQLiteConnection|MariaDbConnection|MySqlConnection|PostgresConnection|SqlServerConnection|Connection
*
*/
protected function createConnection($driver, $connection, $database, $prefix = '', array $config = []): SQLiteConnection|MariaDbConnection|MySqlConnection|PostgresConnection|SqlServerConnection|Connection
{
return match ($driver) {
'mysql' => new PConnection($connection, $database, $prefix, $config),
'mariadb' => new MariaDbConnection($connection, $database, $prefix, $config),
'pgsql' => new PostgresConnection($connection, $database, $prefix, $config),
'sqlite' => new SQLiteConnection($connection, $database, $prefix, $config),
'sqlsrv' => new SqlServerConnection($connection, $database, $prefix, $config),
default => throw new InvalidArgumentException("Unsupported driver [{$driver}]."),
};
}
}
為了驗(yàn)證上述無(wú)縫耦合的最終效果,我準(zhǔn)備將它安裝到Webman
我封裝了一個(gè)
Database
類,用于Hook住Laravel的DatabaseManager
use Illuminate\Container\Container;
use Illuminate\Database\Capsule\Manager;
use Illuminate\Database\DatabaseManager;
use Illuminate\Events\Dispatcher;
use Illuminate\Pagination\Cursor;
use Illuminate\Pagination\CursorPaginator;
use Illuminate\Pagination\Paginator;
use Psc\Drive\Laravel\Coroutine\Database\Factory;
use function class_exists;
use function config;
use function get_class;
use function method_exists;
use function request;
class Database extends Manager
{
/**
* @return void
*/
public static function install(): void
{
/**
* 判斷是否安裝Webman
*/
if (!class_exists(\support\Container::class)) {
return;
}
/**
* 判斷是否曾被Hook
*/
if (isset(parent::$instance) && get_class(parent::$instance) === Database::class) {
return;
}
/**
* Hook webman LaravelDB
*/
$config = config('database', []);
$connections = $config['connections'] ?? [];
if (!$connections) {
return;
}
$app = Container::getInstance();
/**
* Hook數(shù)據(jù)庫(kù)連接工廠
*/
$capsule = new Database($app);
$default = $config['default'] ?? false;
if ($default) {
$defaultConfig = $connections[$config['default']] ?? false;
if ($defaultConfig) {
$capsule->addConnection($defaultConfig);
}
}
foreach ($connections as $name => $config) {
$capsule->addConnection($config, $name);
}
if (class_exists(Dispatcher::class) && !$capsule->getEventDispatcher()) {
$capsule->setEventDispatcher(\support\Container::make(Dispatcher::class, [Container::getInstance()]));
}
// Set as global
$capsule->setAsGlobal();
$capsule->bootEloquent();
// Paginator
if (class_exists(Paginator::class)) {
if (method_exists(Paginator::class, 'queryStringResolver')) {
Paginator::queryStringResolver(function () {
$request = request();
return $request?->queryString();
});
}
Paginator::currentPathResolver(function () {
$request = request();
return $request ? $request->path() : '/';
});
Paginator::currentPageResolver(function ($pageName = 'page') {
$request = request();
if (!$request) {
return 1;
}
$page = (int)($request->input($pageName, 1));
return $page > 0 ? $page : 1;
});
if (class_exists(CursorPaginator::class)) {
CursorPaginator::currentCursorResolver(function ($cursorName = 'cursor') {
return Cursor::fromEncoded(request()->input($cursorName));
});
}
}
parent::$instance = $capsule;
}
/**
* Hook Factory
* @return void
*/
protected function setupManager(): void
{
$factory = new Factory($this->container);
$this->manager = new DatabaseManager($this->container, $factory);
}
}
為了更直觀的展現(xiàn)協(xié)程的效果,我將webman-worker數(shù)量改為了1,并且在每次請(qǐng)求中都會(huì)進(jìn)行數(shù)據(jù)庫(kù)查詢
/**
* @param Request $request
* @return string
*/
public function index(Request $request): string
{
// 手動(dòng)Hook調(diào)DatabaseManager
Database::install();
// 記錄執(zhí)行時(shí)間
$startTime = microtime(true);
// 模擬一個(gè)耗時(shí)1s的查詢
$result = Db::statement('SELECT SLEEP(1);');
// 記錄結(jié)束時(shí)間
$endTime = microtime(true);
// 輸出結(jié)果
return "{$startTime} - {$endTime}";
}
<?php declare(strict_types=1);
namespace Tests;
use GuzzleHttp\Client;
use PHPUnit\Framework\TestCase;
use Psc\Plugins\Guzzle\PHandler;
use function P\async;
use function P\tick;
class CoroutineTest extends TestCase
{
public function test_main(): void
{
$client = new Client(['handler' => new PHandler(['pool'=>0])]);
for ($i = 0; $i < 100; $i++) {
async(function () use ($client, $i) {
$response = $client->get('http://127.0.0.1:8787/');
$responseContent = $response->getBody()->getContents();
echo "Request $i: $responseContent\n";
});
}
tick();
$this->assertEquals(1, 1);
}
}
Request 0: 1723015194.3121 - 1723015195.4183
Request 1: 1723015194.3389 - 1723015195.4193
Request 2: 1723015194.339 - 1723015195.4196
Request 3: 1723015194.3391 - 1723015195.4187
Request 4: 1723015194.3391 - 1723015195.4198
Request 5: 1723015194.3392 - 1723015195.42
Request 6: 1723015194.3393 - 1723015195.4202
Request 7: 1723015194.3394 - 1723015195.4204
Request 8: 1723015194.3394 - 1723015195.4588
Request 9: 1723015194.3395 - 1723015195.4595
Request 10: 1723015194.3395 - 1723015195.4626
Request 11: 1723015194.3396 - 1723015195.4633
Request 12: 1723015194.3397 - 1723015195.4653
Request 13: 1723015194.3398 - 1723015195.4658
Request 14: 1723015194.3398 - 1723015195.4688
Request 15: 1723015194.3399 - 1723015195.4726
Request 16: 1723015194.34 - 1723015195.4735
Request 17: 1723015194.34 - 1723015195.4774
Request 18: 1723015194.3401 - 1723015195.48
Request 19: 1723015194.3402 - 1723015195.4805
Request 20: 1723015194.3402 - 1723015195.4816
Request 21: 1723015194.3403 - 1723015195.4818
Request 22: 1723015194.3404 - 1723015195.4862
Request 23: 1723015194.3404 - 1723015195.4911
Request 24: 1723015194.3405 - 1723015195.4915
Request 25: 1723015194.3406 - 1723015195.4917
Request 26: 1723015194.3406 - 1723015195.4919
Request 27: 1723015194.3408 - 1723015195.4921
Request 28: 1723015194.3408 - 1723015195.4923
Request 29: 1723015194.3409 - 1723015195.4925
Request 30: 1723015194.3409 - 1723015195.4933
Request 31: 1723015194.341 - 1723015195.4935
Request 32: 1723015194.341 - 1723015195.4936
Request 33: 1723015194.3411 - 1723015195.4938
Request 34: 1723015194.3412 - 1723015195.494
Request 35: 1723015194.3412 - 1723015195.4941
Request 36: 1723015194.3413 - 1723015195.4943
Request 37: 1723015194.3414 - 1723015195.4944
Request 38: 1723015194.3414 - 1723015195.4946
Request 39: 1723015194.3416 - 1723015195.4947
Request 40: 1723015194.3417 - 1723015195.4949
Request 41: 1723015194.3418 - 1723015195.495
Request 42: 1723015194.342 - 1723015195.5174
Request 43: 1723015194.3421 - 1723015195.518
Request 44: 1723015194.3423 - 1723015195.5184
Request 45: 1723015194.3425 - 1723015195.5191
Request 46: 1723015194.3426 - 1723015195.5194
Request 48: 1723015194.3429 - 1723015195.5215
Request 50: 1723015194.3433 - 1723015195.5219
Request 51: 1723015194.3435 - 1723015195.5221
Request 47: 1723015194.3428 - 1723015195.5225
Request 49: 1723015194.3431 - 1723015195.523
Request 52: 1723015194.3436 - 1723015195.5265
Request 53: 1723015194.3437 - 1723015195.5268
Request 54: 1723015194.3439 - 1723015195.527
Request 55: 1723015194.344 - 1723015195.5275
Request 56: 1723015194.3443 - 1723015195.5282
Request 57: 1723015194.3444 - 1723015195.5314
Request 58: 1723015194.3445 - 1723015195.5316
Request 59: 1723015194.3445 - 1723015195.5318
Request 60: 1723015194.3446 - 1723015195.5323
Request 61: 1723015194.3448 - 1723015195.5324
Request 62: 1723015194.3449 - 1723015195.5326
Request 63: 1723015194.345 - 1723015195.5328
Request 64: 1723015194.3451 - 1723015195.533
Request 65: 1723015194.3453 - 1723015195.5331
Request 66: 1723015194.3455 - 1723015195.5437
Request 67: 1723015194.3456 - 1723015195.5441
Request 69: 1723015194.3458 - 1723015195.5443
Request 70: 1723015194.3459 - 1723015195.5445
Request 71: 1723015194.346 - 1723015195.5448
Request 72: 1723015194.3464 - 1723015195.5451
Request 68: 1723015194.3457 - 1723015195.5456
Request 73: 1723015194.3471 - 1723015195.5508
Request 74: 1723015194.3475 - 1723015195.551
Request 75: 1723015194.3478 - 1723015195.5512
Request 76: 1723015194.3482 - 1723015195.5516
Request 77: 1723015194.3486 - 1723015195.5518
Request 78: 1723015194.3489 - 1723015195.5542
Request 79: 1723015194.3491 - 1723015195.5545
Request 80: 1723015194.3492 - 1723015195.5549
Request 81: 1723015194.3493 - 1723015195.5605
Request 82: 1723015194.3493 - 1723015195.561
Request 83: 1723015194.3494 - 1723015195.5633
Request 84: 1723015194.3494 - 1723015195.5638
Request 85: 1723015194.3495 - 1723015195.5641
Request 86: 1723015194.3496 - 1723015195.5661
Request 87: 1723015194.3496 - 1723015195.5678
Request 88: 1723015194.3497 - 1723015195.5681
Request 89: 1723015194.3499 - 1723015195.5684
Request 90: 1723015194.35 - 1723015195.5685
Request 91: 1723015194.3501 - 1723015195.5756
Request 92: 1723015194.3502 - 1723015195.5758
Request 93: 1723015194.3504 - 1723015195.576
Request 94: 1723015194.3505 - 1723015195.5768
Request 95: 1723015194.3506 - 1723015195.577
Request 96: 1723015194.3508 - 1723015195.5772
Request 97: 1723015194.3509 - 1723015195.5774
Request 98: 1723015194.3509 - 1723015195.5777
Request 99: 1723015194.351 - 1723015195.5781
通過(guò)上述實(shí)踐,我們可以看到LaravelORM與Webman中使用協(xié)程是非常具有可行性的,
并且在高并發(fā)&慢查詢場(chǎng)景下,協(xié)程的優(yōu)勢(shì)也在此得到了充分的體現(xiàn)
666,剛看到這篇,官網(wǎng)文檔倒是還沒(méi)有,等你這個(gè)上了。Guzzle http 本身支持異步,clichouse 用 smi2/phpclickhouse
也有 selectAsync
異步查詢,再加上 Eloquent Mysql,就基本滿足我需求了,期待
牛皮